Browse Source

Merge pull request #322 from xyctruth/main

saga wait_result submit, sync get branch error
pull/331/head
yedf2 4 years ago
committed by GitHub
parent
commit
731fb0299d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dtmgrpc/type.go
  2. 4
      dtmsvr/trans_process.go
  3. 16
      test/saga_grpc_test.go
  4. 3
      test/saga_options_test.go
  5. 2
      test/saga_test.go

7
dtmgrpc/type.go

@ -8,6 +8,7 @@ package dtmgrpc
import (
context "context"
"errors"
"fmt"
"github.com/dtm-labs/dtm/dtmcli"
@ -23,9 +24,9 @@ import (
// DtmError2GrpcError translate dtm error to grpc error
func DtmError2GrpcError(res interface{}) error {
e, ok := res.(error)
if ok && e == dtmimp.ErrFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if ok && e == dtmimp.ErrOngoing {
if ok && errors.Is(e, dtmimp.ErrFailure) {
return status.New(codes.Aborted, e.Error()).Err()
} else if ok && errors.Is(e, dtmimp.ErrOngoing) {
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
}
return e

4
dtmsvr/trans_process.go

@ -45,7 +45,11 @@ func (t *TransGlobal) process(branches []TransBranch) error {
if err != nil {
return err
}
if submitting && t.Status != dtmcli.StatusSucceed {
if t.RollbackReason != "" {
return fmt.Errorf("%s. %w", t.RollbackReason, dtmcli.ErrFailure)
}
return fmt.Errorf("wait result not return success: %w", dtmcli.ErrFailure)
}
return nil

16
test/saga_grpc_test.go

@ -36,7 +36,7 @@ func TestSagaGrpcRollback(t *testing.T) {
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, "url:localhost:58081/busi.Busi/TransIn return failed: reason:Insufficient balance. FAILURE", getTrans(saga.Gid).RollbackReason)
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaGrpcCurrent(t *testing.T) {
@ -123,6 +123,20 @@ func TestSagaGrpcWithGlobalTransRequestTimeout(t *testing.T) {
waitTransProcessed(gid)
}
func TestSagaGrpcOptionsRollbackWait(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSagaGrpc(gid, false, true)
busi.MainSwitch.FailureReason.SetOnce("Insufficient balance")
saga.WaitResult = true
err := saga.Submit()
assert.Error(t, err)
assert.Contains(t, err.Error(), "Insufficient balance")
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)

3
test/saga_options_test.go

@ -98,12 +98,15 @@ func TestSagaOptionsCommittedOngoingWait(t *testing.T) {
func TestSagaOptionsRollbackWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
busi.MainSwitch.FailureReason.SetOnce("Insufficient balance")
saga.WaitResult = true
err := saga.Submit()
assert.Error(t, err)
assert.Contains(t, err.Error(), "Insufficient balance")
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaPassthroughHeadersYes(t *testing.T) {

2
test/saga_test.go

@ -33,7 +33,7 @@ func TestSagaRollback(t *testing.T) {
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"reason:Insufficient balance. FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason)
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaOngoingSucceed(t *testing.T) {

Loading…
Cancel
Save