|
|
|
@ -50,7 +50,7 @@ type branchResult struct { |
|
|
|
} |
|
|
|
|
|
|
|
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error { |
|
|
|
if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { |
|
|
|
if !t.needProcess() { |
|
|
|
return nil |
|
|
|
} |
|
|
|
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { |
|
|
|
@ -63,7 +63,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) |
|
|
|
dtmcli.MustUnmarshalString(t.CustomData, &csc) |
|
|
|
} |
|
|
|
// resultStats
|
|
|
|
var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int |
|
|
|
var rsActionToStart, rsActionStarted, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int |
|
|
|
branchResults := make([]branchResult, n) // save the branch result
|
|
|
|
for i := 0; i < n; i++ { |
|
|
|
b := branches[i] |
|
|
|
@ -77,10 +77,10 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) |
|
|
|
branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType} |
|
|
|
} |
|
|
|
isPreconditionsSucceed := func(current int) bool { |
|
|
|
if !csc.Concurrent && branches[current-1].Status != dtmcli.StatusSucceed { |
|
|
|
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
for _, pre := range csc.Orders[current*2+1] { |
|
|
|
for _, pre := range csc.Orders[current/2] { |
|
|
|
if branches[pre*2+1].Status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
@ -140,8 +140,12 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) |
|
|
|
} |
|
|
|
rsCompensateToStart = len(toRun) |
|
|
|
} |
|
|
|
dtmcli.Logf("toRun picked for %s is: %v", branchType, toRun) |
|
|
|
for _, b := range toRun { |
|
|
|
branchResults[b].started = true |
|
|
|
if branchType == dtmcli.BranchAction { |
|
|
|
rsActionStarted++ |
|
|
|
} |
|
|
|
go asyncExecBranch(b) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -174,6 +178,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) |
|
|
|
|
|
|
|
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() { |
|
|
|
pickAndRun(dtmcli.BranchAction) |
|
|
|
if rsActionDone == rsActionStarted { // no branch is running, so break
|
|
|
|
break |
|
|
|
} |
|
|
|
waitOnceForDone() |
|
|
|
} |
|
|
|
if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed { |
|
|
|
|