Browse Source

concurrent saga change to saga

pull/44/head
yedf2 4 years ago
parent
commit
2fa530d17d
  1. 29
      dtmcli/concurrent_saga.go
  2. 24
      dtmcli/saga.go
  3. 5
      dtmsvr/trans.go
  4. 175
      dtmsvr/trans_concurrent_saga.go
  5. 166
      dtmsvr/trans_saga.go
  6. 3
      dtmsvr/trans_tcc.go
  7. 3
      dtmsvr/trans_xa.go
  8. 18
      examples/http_saga.go
  9. 9
      test/saga_concurrent_test.go

29
dtmcli/concurrent_saga.go

@ -1,29 +0,0 @@
package dtmcli
import "fmt"
// ConcurrentSaga is a saga that additionally records ordering constraints
// between its steps. It embeds Saga and is created by NewConcurrentSaga.
type ConcurrentSaga struct {
Saga
// orders maps a step index to the step indices it has to run after;
// it is filled by AddStepOrder and serialized by Submit.
orders map[int][]int
}
// NewConcurrentSaga builds a ConcurrentSaga bound to the given dtm server and
// global transaction id. The embedded transaction base is created with the
// "csaga" transaction type and the step-order map starts out empty.
func NewConcurrentSaga(server string, gid string) *ConcurrentSaga {
	base := NewTransBase(gid, "csaga", server, "")
	return &ConcurrentSaga{
		Saga:   Saga{TransBase: *base},
		orders: make(map[int][]int),
	}
}
// AddStepOrder declares that step must run only after every step listed in
// preSteps. step is expected to be larger than every element of preSteps.
// The bound check (step must not exceed the number of steps added so far) is
// delegated to PanicIf. The receiver is returned so calls can be chained.
func (s *ConcurrentSaga) AddStepOrder(step int, preSteps []int) *ConcurrentSaga {
	total := len(s.Steps)
	outOfRange := step > total
	PanicIf(outOfRange, fmt.Errorf("step value: %d is invalid. which cannot be larger than total steps: %d", step, total))
	s.orders[step] = preSteps
	return s
}
// Submit sends the saga to the dtm server using the "submit" operation.
// When at least one step order has been registered, the orders are first
// marshalled into CustomData under the "orders" key.
func (s *ConcurrentSaga) Submit() error {
	if len(s.orders) != 0 {
		custom := M{"orders": s.orders}
		s.CustomData = MustMarshalString(custom)
	}
	return s.callDtm(s, "submit")
}

24
dtmcli/saga.go

@ -1,9 +1,13 @@
package dtmcli
import "fmt"
// Saga struct of saga
type Saga struct {
TransBase
Steps []SagaStep `json:"steps"`
Steps []SagaStep `json:"steps"`
orders map[int][]int
concurrent bool
}
// SagaStep one step of saga
@ -15,7 +19,7 @@ type SagaStep struct {
// NewSaga create a saga
func NewSaga(server string, gid string) *Saga {
return &Saga{TransBase: *NewTransBase(gid, "saga", server, "")}
return &Saga{TransBase: *NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}}
}
// Add add a saga step
@ -28,7 +32,23 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
return s
}
// AddStepOrder declares that step must run only after every step listed in
// preSteps. step is expected to be larger than every element of preSteps.
// The bound check (step must not exceed the number of steps added so far) is
// delegated to PanicIf. The receiver is returned so calls can be chained.
func (s *Saga) AddStepOrder(step int, preSteps []int) *Saga {
	total := len(s.Steps)
	outOfRange := step > total
	PanicIf(outOfRange, fmt.Errorf("step value: %d is invalid. which cannot be larger than total steps: %d", step, total))
	s.orders[step] = preSteps
	return s
}
// EnableConcurrent marks the saga as concurrent by setting the concurrent flag.
// Submit only serializes the step orders and the flag into CustomData when this
// flag is set. The receiver is returned so calls can be chained.
func (s *Saga) EnableConcurrent() *Saga {
s.concurrent = true
return s
}
// Submit sends the saga to the dtm server using the "submit" operation.
// For a concurrent saga the step orders and the concurrent flag are first
// marshalled into CustomData; for a non-concurrent saga CustomData is left
// untouched, so any registered orders are not transmitted.
func (s *Saga) Submit() error {
	if s.concurrent {
		custom := M{
			"orders":     s.orders,
			"concurrent": s.concurrent,
		}
		s.CustomData = MustMarshalString(custom)
	}
	return s.callDtm(s, "submit")
}

