diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 8ffff26..a768fa1 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -7,11 +7,13 @@ package dtmsvr import ( + "errors" "fmt" "math/rand" "runtime/debug" "time" + "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" ) @@ -33,7 +35,7 @@ func CronTransOnce() (gid string) { trans.WaitResult = true branches := GetStore().FindBranches(gid) err := trans.Process(branches) - dtmimp.E2P(err) + dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err) return } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 7f43934..15f8c6b 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -93,12 +93,12 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } shouldRun := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed - if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { + if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed { return false } // if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr for _, pre := range csc.Orders[current/2] { - if branches[pre*2+1].Status != dtmcli.StatusSucceed { + if branchResults[pre*2+1].status != dtmcli.StatusSucceed { return false } } @@ -107,7 +107,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { shouldRollback := func(current int) bool { rollbacked := func(i int) bool { // current compensate op rollbacked or related action still prepared - return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared + return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared } if rollbacked(current) { return false @@ -192,7 +192,21 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { logger.Debugf("wait once for done") } } - + prepareToCompensate := func() { + toRunActions := pickToRunActions() + for _, b := range toRunActions { + // these branches may have run. so flag them to status succeed, then run the corresponding + // compensate + branchResults[b].status = dtmcli.StatusSucceed + } + for i, b := range branchResults { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ + } + } + logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults) + } timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { toRun := pickToRunActions() @@ -209,13 +223,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { t.changeStatus(dtmcli.StatusAborting) } - for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && - branchResults[i+1].status != dtmcli.StatusPrepared { - rsCToStart++ - } + if t.Status == dtmcli.StatusAborting { + prepareToCompensate() } - logger.Debugf("rsCToStart: %d", rsCToStart) for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { toRun := pickToRunCompensates() runBranches(toRun)