Browse Source

saga rollback order keeped

pull/175/head
yedf2 4 years ago
parent
commit
259d096543
  1. 87
      dtmsvr/trans_type_saga.go
  2. 2
      test/busi/busi.go
  3. 9
      test/saga_test.go

87
dtmsvr/trans_type_saga.go

@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
type cSagaCustom struct {
Orders map[int][]int `json:"orders"`
Concurrent bool `json:"concurrent"`
cOrders map[int][]int
}
type branchResult struct {
@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
n := len(branches)
csc := cSagaCustom{Orders: map[int][]int{}}
csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}}
if t.CustomData != "" {
dtmimp.MustUnmarshalString(t.CustomData, &csc)
for k, v := range csc.Orders {
for _, b := range v {
csc.cOrders[b] = append(csc.cOrders[b], k)
}
}
}
if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync
t.updateBranchSync = true
@ -83,9 +89,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsAFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op}
branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
}
isPreconditionsSucceed := func(current int) bool {
shouldRun := func(current int) bool {
// if !csc.Concurrent,then check the branch in previous step is succeed
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed {
return false
@ -98,7 +104,26 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
return true
}
shouldRollback := func(current int) bool {
rollbacked := func(i int) bool {
// current compensate op rollbacked or related action still prepared
return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared
}
if rollbacked(current) {
return false
}
// if !csc.Concurrent,then check the branch in next step is rollbacked
if !csc.Concurrent && current < n-2 && !rollbacked(current+2) {
return false
}
// if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr
for _, next := range csc.cOrders[current/2] {
if !rollbacked(next) {
return false
}
}
return true
}
resultChan := make(chan branchResult, n)
asyncExecBranch := func(i int) {
var err error
@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
pickToRunActions := func() []int {
toRun := []int{}
for current := 0; current < n; current++ {
for current := 1; current < n; current += 2 {
br := &branchResults[current]
if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) {
toRun = append(toRun, current)
}
}
logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders)
return toRun
}
pickToRunCompensates := func() []int {
toRun := []int{}
for current := n - 2; current >= 0; current -= 2 {
br := &branchResults[current]
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) &&
br.status == dtmcli.StatusPrepared {
if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) {
toRun = append(toRun, current)
}
}
logger.Debugf("toRun picked for action is: %v", toRun)
logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders)
return toRun
}
runBranches := func(toRun []int) {
@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
go asyncExecBranch(b)
}
}
pickAndRunCompensates := func(toRunActions []int) {
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding
// compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
go asyncExecBranch(i)
}
}
}
waitDoneOnce := func() {
select {
case r := <-resultChan:
@ -172,7 +193,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
}
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart {
timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second)
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
toRun := pickToRunActions()
runBranches(toRun)
if rsADone == rsAStarted { // no branch is running, so break
@ -187,13 +209,22 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) {
t.changeStatus(dtmcli.StatusAborting)
}
if t.Status == dtmcli.StatusAborting {
toRun := pickToRunActions()
pickAndRunCompensates(toRun)
for rsCDone != rsCToStart {
waitDoneOnce()
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
}
}
logger.Debugf("rsCToStart: %d", rsCToStart)
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
toRun := pickToRunCompensates()
runBranches(toRun)
if rsCDone == rsCToStart { // no branch is running, so break
break
}
logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart)
waitDoneOnce()
}
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
t.changeStatus(dtmcli.StatusFailed)
}

2
test/busi/busi.go

@ -24,7 +24,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string
} else if res == dtmcli.ResultFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if res == dtmcli.ResultOngoing {
return status.New(codes.Aborted, dtmcli.ResultOngoing).Err()
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
}
return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err()
}

9
test/saga_test.go

@ -24,6 +24,15 @@ func TestSagaNormal(t *testing.T) {
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
func TestSagaRollback(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
}
func TestSagaOngoingSucceed(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)

Loading…
Cancel
Save