From 259d09654388bd70cb7a2bc5a5b6e7b6465c1b45 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 11 Jan 2022 17:35:14 +0800 Subject: [PATCH] saga rollback order kept --- dtmsvr/trans_type_saga.go | 87 ++++++++++++++++++++++++++------------- test/busi/busi.go | 2 +- test/saga_test.go | 9 ++++ 3 files changed, 69 insertions(+), 29 deletions(-) diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index f2a88fd..b7d15ca 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { type cSagaCustom struct { Orders map[int][]int `json:"orders"` Concurrent bool `json:"concurrent"` + cOrders map[int][]int } type branchResult struct { @@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } n := len(branches) - csc := cSagaCustom{Orders: map[int][]int{}} + csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}} if t.CustomData != "" { dtmimp.MustUnmarshalString(t.CustomData, &csc) + for k, v := range csc.Orders { + for _, b := range v { + csc.cOrders[b] = append(csc.cOrders[b], k) + } + } } if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync t.updateBranchSync = true @@ -83,9 +89,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsAFailed++ } } - branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op} + branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op} } - isPreconditionsSucceed := func(current int) bool { + shouldRun := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { return false @@ -98,7 +104,26 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } return true } - + shouldRollback := func(current int) bool { + 
rollbacked := func(i int) bool { + // true if the compensate op at i has already succeeded, or its related action (i+1) is still prepared + return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared + } + if rollbacked(current) { + return false + } + // if !csc.Concurrent, check that the branch in the next step has been rolled back + if !csc.Concurrent && current < n-2 && !rollbacked(current+2) { + return false + } + // if csc.Concurrent, check the cOrders; one original step corresponds to 2 branches in dtmsvr. NOTE(review): cOrders holds step indices but rollbacked takes a branch index; confirm whether this should be rollbacked(2*next) + for _, next := range csc.cOrders[current/2] { + if !rollbacked(next) { + return false + } + } + return true + } resultChan := make(chan branchResult, n) asyncExecBranch := func(i int) { var err error @@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } pickToRunActions := func() []int { toRun := []int{} - for current := 0; current < n; current++ { + for current := 1; current < n; current += 2 { + br := &branchResults[current] + if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) { + toRun = append(toRun, current) + } + } + logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) + return toRun + } + pickToRunCompensates := func() []int { + toRun := []int{} + for current := n - 2; current >= 0; current -= 2 { br := &branchResults[current] - if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && - br.status == dtmcli.StatusPrepared { + if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) { toRun = append(toRun, current) } } - logger.Debugf("toRun picked for action is: %v", toRun) + logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) return toRun } runBranches := func(toRun []int) { @@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { go asyncExecBranch(b) } } - 
pickAndRunCompensates := func(toRunActions []int) { - for _, b := range toRunActions { - // these branches may have run. so flag them to status succeed, then run the corresponding - // compensate - branchResults[b].status = dtmcli.StatusSucceed - } - for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && - branchResults[i+1].status != dtmcli.StatusPrepared { - rsCToStart++ - go asyncExecBranch(i) - } - } - } waitDoneOnce := func() { select { case r := <-resultChan: @@ -172,7 +193,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } - for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart { + timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { toRun := pickToRunActions() runBranches(toRun) if rsADone == rsAStarted { // no branch is running, so break @@ -187,13 +209,22 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { t.changeStatus(dtmcli.StatusAborting) } - if t.Status == dtmcli.StatusAborting { - toRun := pickToRunActions() - pickAndRunCompensates(toRun) - for rsCDone != rsCToStart { - waitDoneOnce() + for i, b := range branchResults { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ } } + logger.Debugf("rsCToStart: %d", rsCToStart) + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { + toRun := pickToRunCompensates() + runBranches(toRun) + if rsCDone == rsCToStart { // no branch is running, so break + break + } + logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart) + waitDoneOnce() + } if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed { 
t.changeStatus(dtmcli.StatusFailed) } diff --git a/test/busi/busi.go b/test/busi/busi.go index 5a41192..3fda2f2 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -24,7 +24,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string } else if res == dtmcli.ResultFailure { return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } else if res == dtmcli.ResultOngoing { - return status.New(codes.Aborted, dtmcli.ResultOngoing).Err() + return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err() } return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err() } diff --git a/test/saga_test.go b/test/saga_test.go index 9b6f0e4..b7afd3a 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -24,6 +24,15 @@ func TestSagaNormal(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } +func TestSagaRollback(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, true) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) +} + func TestSagaOngoingSucceed(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)