From ebd8b70dc9b72b3265a2b1023bb941d94e43f385 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 29 Oct 2021 16:39:31 +0800 Subject: [PATCH] saga test passed --- dtmsvr/trans_saga.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 4925851..1651020 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -50,7 +50,7 @@ type branchResult struct { } func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error { - if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { + if !t.needProcess() { return nil } if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { @@ -63,7 +63,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) dtmcli.MustUnmarshalString(t.CustomData, &csc) } // resultStats - var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int + var rsActionToStart, rsActionStarted, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int branchResults := make([]branchResult, n) // save the branch result for i := 0; i < n; i++ { b := branches[i] @@ -77,10 +77,10 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType} } isPreconditionsSucceed := func(current int) bool { - if !csc.Concurrent && branches[current-1].Status != dtmcli.StatusSucceed { + if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { return false } - for _, pre := range csc.Orders[current*2+1] { + for _, pre := range csc.Orders[current/2] { if branches[pre*2+1].Status != dtmcli.StatusSucceed { return false } @@ -140,8 +140,12 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) } rsCompensateToStart = len(toRun) } + dtmcli.Logf("toRun picked for %s is: %v", branchType, toRun) for _, b := range toRun { branchResults[b].started = true + if branchType == dtmcli.BranchAction { + rsActionStarted++ + } go asyncExecBranch(b) } } @@ -174,6 +178,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() { pickAndRun(dtmcli.BranchAction) + if rsActionDone == rsActionStarted { // no branch is running, so break + break + } waitOnceForDone() } if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed {