Browse Source

compensate use toRun

pull/44/head
yedf2 4 years ago
parent
commit
e0400085bb
  1. 2
      dtmcli/consts.go
  2. 71
      dtmsvr/trans_saga.go
  3. 2
      test/saga_concurrent_test.go
  4. 8
      test/saga_test.go

2
dtmcli/consts.go

@ -11,8 +11,6 @@ const (
StatusFailed = "failed"
// StatusAborting status for global trans status.
StatusAborting = "aborting"
// StatusDoing status for branch status
StatusDoing = "doing"
// BranchTry branch type for TCC
BranchTry = "try"

71
dtmsvr/trans_saga.go

@ -6,7 +6,6 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm/clause"
)
type transSagaProcessor struct {
@ -67,7 +66,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
for i := 0; i < n; i++ {
b := branches[i]
if b.BranchType == dtmcli.BranchAction {
if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing {
if b.Status == dtmcli.StatusPrepared {
rsAToStart++
} else if b.Status == dtmcli.StatusFailed {
rsAFailed++
@ -103,55 +102,37 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
}()
err = t.execBranch(db, &branches[i])
}
needRollback := func(i int) bool {
br := &branchResults[i]
return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared
}
pickAndRun := func(branchType string) {
pickToRunActions := func() []int {
toRun := []int{}
for current := 0; current < n; current++ {
br := &branchResults[current]
if br.branchType == branchType && branchType == dtmcli.BranchAction {
if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) &&
!br.started && isPreconditionsSucceed(current) {
br.status = dtmcli.StatusDoing
toRun = append(toRun, current)
}
} else if br.branchType == branchType && branchType == dtmcli.BranchCompensate {
if needRollback(current) {
toRun = append(toRun, current)
}
}
}
if branchType == dtmcli.BranchAction && len(toRun) > 0 && csc.Concurrent { // only save doing when concurrent
updates := make([]TransBranch, len(toRun))
for i, b := range toRun {
updates[i].ID = branches[b].ID
branches[b].Status = dtmcli.StatusDoing
updates[i].Status = dtmcli.StatusDoing
if br.branchType == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared {
toRun = append(toRun, current)
}
dbGet().Must().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_pkey",
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(updates)
} else if branchType == dtmcli.BranchCompensate && len(toRun) > 0 {
// when not concurrent, then may add one more branch, in case the newest branch state not saved and timeout
if !csc.Concurrent && len(toRun) < n/2 && branchResults[len(toRun)*2+1].status != dtmcli.StatusFailed {
toRun = append(toRun, len(toRun)*2+2)
}
rsCToStart = len(toRun)
}
dtmcli.Logf("toRun picked for %s is: %v", branchType, toRun)
dtmcli.Logf("toRun picked for action is: %v", toRun)
return toRun
}
runBranches := func(toRun []int) {
for _, b := range toRun {
branchResults[b].started = true
if branchType == dtmcli.BranchAction {
if branchResults[b].branchType == dtmcli.BranchAction {
rsAStarted++
}
go asyncExecBranch(b)
}
}
processorTimeout := func() bool {
return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second
pickAndRunCompensates := func(toRunActions []int) {
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.branchType == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
go asyncExecBranch(i)
}
}
}
waitDoneOnce := func() {
select {
@ -177,8 +158,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
}
}
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart && !processorTimeout() {
pickAndRun(dtmcli.BranchAction)
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart {
toRun := pickToRunActions()
runBranches(toRun)
if rsADone == rsAStarted { // no branch is running, so break
break
}
@ -192,12 +174,13 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
t.changeStatus(db, dtmcli.StatusAborting)
}
if t.Status == dtmcli.StatusAborting {
pickAndRun(dtmcli.BranchCompensate)
for rsCDone != rsCToStart && !processorTimeout() {
toRun := pickToRunActions()
pickAndRunCompensates(toRun)
for rsCDone != rsCToStart {
waitDoneOnce()
}
}
if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsAFailed > 0 && rsCToStart == rsCSucceed {
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
t.changeStatus(db, dtmcli.StatusFailed)
}
return nil

2
test/saga_concurrent_test.go

@ -64,7 +64,7 @@ func csagaCommittedOngoing(t *testing.T) {
examples.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
csaga.Submit()
WaitTransProcessed(csaga.Gid)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusDoing, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid))
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid))
assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(csaga.Gid))
CronTransOnce()

8
test/saga_test.go

@ -9,11 +9,11 @@ import (
)
func TestSaga(t *testing.T) {
// sagaNormal(t)
// sagaCommittedOngoing(t)
// sagaRollback(t)
sagaNormal(t)
sagaCommittedOngoing(t)
sagaRollback(t)
sagaRollback2(t)
// sagaTimeout(t)
sagaTimeout(t)
}
func sagaNormal(t *testing.T) {

Loading…
Cancel
Save