diff --git a/client/dtmcli/dtmimp/trans_base.go b/client/dtmcli/dtmimp/trans_base.go index cfc0f6b..a4cb8a7 100644 --- a/client/dtmcli/dtmimp/trans_base.go +++ b/client/dtmcli/dtmimp/trans_base.go @@ -49,6 +49,8 @@ type TransOptions struct { RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg + RetryLimit int64 `json:"retry_limit,omitempty" gorm:"-"` // for trans type: saga + RetryCount int64 `json:"retry_count,omitempty" gorm:"-"` // for trans type: saga } // TransBase base for all trans @@ -87,6 +89,11 @@ func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) { t.RequestTimeout = timeout } +// WithRetryLimit defines global trans retry limit +func (t *TransBase) WithRetryLimit(retryLimit int64) { + t.RetryLimit = retryLimit +} + // TransBaseFromQuery construct transaction info from request func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(EscapeGet(qs, "gid"), EscapeGet(qs, "trans_type"), EscapeGet(qs, "dtm"), EscapeGet(qs, "branch_id")) diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 4554a81..e445b97 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -171,6 +171,20 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { br := &branchResults[r.index] br.status = r.status if r.op == dtmimp.OpAction { + // if t.RetryLimit > 0, should check the retry count + if t.RetryLimit > 0 && (r.status == dtmcli.StatusPrepared || r.status == dtmcli.StatusSubmitted) { + // if t.RetryCount < t.RetryLimit, branch will be retried util RetryLimit = 0 + if t.RetryCount < t.RetryLimit { + t.RetryCount++ + logger.Infof("Retrying branch %s %s %s, t.RetryLimit: %d, t.RetryCount: %d", + branches[r.index].BranchID, branches[r.index].Op, branches[r.index].URL, t.RetryLimit, t.RetryCount) + go asyncExecBranch(r.index) + break + } + // if t.RetryCount = t.RetryLimit, trans will be aborted + t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("RetryCount is greater than RetryLimit, RetryLimit: %v", t.RetryLimit))) + break + } rsADone++ if r.status == dtmcli.StatusFailed { rsAFailed++ diff --git a/test/busi/base_http.go b/test/busi/base_http.go index dcf5bf7..1123585 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -211,4 +211,14 @@ func BaseAddRoute(app *gin.Engine) { } return nil })) + + retryNums := 3 + app.POST(BusiAPI+"/TransInRetry", dtmutil.WrapHandler(func(c *gin.Context) interface{} { + if retryNums != 0 { + retryNums-- + return dtmcli.ErrOngoing + } + retryNums = 3 + return nil + })) } diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 4b32933..3fedf05 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -137,3 +137,34 @@ func TestSagaHeadersYes1(t *testing.T) { cronTransOnce(t, gidYes) assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) } + +func TestSagaGlobalTransWithRetryLimitYes(t *testing.T) { + gid := dtmimp.GetFuncName() + saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) + req := busi.GenReqHTTP(30, false, false) + saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req) + saga.Add(busi.Busi+"/TransInRetry", busi.Busi+"/TransInRevert", &req) + saga.WaitResult = true + saga.WithRetryLimit(3) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) +} + +func TestSagaGlobalTransWithRetryLimitNo(t *testing.T) { + gid := dtmimp.GetFuncName() + saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) + req := busi.GenReqHTTP(30, false, false) + saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req) + saga.Add(busi.Busi+"/TransInRetry", busi.Busi+"/TransInRevert", &req) + saga.WaitResult = true + saga.WithRetryLimit(1) + err := saga.Submit() + assert.NotNil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusPrepared}, getBranchesStatus(saga.Gid)) + assert.Equal(t, `RetryCount is greater than RetryLimit, RetryLimit: 1`, getTrans(gid).RollbackReason) +}