5
dtmsvr/trans.go

@ -77,7 +77,7 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
func (t *TransGlobal) isTimeout() bool {
timeout := t.TimeoutToFail
if t.TimeoutToFail == 0 && t.TransType != "saga" && t.TransType != "csaga" {
if t.TimeoutToFail == 0 && t.TransType != "saga" && t.TransType != "msg" {
timeout = config.TimeoutToFail
}
if timeout == 0 {
@ -184,9 +184,6 @@ func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
}
}()
dtmcli.Logf("processing: %s status: %s", t.Gid, t.Status)
if t.Status == dtmcli.StatusPrepared && t.TransType != "msg" {
t.changeStatus(db, dtmcli.StatusAborting)
}
branches := []TransBranch{}
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
t.processStarted = time.Now()

175
dtmsvr/trans_concurrent_saga.go

@ -1,175 +0,0 @@
package dtmsvr
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm/clause"
)
// transCSagaProcessor is the processor registered for the "csaga" transaction
// type. It embeds the *TransGlobal it operates on.
type transCSagaProcessor struct {
*TransGlobal
}
func init() {
registorProcessorCreator("csaga", func(trans *TransGlobal) transProcessor { return &transCSagaProcessor{TransGlobal: trans} })
}
// GenBranches returns the branches for this transaction. Generation is shared
// with the plain saga processor through genSagaBranches.
func (t *transCSagaProcessor) GenBranches() []TransBranch {
return genSagaBranches(t.TransGlobal)
}
// cSagaCustom is the structure decoded from TransGlobal.CustomData by ProcessOnce.
type cSagaCustom struct {
// Orders maps a step index to the step indices that must succeed before it (JSON key "orders").
Orders map[int][]int `json:"orders"`
}
// isPreconditionsSucceed reports whether every branch whose index is listed in
// pres has the status dtmcli.StatusSucceed. A nil or empty pres yields true.
func isPreconditionsSucceed(branches []TransBranch, pres []int) bool {
	for i := 0; i < len(pres); i++ {
		if status := branches[pres[i]].Status; status != dtmcli.StatusSucceed {
			return false
		}
	}
	return true
}
// branchResult is the per-branch bookkeeping used by ProcessOnce, and also the
// message a branch goroutine sends on the result channel when it finishes.
type branchResult struct {
// index is the position of the branch in the branches slice
index int
// status is the last branch status known to the processing loop
status string
// started is set once a goroutine has been launched for the branch in this pass
started bool
// branchType is the BranchType copied from the branch (action or compensate)
branchType string
}
// ProcessOnce performs one processing pass over a concurrent saga.
//
// Action branches whose preconditions are satisfied are executed in goroutines.
// If every pending action succeeds the global status becomes succeed. If an
// action fails or the transaction times out, the status becomes aborting and
// the compensations of the started actions are executed. Branch execution
// errors are only logged; every return path visible here returns nil.
//
// Branch layout relied upon below: the compensate branch of a step is at an
// even index i and its action branch is at i+1.
func (t *transCSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error {
// a transaction that already reached a final status needs no further work
if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed {
return nil
}
n := len(branches)
// orders: action branch index -> action branch indices that must succeed first
orders := map[int][]int{}
if t.CustomData != "" {
csc := cSagaCustom{Orders: map[int][]int{}}
dtmcli.MustUnmarshalString(t.CustomData, &csc)
for k, v := range csc.Orders { // orders are keyed by step index; each step owns two branches, so step k's action is branch 2*k+1
orders[2*k+1] = []int{}
for j := 0; j < len(v); j++ {
orders[2*k+1] = append(orders[2*k+1], csc.Orders[k][j]*2+1)
}
}
}
// result statistics: counters for actions/compensations to start, done, failed and succeeded
var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
if b.BranchType == dtmcli.BranchAction {
// prepared/doing actions still have to run; actions that failed in an earlier pass are counted as failed
if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing {
rsActionToStart++
} else if b.Status == dtmcli.StatusFailed {
rsActionFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType}
}
// buffered with n slots: each branch is started at most once per pass, so a send never blocks
stopChan := make(chan branchResult, n)
// asyncExecBranch executes branch i and always reports its resulting status on stopChan,
// converting a panic from execBranch into a logged error
asyncExecBranch := func(i int) {
var err error
defer func() {
if x := recover(); x != nil {
err = dtmcli.AsError(x)
}
stopChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType}
if err != nil {
dtmcli.LogRedf("exec branch error: %v", err)
}
}()
err = t.execBranch(db, &branches[i])
}
// needRollback: compensate branch i has not been started in this pass, has not succeeded,
// and the action right after it (i+1) is no longer in the prepared status
needRollback := func(i int) bool {
br := &branchResults[i]
return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared
}
// pickAndRun selects every runnable branch of the given type and starts one goroutine per branch
pickAndRun := func(branchType string) {
toRun := []int{}
for current := 0; current < n; current++ {
br := &branchResults[current]
if br.branchType == branchType && branchType == dtmcli.BranchAction {
// NOTE(review): isPreconditionsSucceed reads branches[..].Status while goroutines started
// earlier may still be inside execBranch on elements of the same slice; if execBranch
// writes Status this is an unsynchronized read - confirm.
if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) &&
!br.started && isPreconditionsSucceed(branches, orders[current]) {
br.status = dtmcli.StatusDoing
toRun = append(toRun, current)
}
} else if br.branchType == branchType && branchType == dtmcli.BranchCompensate {
if needRollback(current) {
toRun = append(toRun, current)
}
}
}
if branchType == dtmcli.BranchAction && len(toRun) > 0 {
// persist the doing status of the picked actions before they are started
updates := make([]TransBranch, len(toRun))
for i, b := range toRun {
updates[i].ID = branches[b].ID
branches[b].Status = dtmcli.StatusDoing
updates[i].Status = dtmcli.StatusDoing
}
// upsert on the trans_branch_pkey constraint that only updates the status column;
// note this goes through dbGet() rather than the db parameter
dbGet().Must().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_pkey",
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(updates)
} else if branchType == dtmcli.BranchCompensate {
rsCompensateToStart = len(toRun)
}
for _, b := range toRun {
branchResults[b].started = true
go asyncExecBranch(b)
}
}
// processorTimeout reports whether this pass is within 3 seconds of the retry interval
// (measured from t.processStarted, shifted by NowForwardDuration)
processorTimeout := func() bool {
return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second
}
// waitOnceForDone waits for one branch result, or at most 3 seconds, and updates the counters
waitOnceForDone := func() {
select {
case r := <-stopChan:
br := &branchResults[r.index]
br.status = r.status
if r.branchType == dtmcli.BranchAction {
rsActionDone++
if r.status == dtmcli.StatusFailed {
rsActionFailed++
} else if r.status == dtmcli.StatusSucceed {
rsActionSucceed++
}
} else {
rsCompensateDone++
if r.status == dtmcli.StatusSucceed {
rsCompensateSucceed++
}
}
dtmcli.Logf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
dtmcli.Logf("wait once for done")
}
}
// forward phase: keep starting runnable actions until all are done, one fails, or a timeout hits
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() {
pickAndRun(dtmcli.BranchAction)
waitOnceForDone()
}
// every pending action succeeded and none failed: the saga is done
if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed {
t.changeStatus(db, dtmcli.StatusSucceed)
return nil
}
if t.Status == dtmcli.StatusSubmitted && (rsActionFailed > 0 || t.isTimeout()) {
t.changeStatus(db, dtmcli.StatusAborting)
}
// backward phase: start all needed compensations at once, then wait for them
if t.Status == dtmcli.StatusAborting {
pickAndRun(dtmcli.BranchCompensate)
for rsCompensateDone != rsCompensateToStart && !processorTimeout() {
waitOnceForDone()
}
}
// NOTE(review): the failed status requires rsActionFailed > 0, so an abort caused only by
// a timeout (no failed action counted) is not moved to failed here - confirm this is intended.
if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsActionFailed > 0 && rsCompensateToStart == rsCompensateSucceed {
t.changeStatus(db, dtmcli.StatusFailed)
}
return nil
}

