From e0400085bb9628962f4fe32f0c29a0bdcd5aba26 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 29 Oct 2021 21:11:55 +0800 Subject: [PATCH] compensate use toRun --- dtmcli/consts.go | 2 - dtmsvr/trans_saga.go | 71 ++++++++++++++---------------------- test/saga_concurrent_test.go | 2 +- test/saga_test.go | 8 ++-- 4 files changed, 32 insertions(+), 51 deletions(-) diff --git a/dtmcli/consts.go b/dtmcli/consts.go index 5cbb0ef..438deed 100644 --- a/dtmcli/consts.go +++ b/dtmcli/consts.go @@ -11,8 +11,6 @@ const ( StatusFailed = "failed" // StatusAborting status for global trans status. StatusAborting = "aborting" - // StatusDoing status for branch status - StatusDoing = "doing" // BranchTry branch type for TCC BranchTry = "try" diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 2e1c642..dc74237 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -6,7 +6,6 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "gorm.io/gorm/clause" ) type transSagaProcessor struct { @@ -67,7 +66,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) for i := 0; i < n; i++ { b := branches[i] if b.BranchType == dtmcli.BranchAction { - if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing { + if b.Status == dtmcli.StatusPrepared { rsAToStart++ } else if b.Status == dtmcli.StatusFailed { rsAFailed++ @@ -103,55 +102,37 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) }() err = t.execBranch(db, &branches[i]) } - needRollback := func(i int) bool { - br := &branchResults[i] - return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared - } - pickAndRun := func(branchType string) { + pickToRunActions := func() []int { toRun := []int{} for current := 0; current < n; current++ { br := &branchResults[current] - if br.branchType == branchType && branchType == dtmcli.BranchAction { - if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) && - !br.started && isPreconditionsSucceed(current) { - br.status = dtmcli.StatusDoing - toRun = append(toRun, current) - } - } else if br.branchType == branchType && branchType == dtmcli.BranchCompensate { - if needRollback(current) { - toRun = append(toRun, current) - } - } - } - if branchType == dtmcli.BranchAction && len(toRun) > 0 && csc.Concurrent { // only save doing when concurrent - updates := make([]TransBranch, len(toRun)) - for i, b := range toRun { - updates[i].ID = branches[b].ID - branches[b].Status = dtmcli.StatusDoing - updates[i].Status = dtmcli.StatusDoing + if br.branchType == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared { + toRun = append(toRun, current) } - dbGet().Must().Clauses(clause.OnConflict{ - OnConstraint: "trans_branch_pkey", - DoUpdates: clause.AssignmentColumns([]string{"status"}), - }).Create(updates) - } else if branchType == dtmcli.BranchCompensate && len(toRun) > 0 { - // when not concurrent, then may add one more branch, in case the newest branch state not saved and timeout - if !csc.Concurrent && len(toRun) < n/2 && branchResults[len(toRun)*2+1].status != dtmcli.StatusFailed { - toRun = append(toRun, len(toRun)*2+2) - } - rsCToStart = len(toRun) } - dtmcli.Logf("toRun picked for %s is: %v", branchType, toRun) + dtmcli.Logf("toRun picked for action is: %v", toRun) + return toRun + } + runBranches := func(toRun []int) { for _, b := range toRun { branchResults[b].started = true - if branchType == dtmcli.BranchAction { + if branchResults[b].branchType == dtmcli.BranchAction { rsAStarted++ } go asyncExecBranch(b) } } - processorTimeout := func() bool { - return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second + pickAndRunCompensates := func(toRunActions []int) { + for _, b := range toRunActions { + // these branches may have run. so flag them to status succeed, then run the corresponding compensate + branchResults[b].status = dtmcli.StatusSucceed + } + for i, b := range branchResults { + if b.branchType == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ + go asyncExecBranch(i) + } + } } waitDoneOnce := func() { select { @@ -177,8 +158,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) } } - for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart && !processorTimeout() { - pickAndRun(dtmcli.BranchAction) + for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart { + toRun := pickToRunActions() + runBranches(toRun) if rsADone == rsAStarted { // no branch is running, so break break } @@ -192,12 +174,13 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) t.changeStatus(db, dtmcli.StatusAborting) } if t.Status == dtmcli.StatusAborting { - pickAndRun(dtmcli.BranchCompensate) - for rsCDone != rsCToStart && !processorTimeout() { + toRun := pickToRunActions() + pickAndRunCompensates(toRun) + for rsCDone != rsCToStart { waitDoneOnce() } } - if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsAFailed > 0 && rsCToStart == rsCSucceed { + if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed { t.changeStatus(db, dtmcli.StatusFailed) } return nil diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index 6581598..e71da70 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -64,7 +64,7 @@ func csagaCommittedOngoing(t *testing.T) { examples.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) csaga.Submit() WaitTransProcessed(csaga.Gid) - assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusDoing, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid)) + assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid)) assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(csaga.Gid)) CronTransOnce() diff --git a/test/saga_test.go b/test/saga_test.go index 2c1cb0f..8ceaada 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -9,11 +9,11 @@ import ( ) func TestSaga(t *testing.T) { - // sagaNormal(t) - // sagaCommittedOngoing(t) - // sagaRollback(t) + sagaNormal(t) + sagaCommittedOngoing(t) + sagaRollback(t) sagaRollback2(t) - // sagaTimeout(t) + sagaTimeout(t) } func sagaNormal(t *testing.T) {