diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index f92d830..14244e3 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -8,6 +8,7 @@ package dtmgrpc import ( context "context" + "fmt" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -38,7 +39,8 @@ func GrpcError2DtmError(err error) error { if st.Message() == dtmcli.ResultOngoing { return dtmcli.ErrOngoing } - return dtmcli.ErrFailure + + return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure) } else if ok && st.Code() == codes.FailedPrecondition { return dtmcli.ErrOngoing } diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 43eed98..dd95290 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -68,6 +68,7 @@ type TransBranchStore struct { Status string `json:"status,omitempty"` FinishTime *time.Time `json:"finish_time,omitempty"` RollbackTime *time.Time `json:"rollback_time,omitempty"` + Error error `json:"-" gorm:"-"` } // TableName TableName diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index a9d497b..fa1e51e 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -190,6 +190,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) { if err == nil { return dtmcli.StatusSucceed, nil } else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) { + branch.Error = fmt.Errorf("url:%s return failed: %w", branch.URL, err) return dtmcli.StatusFailed, nil } else if errors.Is(err, dtmcli.ErrOngoing) { return "", dtmcli.ErrOngoing diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 2009ae4..9117515 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -49,13 +49,14 @@ type branchResult struct { status string started bool op string + err error } func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { // when saga tasks is fetched, it always need to process logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout()) if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { - t.changeStatus(dtmcli.StatusAborting) + t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail))) } n := len(branches) @@ -73,6 +74,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } // resultStats var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int + var failureError error branchResults := make([]branchResult, n) // save the branch result for i := 0; i < n; i++ { b := branches[i] @@ -125,7 +127,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { if x := recover(); x != nil { err = dtmimp.AsError(x) } - resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} + resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error} if err != nil && !errors.Is(err, dtmcli.ErrOngoing) { logger.Errorf("exec branch %s %s %s error: %v", branches[i].BranchID, branches[i].Op, branches[i].URL, err) } @@ -172,6 +174,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsADone++ if r.status == dtmcli.StatusFailed { rsAFailed++ + failureError = r.err } else if r.status == dtmcli.StatusSucceed { rsASucceed++ } @@ -219,8 +222,11 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { t.changeStatus(dtmcli.StatusSucceed) return nil } - if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { - t.changeStatus(dtmcli.StatusAborting) + if t.Status == dtmcli.StatusSubmitted && rsAFailed > 0 { + t.changeStatus(dtmcli.StatusAborting, withRollbackReason(failureError.Error())) + } + if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { + t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail))) } if t.Status == dtmcli.StatusAborting { prepareToCompensate() diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 541c2cb..0d4171d 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -142,6 +142,7 @@ type mainSwitchType struct { QueryPreparedResult AutoEmptyString NextResult AutoEmptyString JrpcResult AutoEmptyString + FailureReason AutoEmptyString } // MainSwitch controls busi success or fail diff --git a/test/busi/busi.go b/test/busi/busi.go index 7d24985..81c9c9e 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -34,7 +34,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string if res == dtmcli.ResultSuccess { return nil } else if res == dtmcli.ResultFailure { - return status.New(codes.Aborted, dtmcli.ResultFailure).Err() + return status.New(codes.Aborted, fmt.Sprintf("reason:%s", MainSwitch.FailureReason.Fetch())).Err() } else if res == dtmcli.ResultOngoing { return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err() } @@ -48,6 +48,9 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi if res == "ERROR" { return errors.New("ERROR from user") } + if res == dtmimp.ResultFailure { + return fmt.Errorf("reason:%s. %w", MainSwitch.FailureReason.Fetch(), dtmimp.ErrFailure) + } return dtmcli.String2DtmError(res) } diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index 94f1732..9a98d06 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -28,6 +28,7 @@ func TestSagaGrpcNormal(t *testing.T) { func TestSagaGrpcRollback(t *testing.T) { gid := dtmimp.GetFuncName() saga := genSagaGrpc(gid, false, true) + busi.MainSwitch.FailureReason.SetOnce("Insufficient balance") busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing) saga.Submit() waitTransProcessed(saga.Gid) @@ -35,6 +36,7 @@ func TestSagaGrpcRollback(t *testing.T) { cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "url:localhost:58081/busi.Busi/TransIn return failed: reason:Insufficient balance. FAILURE", getTrans(saga.Gid).RollbackReason) } func TestSagaGrpcCurrent(t *testing.T) { diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 6134ee7..557ab1d 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -56,6 +56,8 @@ func TestSagaOptionsTimeout(t *testing.T) { cronTransOnceForwardNow(t, gid, 3600) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) + assert.Regexp(t, `^Timeout after \d+ seconds$`, getTrans(gid).RollbackReason) + } func TestSagaGlobalTransWithRequestTimeout(t *testing.T) { diff --git a/test/saga_test.go b/test/saga_test.go index d7b9ad7..f536a4c 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -22,15 +22,18 @@ func TestSagaNormal(t *testing.T) { waitTransProcessed(saga.Gid) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + assert.Equal(t, "", getTrans(saga.Gid).RollbackReason) } func TestSagaRollback(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, true) + busi.MainSwitch.FailureReason.SetOnce("Insufficient balance") err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) + assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"reason:Insufficient balance. FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason) } func TestSagaOngoingSucceed(t *testing.T) { @@ -44,6 +47,7 @@ func TestSagaOngoingSucceed(t *testing.T) { cronTransOnce(t, gid) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + assert.Equal(t, "", getTrans(saga.Gid).RollbackReason) } func TestSagaFailed(t *testing.T) {