166
dtmsvr/trans_saga.go

@ -2,9 +2,11 @@ package dtmsvr
import (
"fmt"
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm/clause"
)
type transSagaProcessor struct {
@ -15,7 +17,7 @@ func init() {
registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor { return &transSagaProcessor{TransGlobal: trans} })
}
func genSagaBranches(t *TransGlobal) []TransBranch {
func (t *transSagaProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
steps := []M{}
dtmcli.MustUnmarshalString(t.Data, &steps)
@ -35,48 +37,160 @@ func genSagaBranches(t *TransGlobal) []TransBranch {
return branches
}
func (t *transSagaProcessor) GenBranches() []TransBranch {
return genSagaBranches(t.TransGlobal)
// cSagaCustom is the structure decoded from TransGlobal.CustomData by ProcessOnce.
type cSagaCustom struct {
// Orders maps a step index to the step indices that must succeed before it (JSON key "orders").
Orders map[int][]int `json:"orders"`
// Concurrent is true when the saga's steps may be executed concurrently (JSON key "concurrent").
Concurrent bool `json:"concurrent"`
}
// branchResult is the per-branch bookkeeping used by ProcessOnce, and also the
// message a branch goroutine sends on the result channel when it finishes.
type branchResult struct {
// index is the position of the branch in the branches slice
index int
// status is the last branch status known to the processing loop
status string
// started is set once a goroutine has been launched for the branch in this pass
started bool
// branchType is the BranchType copied from the branch (action or compensate)
branchType string
}
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error {
if !t.needProcess() {
if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed {
return nil
}
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
if branch.BranchType != dtmcli.BranchAction || branch.Status == dtmcli.StatusSucceed {
continue
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(db, dtmcli.StatusAborting)
}
n := len(branches)
csc := cSagaCustom{Orders: map[int][]int{}}
if t.CustomData != "" {
dtmcli.MustUnmarshalString(t.CustomData, &csc)
}
// resultStats
var rsActionToStart, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
if b.BranchType == dtmcli.BranchAction {
if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing {
rsActionToStart++
} else if b.Status == dtmcli.StatusFailed {
rsActionFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType}
}
isPreconditionsSucceed := func(current int) bool {
if !csc.Concurrent && branches[current-1].Status != dtmcli.StatusSucceed {
return false
}
// 找到了一个非succeed的action
if branch.Status == dtmcli.StatusPrepared {
err := t.execBranch(db, branch)
for _, pre := range csc.Orders[current*2+1] {
if branches[pre*2+1].Status != dtmcli.StatusSucceed {
return false
}
}
return true
}
stopChan := make(chan branchResult, n)
asyncExecBranch := func(i int) {
var err error
defer func() {
if x := recover(); x != nil {
err = dtmcli.AsError(x)
}
stopChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType}
if err != nil {
return err
dtmcli.LogRedf("exec branch error: %v", err)
}
}()
err = t.execBranch(db, &branches[i])
}
needRollback := func(i int) bool {
br := &branchResults[i]
return !br.started && br.branchType == dtmcli.BranchCompensate && br.status != dtmcli.StatusSucceed && branchResults[i+1].branchType == dtmcli.BranchAction && branchResults[i+1].status != dtmcli.StatusPrepared
}
pickAndRun := func(branchType string) {
toRun := []int{}
for current := 0; current < n; current++ {
br := &branchResults[current]
if br.branchType == branchType && branchType == dtmcli.BranchAction {
if (br.status == dtmcli.StatusPrepared || br.status == dtmcli.StatusDoing) &&
!br.started && isPreconditionsSucceed(current) {
br.status = dtmcli.StatusDoing
toRun = append(toRun, current)
}
} else if br.branchType == branchType && branchType == dtmcli.BranchCompensate {
if needRollback(current) {
toRun = append(toRun, current)
}
}
}
if branch.Status != dtmcli.StatusSucceed {
break
if branchType == dtmcli.BranchAction && len(toRun) > 0 && csc.Concurrent { // only save doing when concurrent
updates := make([]TransBranch, len(toRun))
for i, b := range toRun {
updates[i].ID = branches[b].ID
branches[b].Status = dtmcli.StatusDoing
updates[i].Status = dtmcli.StatusDoing
}
dbGet().Must().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_pkey",
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(updates)
} else if branchType == dtmcli.BranchCompensate {
// when not concurrent, then may add one more branch, in case the newest branch state not saved and timeout
if !csc.Concurrent && len(toRun) < n/2 && branchResults[len(toRun)*2+1].status != dtmcli.StatusFailed {
toRun = append(toRun, len(toRun)*2+2)
}
rsCompensateToStart = len(toRun)
}
for _, b := range toRun {
branchResults[b].started = true
go asyncExecBranch(b)
}
}
processorTimeout := func() bool {
return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second
}
if current == len(branches) { // saga 事务完成
waitOnceForDone := func() {
select {
case r := <-stopChan:
br := &branchResults[r.index]
br.status = r.status
if r.branchType == dtmcli.BranchAction {
rsActionDone++
if r.status == dtmcli.StatusFailed {
rsActionFailed++
} else if r.status == dtmcli.StatusSucceed {
rsActionSucceed++
}
} else {
rsCompensateDone++
if r.status == dtmcli.StatusSucceed {
rsCompensateSucceed++
}
}
dtmcli.Logf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
dtmcli.Logf("wait once for done")
}
}
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() {
pickAndRun(dtmcli.BranchAction)
waitOnceForDone()
}
if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed {
t.changeStatus(db, dtmcli.StatusSucceed)
return nil
}
if t.Status != dtmcli.StatusAborting && t.Status != dtmcli.StatusFailed {
if t.Status == dtmcli.StatusSubmitted && (rsActionFailed > 0 || t.isTimeout()) {
t.changeStatus(db, dtmcli.StatusAborting)
}
for current = current - 1; current >= 0; current-- {
branch := &branches[current]
if branch.BranchType != dtmcli.BranchCompensate || branch.Status != dtmcli.StatusPrepared {
continue
}
err := t.execBranch(db, branch)
if err != nil {
return err
if t.Status == dtmcli.StatusAborting {
pickAndRun(dtmcli.BranchCompensate)
for rsCompensateDone != rsCompensateToStart && !processorTimeout() {
waitOnceForDone()
}
}
t.changeStatus(db, dtmcli.StatusFailed)
if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsActionFailed > 0 && rsCompensateToStart == rsCompensateSucceed {
t.changeStatus(db, dtmcli.StatusFailed)
}
return nil
}

3
dtmsvr/trans_tcc.go

@ -21,6 +21,9 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e
if !t.needProcess() {
return nil
}
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(db, dtmcli.StatusAborting)
}
branchType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].BranchType == branchType && branches[current].Status == dtmcli.StatusPrepared {

3
dtmsvr/trans_xa.go

@ -21,6 +21,9 @@ func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) er
if !t.needProcess() {
return nil
}
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(db, dtmcli.StatusAborting)
}
currentType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string)
for _, branch := range branches {
if branch.BranchType == currentType && branch.Status != dtmcli.StatusSucceed {

18
examples/http_saga.go

@ -29,5 +29,21 @@ func init() {
dtmcli.FatalIfError(err)
return saga.Gid
})
// concurrent_saga sample: two TransOut steps (0, 1) and two TransIn steps (2, 3)
// submitted as one concurrent saga; the sample returns the saga's gid.
addSample("concurrent_saga", func() string {
dtmcli.Logf("a concurrent saga busi transaction begin")
req := &TransReq{Amount: 30}
// steps are numbered in the order they are added, starting at 0;
// the two AddStepOrder calls make steps 2 and 3 run only after steps 0 and 1
csaga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req).
EnableConcurrent().
AddStepOrder(2, []int{0, 1}).
AddStepOrder(3, []int{0, 1})
dtmcli.Logf("concurrent saga busi trans submit")
err := csaga.Submit()
dtmcli.Logf("result gid is: %s", csaga.Gid)
dtmcli.FatalIfError(err)
return csaga.Gid
})
}

9
test/saga_concurrent_test.go

@ -15,12 +15,13 @@ func TestCSaga(t *testing.T) {
csagaCommittedOngoing(t)
}
func genCSaga(gid string, outFailed bool, inFailed bool) *dtmcli.ConcurrentSaga {
func genCSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
dtmcli.Logf("beginning a concurrent saga test ---------------- %s", gid)
csaga := dtmcli.NewConcurrentSaga(examples.DtmServer, gid)
req := examples.GenTransReq(30, outFailed, inFailed)
csaga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
csaga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
csaga := dtmcli.NewSaga(examples.DtmServer, gid).
Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req).
Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req).
EnableConcurrent()
return csaga
}

Loading…
Cancel
Save