|
|
|
@ -93,12 +93,12 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
} |
|
|
|
shouldRun := func(current int) bool { |
|
|
|
// if !csc.Concurrent,then check the branch in previous step is succeed
|
|
|
|
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { |
|
|
|
if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
// if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr
|
|
|
|
for _, pre := range csc.Orders[current/2] { |
|
|
|
if branches[pre*2+1].Status != dtmcli.StatusSucceed { |
|
|
|
if branchResults[pre*2+1].status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
@ -107,7 +107,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
shouldRollback := func(current int) bool { |
|
|
|
rollbacked := func(i int) bool { |
|
|
|
// current compensate op rollbacked or related action still prepared
|
|
|
|
return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared |
|
|
|
return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared |
|
|
|
} |
|
|
|
if rollbacked(current) { |
|
|
|
return false |
|
|
|
@ -192,7 +192,21 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
logger.Debugf("wait once for done") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
prepareToCompensate := func() { |
|
|
|
toRunActions := pickToRunActions() |
|
|
|
for _, b := range toRunActions { |
|
|
|
// these branches may have run. so flag them to status succeed, then run the corresponding
|
|
|
|
// compensate
|
|
|
|
branchResults[b].status = dtmcli.StatusSucceed |
|
|
|
} |
|
|
|
for i, b := range branchResults { |
|
|
|
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && |
|
|
|
branchResults[i+1].status != dtmcli.StatusPrepared { |
|
|
|
rsCToStart++ |
|
|
|
} |
|
|
|
} |
|
|
|
logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults) |
|
|
|
} |
|
|
|
timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) |
|
|
|
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { |
|
|
|
toRun := pickToRunActions() |
|
|
|
@ -209,13 +223,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { |
|
|
|
t.changeStatus(dtmcli.StatusAborting) |
|
|
|
} |
|
|
|
for i, b := range branchResults { |
|
|
|
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && |
|
|
|
branchResults[i+1].status != dtmcli.StatusPrepared { |
|
|
|
rsCToStart++ |
|
|
|
} |
|
|
|
if t.Status == dtmcli.StatusAborting { |
|
|
|
prepareToCompensate() |
|
|
|
} |
|
|
|
logger.Debugf("rsCToStart: %d", rsCToStart) |
|
|
|
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { |
|
|
|
toRun := pickToRunCompensates() |
|
|
|
runBranches(toRun) |
|
|
|
|