|
|
|
@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { |
|
|
|
type cSagaCustom struct { |
|
|
|
Orders map[int][]int `json:"orders"` |
|
|
|
Concurrent bool `json:"concurrent"` |
|
|
|
cOrders map[int][]int |
|
|
|
} |
|
|
|
|
|
|
|
type branchResult struct { |
|
|
|
@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
} |
|
|
|
n := len(branches) |
|
|
|
|
|
|
|
csc := cSagaCustom{Orders: map[int][]int{}} |
|
|
|
csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}} |
|
|
|
if t.CustomData != "" { |
|
|
|
dtmimp.MustUnmarshalString(t.CustomData, &csc) |
|
|
|
for k, v := range csc.Orders { |
|
|
|
for _, b := range v { |
|
|
|
csc.cOrders[b] = append(csc.cOrders[b], k) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync
|
|
|
|
t.updateBranchSync = true |
|
|
|
@ -83,22 +89,41 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
rsAFailed++ |
|
|
|
} |
|
|
|
} |
|
|
|
branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op} |
|
|
|
branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op} |
|
|
|
} |
|
|
|
isPreconditionsSucceed := func(current int) bool { |
|
|
|
shouldRun := func(current int) bool { |
|
|
|
// if !csc.Concurrent,then check the branch in previous step is succeed
|
|
|
|
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { |
|
|
|
if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
// if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr
|
|
|
|
for _, pre := range csc.Orders[current/2] { |
|
|
|
if branches[pre*2+1].Status != dtmcli.StatusSucceed { |
|
|
|
if branchResults[pre*2+1].status != dtmcli.StatusSucceed { |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
return true |
|
|
|
} |
|
|
|
shouldRollback := func(current int) bool { |
|
|
|
rollbacked := func(i int) bool { |
|
|
|
// current compensate op rollbacked or related action still prepared
|
|
|
|
return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared |
|
|
|
} |
|
|
|
if rollbacked(current) { |
|
|
|
return false |
|
|
|
} |
|
|
|
// if !csc.Concurrent,then check the branch in next step is rollbacked
|
|
|
|
if !csc.Concurrent && current < n-2 && !rollbacked(current+2) { |
|
|
|
return false |
|
|
|
} |
|
|
|
// if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr
|
|
|
|
for _, next := range csc.cOrders[current/2] { |
|
|
|
if !rollbacked(2 * next) { |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
resultChan := make(chan branchResult, n) |
|
|
|
asyncExecBranch := func(i int) { |
|
|
|
var err error |
|
|
|
@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
} |
|
|
|
pickToRunActions := func() []int { |
|
|
|
toRun := []int{} |
|
|
|
for current := 0; current < n; current++ { |
|
|
|
for current := 1; current < n; current += 2 { |
|
|
|
br := &branchResults[current] |
|
|
|
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && |
|
|
|
br.status == dtmcli.StatusPrepared { |
|
|
|
if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) { |
|
|
|
toRun = append(toRun, current) |
|
|
|
} |
|
|
|
} |
|
|
|
logger.Debugf("toRun picked for action is: %v", toRun) |
|
|
|
logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) |
|
|
|
return toRun |
|
|
|
} |
|
|
|
pickToRunCompensates := func() []int { |
|
|
|
toRun := []int{} |
|
|
|
for current := n - 2; current >= 0; current -= 2 { |
|
|
|
br := &branchResults[current] |
|
|
|
if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) { |
|
|
|
toRun = append(toRun, current) |
|
|
|
} |
|
|
|
} |
|
|
|
logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) |
|
|
|
return toRun |
|
|
|
} |
|
|
|
runBranches := func(toRun []int) { |
|
|
|
@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
go asyncExecBranch(b) |
|
|
|
} |
|
|
|
} |
|
|
|
pickAndRunCompensates := func(toRunActions []int) { |
|
|
|
for _, b := range toRunActions { |
|
|
|
// these branches may have run. so flag them to status succeed, then run the corresponding
|
|
|
|
// compensate
|
|
|
|
branchResults[b].status = dtmcli.StatusSucceed |
|
|
|
} |
|
|
|
for i, b := range branchResults { |
|
|
|
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && |
|
|
|
branchResults[i+1].status != dtmcli.StatusPrepared { |
|
|
|
rsCToStart++ |
|
|
|
go asyncExecBranch(i) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
waitDoneOnce := func() { |
|
|
|
select { |
|
|
|
case r := <-resultChan: |
|
|
|
@ -171,8 +192,23 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
logger.Debugf("wait once for done") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart { |
|
|
|
prepareToCompensate := func() { |
|
|
|
toRunActions := pickToRunActions() |
|
|
|
for _, b := range toRunActions { |
|
|
|
// these branches may have run. so flag them to status succeed, then run the corresponding
|
|
|
|
// compensate
|
|
|
|
branchResults[b].status = dtmcli.StatusSucceed |
|
|
|
} |
|
|
|
for i, b := range branchResults { |
|
|
|
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && |
|
|
|
branchResults[i+1].status != dtmcli.StatusPrepared { |
|
|
|
rsCToStart++ |
|
|
|
} |
|
|
|
} |
|
|
|
logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults) |
|
|
|
} |
|
|
|
timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) |
|
|
|
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { |
|
|
|
toRun := pickToRunActions() |
|
|
|
runBranches(toRun) |
|
|
|
if rsADone == rsAStarted { // no branch is running, so break
|
|
|
|
@ -188,11 +224,16 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { |
|
|
|
t.changeStatus(dtmcli.StatusAborting) |
|
|
|
} |
|
|
|
if t.Status == dtmcli.StatusAborting { |
|
|
|
toRun := pickToRunActions() |
|
|
|
pickAndRunCompensates(toRun) |
|
|
|
for rsCDone != rsCToStart { |
|
|
|
waitDoneOnce() |
|
|
|
prepareToCompensate() |
|
|
|
} |
|
|
|
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { |
|
|
|
toRun := pickToRunCompensates() |
|
|
|
runBranches(toRun) |
|
|
|
if rsCDone == rsCToStart { // no branch is running, so break
|
|
|
|
break |
|
|
|
} |
|
|
|
logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart) |
|
|
|
waitDoneOnce() |
|
|
|
} |
|
|
|
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed { |
|
|
|
t.changeStatus(dtmcli.StatusFailed) |
|
|
|
|