Browse Source

merged

pull/343/head
yedf2 4 years ago
parent
commit
6991290fa2
  1. 7
      client/dtmcli/dtmimp/trans_base.go
  2. 14
      dtmsvr/trans_type_saga.go
  3. 10
      test/busi/base_http.go
  4. 31
      test/saga_options_test.go

7
client/dtmcli/dtmimp/trans_base.go

@ -49,6 +49,8 @@ type TransOptions struct {
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
RetryLimit int64 `json:"retry_limit,omitempty" gorm:"-"` // for trans type: saga
RetryCount int64 `json:"retry_count,omitempty" gorm:"-"` // for trans type: saga
}
// TransBase base for all trans
@ -87,6 +89,11 @@ func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) {
t.RequestTimeout = timeout
}
// WithRetryLimit defines global trans retry limit
func (t *TransBase) WithRetryLimit(retryLimit int64) {
t.RetryLimit = retryLimit
}
// TransBaseFromQuery construct transaction info from request
func TransBaseFromQuery(qs url.Values) *TransBase {
return NewTransBase(EscapeGet(qs, "gid"), EscapeGet(qs, "trans_type"), EscapeGet(qs, "dtm"), EscapeGet(qs, "branch_id"))

14
dtmsvr/trans_type_saga.go

@ -171,6 +171,20 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
br := &branchResults[r.index]
br.status = r.status
if r.op == dtmimp.OpAction {
// if t.RetryLimit > 0, should check the retry count
if t.RetryLimit > 0 && (r.status == dtmcli.StatusPrepared || r.status == dtmcli.StatusSubmitted) {
// if t.RetryCount < t.RetryLimit, branch will be retried util RetryLimit = 0
if t.RetryCount < t.RetryLimit {
t.RetryCount++
logger.Infof("Retrying branch %s %s %s, t.RetryLimit: %d, t.RetryCount: %d",
branches[r.index].BranchID, branches[r.index].Op, branches[r.index].URL, t.RetryLimit, t.RetryCount)
go asyncExecBranch(r.index)
break
}
// if t.RetryCount = t.RetryLimit, trans will be aborted
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("RetryCount is greater than RetryLimit, RetryLimit: %v", t.RetryLimit)))
break
}
rsADone++
if r.status == dtmcli.StatusFailed {
rsAFailed++

10
test/busi/base_http.go

@ -211,4 +211,14 @@ func BaseAddRoute(app *gin.Engine) {
}
return nil
}))
retryNums := 3
app.POST(BusiAPI+"/TransInRetry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
if retryNums != 0 {
retryNums--
return dtmcli.ErrOngoing
}
retryNums = 3
return nil
}))
}

31
test/saga_options_test.go

@ -137,3 +137,34 @@ func TestSagaHeadersYes1(t *testing.T) {
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
func TestSagaGlobalTransWithRetryLimitYes(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenReqHTTP(30, false, false)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
saga.Add(busi.Busi+"/TransInRetry", busi.Busi+"/TransInRevert", &req)
saga.WaitResult = true
saga.WithRetryLimit(3)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
func TestSagaGlobalTransWithRetryLimitNo(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenReqHTTP(30, false, false)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
saga.Add(busi.Busi+"/TransInRetry", busi.Busi+"/TransInRevert", &req)
saga.WaitResult = true
saga.WithRetryLimit(1)
err := saga.Submit()
assert.NotNil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Equal(t, `RetryCount is greater than RetryLimit, RetryLimit: 1`, getTrans(gid).RollbackReason)
}

Loading…
Cancel
Save