diff --git a/dtmcli/concurrent_saga.go b/dtmcli/concurrent_saga.go deleted file mode 100644 index 202fd24..0000000 --- a/dtmcli/concurrent_saga.go +++ /dev/null @@ -1,29 +0,0 @@ -package dtmcli - -import "fmt" - -// ConcurrentSaga struct of concurrent saga -type ConcurrentSaga struct { - Saga - orders map[int][]int -} - -// NewConcurrentSaga create a concurrent saga -func NewConcurrentSaga(server string, gid string) *ConcurrentSaga { - return &ConcurrentSaga{Saga: Saga{TransBase: *NewTransBase(gid, "csaga", server, "")}, orders: map[int][]int{}} -} - -// AddStepOrder specify that step should be after preSteps. Step is larger than all the element in preSteps -func (s *ConcurrentSaga) AddStepOrder(step int, preSteps []int) *ConcurrentSaga { - PanicIf(step > len(s.Steps), fmt.Errorf("step value: %d is invalid. which cannot be larger than total steps: %d", step, len(s.Steps))) - s.orders[step] = preSteps - return s -} - -// Submit submit the saga trans -func (s *ConcurrentSaga) Submit() error { - if len(s.orders) > 0 { - s.CustomData = MustMarshalString(M{"orders": s.orders}) - } - return s.callDtm(s, "submit") -} diff --git a/dtmcli/saga.go b/dtmcli/saga.go index f6681d8..fa9de44 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -1,9 +1,13 @@ package dtmcli +import "fmt" + // Saga struct of saga type Saga struct { TransBase - Steps []SagaStep `json:"steps"` + Steps []SagaStep `json:"steps"` + orders map[int][]int + concurrent bool } // SagaStep one step of saga @@ -15,7 +19,7 @@ type SagaStep struct { // NewSaga create a saga func NewSaga(server string, gid string) *Saga { - return &Saga{TransBase: *NewTransBase(gid, "saga", server, "")} + return &Saga{TransBase: *NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}} } // Add add a saga step @@ -28,7 +32,23 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga return s } +// AddStepOrder specify that step should be after preSteps. Step is larger than all the element in preSteps +func (s *Saga) AddStepOrder(step int, preSteps []int) *Saga { + PanicIf(step > len(s.Steps), fmt.Errorf("step value: %d is invalid. which cannot be larger than total steps: %d", step, len(s.Steps))) + s.orders[step] = preSteps + return s +} + +// EnableConcurrent enable the concurrent exec of sub trans +func (s *Saga) EnableConcurrent() *Saga { + s.concurrent = true + return s +} + // Submit submit the saga trans func (s *Saga) Submit() error { + if s.concurrent { + s.CustomData = MustMarshalString(M{"orders": s.orders, "concurrent": s.concurrent}) + } return s.callDtm(s, "submit") } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 4a4b2d2..f1b0687 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -77,7 +77,7 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB { func (t *TransGlobal) isTimeout() bool { timeout := t.TimeoutToFail - if t.TimeoutToFail == 0 && t.TransType != "saga" && t.TransType != "csaga" { + if t.TimeoutToFail == 0 && t.TransType != "saga" && t.TransType != "msg" { timeout = config.TimeoutToFail } if timeout == 0 { @@ -184,9 +184,6 @@ func (t *TransGlobal) processInner(db *common.DB) (rerr error) { } }() dtmcli.Logf("processing: %s status: %s", t.Gid, t.Status) - if t.Status == dtmcli.StatusPrepared && t.TransType != "msg" { - t.changeStatus(db, dtmcli.StatusAborting) - } branches := []TransBranch{} db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) t.processStarted = time.Now() diff --git a/dtmsvr/trans_concurrent_saga.go b/dtmsvr/trans_concurrent_saga.go deleted file mode 100644 index 3d64000..0000000 --- a/dtmsvr/trans_concurrent_saga.go +++ /dev/null @@ -1,175 +0,0 @@ -package dtmsvr - -import ( - "time" - - "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli" - "gorm.io/gorm/clause" -) - -type transCSagaProcessor struct { - *TransGlobal -} - -func init() { - registorProcessorCreator("csaga", func(trans *TransGlobal) transProcessor { return &transCSagaProcessor{TransGlobal: trans} }) -} - -func (t *transCSagaProcessor) GenBranches() []TransBranch { - return genSagaBranches(t.TransGlobal) -} - -type cSagaCustom struct { - Orders map[int][]int `json:"orders"` -} - -func isPreconditionsSucceed(branches []TransBranch, pres []int) bool { - for _, pre := range pres { - if branches[pre].Status != dtmcli.StatusSucceed { - return false - } - } - return true -} - -type branchResult struct { - index int - status string - started bool - branchType string -} - -func (t *transCSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error { - if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { - return nil - } - n := len(branches) - - orders := map[int][]int{} - if t.CustomData != "" { - csc := cSagaCustom{Orders: map[int][]int{}} - dtmcli.MustUnmarshalString(t.CustomData, &csc) - for k, v := range csc.Orders { // new branches is doubled, so change the order value - orders[2*k+1] = []int{} - for j := 0; j < len(v); j++ { - orders[2*k+1] = append(orders[2*k+1], csc.Orders[k][j]*2+1) - } - } - } - // resultStats - var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int - branchResults := make([]branchResult, n) // save the branch result - for i := 0; i < n; i++ { - b := branches[i] - if b.BranchType == dtmcli.BranchAction { - if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing { - rsActionToStart++ - } else if b.Status == dtmcli.StatusFailed { - rsActionFailed++ - } - } - branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType} - } - stopChan := make(chan branchResult, n) - asyncExecBranch := func(i int) { - var err error - defer func() { - if x := recover(); x != nil { - err = dtmcli.AsError(x) - } - stopChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType} - if err != nil { - dtmcli.LogRedf("exec branch error: %v", err) - } - }() - err = t.execBranch(db, &branches[i]) - } - needRollback := func(i int) bool { - br := &branchResults[i] - return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared - } - pickAndRun := func(branchType string) { - toRun := []int{} - for current := 0; current < n; current++ { - br := &branchResults[current] - if br.branchType == branchType && branchType == dtmcli.BranchAction { - if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) && - !br.started && isPreconditionsSucceed(branches, orders[current]) { - br.status = dtmcli.StatusDoing - toRun = append(toRun, current) - } - } else if br.branchType == branchType && branchType == dtmcli.BranchCompensate { - if needRollback(current) { - toRun = append(toRun, current) - } - } - } - if branchType == dtmcli.BranchAction && len(toRun) > 0 { - updates := make([]TransBranch, len(toRun)) - for i, b := range toRun { - updates[i].ID = branches[b].ID - branches[b].Status = dtmcli.StatusDoing - updates[i].Status = dtmcli.StatusDoing - } - dbGet().Must().Clauses(clause.OnConflict{ - OnConstraint: "trans_branch_pkey", - DoUpdates: clause.AssignmentColumns([]string{"status"}), - }).Create(updates) - } else if branchType == dtmcli.BranchCompensate { - rsCompensateToStart = len(toRun) - } - for _, b := range toRun { - branchResults[b].started = true - go asyncExecBranch(b) - } - } - processorTimeout := func() bool { - return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second - } - waitOnceForDone := func() { - select { - case r := <-stopChan: - br := &branchResults[r.index] - br.status = r.status - if r.branchType == dtmcli.BranchAction { - rsActionDone++ - if r.status == dtmcli.StatusFailed { - rsActionFailed++ - } else if r.status == dtmcli.StatusSucceed { - rsActionSucceed++ - } - } else { - rsCompensateDone++ - if r.status == dtmcli.StatusSucceed { - rsCompensateSucceed++ - } - } - dtmcli.Logf("branch done: %v", r) - case <-time.After(time.Duration(time.Second * 3)): - dtmcli.Logf("wait once for done") - } - } - - for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() { - pickAndRun(dtmcli.BranchAction) - waitOnceForDone() - } - if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed { - t.changeStatus(db, dtmcli.StatusSucceed) - return nil - } - if t.Status == dtmcli.StatusSubmitted && (rsActionFailed > 0 || t.isTimeout()) { - t.changeStatus(db, dtmcli.StatusAborting) - } - if t.Status == dtmcli.StatusAborting { - pickAndRun(dtmcli.BranchCompensate) - for rsCompensateDone != rsCompensateToStart && !processorTimeout() { - waitOnceForDone() - } - } - if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsActionFailed > 0 && rsCompensateToStart == rsCompensateSucceed { - t.changeStatus(db, dtmcli.StatusFailed) - } - return nil -} diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 2fbcc51..4925851 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -2,9 +2,11 @@ package dtmsvr import ( "fmt" + "time" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "gorm.io/gorm/clause" ) type transSagaProcessor struct { @@ -15,7 +17,7 @@ func init() { registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor { return &transSagaProcessor{TransGlobal: trans} }) } -func genSagaBranches(t *TransGlobal) []TransBranch { +func (t *transSagaProcessor) GenBranches() []TransBranch { branches := []TransBranch{} steps := []M{} dtmcli.MustUnmarshalString(t.Data, &steps) @@ -35,48 +37,160 @@ func genSagaBranches(t *TransGlobal) []TransBranch { return branches } -func (t *transSagaProcessor) GenBranches() []TransBranch { - return genSagaBranches(t.TransGlobal) +type cSagaCustom struct { + Orders map[int][]int `json:"orders"` + Concurrent bool `json:"concurrent"` +} + +type branchResult struct { + index int + status string + started bool + branchType string } func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error { - if !t.needProcess() { + if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { return nil } - current := 0 // 当前正在处理的步骤 - for ; current < len(branches); current++ { - branch := &branches[current] - if branch.BranchType != dtmcli.BranchAction || branch.Status == dtmcli.StatusSucceed { - continue + if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { + t.changeStatus(db, dtmcli.StatusAborting) + } + n := len(branches) + + csc := cSagaCustom{Orders: map[int][]int{}} + if t.CustomData != "" { + dtmcli.MustUnmarshalString(t.CustomData, &csc) + } + // resultStats + var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int + branchResults := make([]branchResult, n) // save the branch result + for i := 0; i < n; i++ { + b := branches[i] + if b.BranchType == dtmcli.BranchAction { + if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing { + rsActionToStart++ + } else if b.Status == dtmcli.StatusFailed { + rsActionFailed++ + } + } + branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType} + } + isPreconditionsSucceed := func(current int) bool { + if !csc.Concurrent && branches[current-1].Status != dtmcli.StatusSucceed { + return false } - // 找到了一个非succeed的action - if branch.Status == dtmcli.StatusPrepared { - err := t.execBranch(db, branch) + for _, pre := range csc.Orders[current*2+1] { + if branches[pre*2+1].Status != dtmcli.StatusSucceed { + return false + } + } + return true + } + + stopChan := make(chan branchResult, n) + asyncExecBranch := func(i int) { + var err error + defer func() { + if x := recover(); x != nil { + err = dtmcli.AsError(x) + } + stopChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType} if err != nil { - return err + dtmcli.LogRedf("exec branch error: %v", err) + } + }() + err = t.execBranch(db, &branches[i]) + } + needRollback := func(i int) bool { + br := &branchResults[i] + return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared + } + pickAndRun := func(branchType string) { + toRun := []int{} + for current := 0; current < n; current++ { + br := &branchResults[current] + if br.branchType == branchType && branchType == dtmcli.BranchAction { + if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) && + !br.started && isPreconditionsSucceed(current) { + br.status = dtmcli.StatusDoing + toRun = append(toRun, current) + } + } else if br.branchType == branchType && branchType == dtmcli.BranchCompensate { + if needRollback(current) { + toRun = append(toRun, current) + } } } - if branch.Status != dtmcli.StatusSucceed { - break + if branchType == dtmcli.BranchAction && len(toRun) > 0 && csc.Concurrent { // only save doing when concurrent + updates := make([]TransBranch, len(toRun)) + for i, b := range toRun { + updates[i].ID = branches[b].ID + branches[b].Status = dtmcli.StatusDoing + updates[i].Status = dtmcli.StatusDoing + } + dbGet().Must().Clauses(clause.OnConflict{ + OnConstraint: "trans_branch_pkey", + DoUpdates: clause.AssignmentColumns([]string{"status"}), + }).Create(updates) + } else if branchType == dtmcli.BranchCompensate { + // when not concurrent, then may add one more branch, in case the newest branch state not saved and timeout + if !csc.Concurrent && len(toRun) < n/2 && branchResults[len(toRun)*2+1].status != dtmcli.StatusFailed { + toRun = append(toRun, len(toRun)*2+2) + } + rsCompensateToStart = len(toRun) } + for _, b := range toRun { + branchResults[b].started = true + go asyncExecBranch(b) + } + } + processorTimeout := func() bool { + return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second } - if current == len(branches) { // saga 事务完成 + waitOnceForDone := func() { + select { + case r := <-stopChan: + br := &branchResults[r.index] + br.status = r.status + if r.branchType == dtmcli.BranchAction { + rsActionDone++ + if r.status == dtmcli.StatusFailed { + rsActionFailed++ + } else if r.status == dtmcli.StatusSucceed { + rsActionSucceed++ + } + } else { + rsCompensateDone++ + if r.status == dtmcli.StatusSucceed { + rsCompensateSucceed++ + } + } + dtmcli.Logf("branch done: %v", r) + case <-time.After(time.Duration(time.Second * 3)): + dtmcli.Logf("wait once for done") + } + } + + for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() { + pickAndRun(dtmcli.BranchAction) + waitOnceForDone() + } + if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed { t.changeStatus(db, dtmcli.StatusSucceed) return nil } - if t.Status != dtmcli.StatusAborting && t.Status != dtmcli.StatusFailed { + if t.Status == dtmcli.StatusSubmitted && (rsActionFailed > 0 || t.isTimeout()) { t.changeStatus(db, dtmcli.StatusAborting) } - for current = current - 1; current >= 0; current-- { - branch := &branches[current] - if branch.BranchType != dtmcli.BranchCompensate || branch.Status != dtmcli.StatusPrepared { - continue - } - err := t.execBranch(db, branch) - if err != nil { - return err + if t.Status == dtmcli.StatusAborting { + pickAndRun(dtmcli.BranchCompensate) + for rsCompensateDone != rsCompensateToStart && !processorTimeout() { + waitOnceForDone() } } - t.changeStatus(db, dtmcli.StatusFailed) + if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsActionFailed > 0 && rsCompensateToStart == rsCompensateSucceed { + t.changeStatus(db, dtmcli.StatusFailed) + } return nil } diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 8e4afc4..80be0ee 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -21,6 +21,9 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e if !t.needProcess() { return nil } + if t.Status == dtmcli.StatusPrepared && t.isTimeout() { + t.changeStatus(db, dtmcli.StatusAborting) + } branchType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string) for current := len(branches) - 1; current >= 0; current-- { if branches[current].BranchType == branchType && branches[current].Status == dtmcli.StatusPrepared { diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 8c6b578..007aa76 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -21,6 +21,9 @@ func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) er if !t.needProcess() { return nil } + if t.Status == dtmcli.StatusPrepared && t.isTimeout() { + t.changeStatus(db, dtmcli.StatusAborting) + } currentType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string) for _, branch := range branches { if branch.BranchType == currentType && branch.Status != dtmcli.StatusSucceed { diff --git a/examples/http_saga.go b/examples/http_saga.go index 40df77a..2558586 100644 --- a/examples/http_saga.go +++ b/examples/http_saga.go @@ -29,5 +29,21 @@ func init() { dtmcli.FatalIfError(err) return saga.Gid }) - + addSample("concurrent_saga", func() string { + dtmcli.Logf("a concurrent saga busi transaction begin") + req := &TransReq{Amount: 30} + csaga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). + Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). + Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). + Add(Busi+"/TransIn", Busi+"/TransInRevert", req). + Add(Busi+"/TransIn", Busi+"/TransInRevert", req). + EnableConcurrent(). + AddStepOrder(2, []int{0, 1}). + AddStepOrder(3, []int{0, 1}) + dtmcli.Logf("concurrent saga busi trans submit") + err := csaga.Submit() + dtmcli.Logf("result gid is: %s", csaga.Gid) + dtmcli.FatalIfError(err) + return csaga.Gid + }) } diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index 5d2020d..6581598 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -15,12 +15,13 @@ func TestCSaga(t *testing.T) { csagaCommittedOngoing(t) } -func genCSaga(gid string, outFailed bool, inFailed bool) *dtmcli.ConcurrentSaga { +func genCSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { dtmcli.Logf("beginning a concurrent saga test ---------------- %s", gid) - csaga := dtmcli.NewConcurrentSaga(examples.DtmServer, gid) req := examples.GenTransReq(30, outFailed, inFailed) - csaga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) - csaga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) + csaga := dtmcli.NewSaga(examples.DtmServer, gid). + Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req). + Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req). + EnableConcurrent() return csaga }