Browse Source

partial

pull/44/head
yedf2 4 years ago
parent
commit
b9fec6c390
  1. 2
      common/types.go
  2. 15
      dtmsvr/trans.go
  3. 53
      dtmsvr/trans_saga.go
  4. 33
      test/saga_test.go
  5. 17
      test/xa_test.go

2
common/types.go

@ -143,7 +143,7 @@ func init() {
return
}
DtmConfig.TransCronInterval = getIntEnv("TRANS_CRON_INTERVAL", "3")
DtmConfig.TimeoutToFail = getIntEnv("TIMEOUT_TO_FAIL", "10")
DtmConfig.TimeoutToFail = getIntEnv("TIMEOUT_TO_FAIL", "35")
DtmConfig.RetryInterval = getIntEnv("RETRY_INTERVAL", "10")
DtmConfig.DB = map[string]string{
"driver": dtmcli.OrString(os.Getenv("DB_DRIVER"), "mysql"),

15
dtmsvr/trans.go

@ -156,6 +156,10 @@ func (t *TransGlobal) Process(db *common.DB) dtmcli.M {
}
func (t *TransGlobal) process(db *common.DB) dtmcli.M {
if t.Options != "" {
dtmcli.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner(db)
return dtmcli.MapSuccess
@ -295,6 +299,10 @@ func (t *TransGlobal) saveNew(db *common.DB) error {
return db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
t.setNextCron(cronReset)
t.Options = dtmcli.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""
}
writeTransLog(t.Gid, "create trans", t.Status, "", t.Data)
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
@ -326,10 +334,6 @@ func TransFromContext(c *gin.Context) *TransGlobal {
}
m := TransGlobal{}
dtmcli.MustRemarshal(data, &m)
m.Options = dtmcli.MustMarshalString(m.TransOptions)
if m.Options == "{}" {
m.Options = ""
}
m.Protocol = "http"
return &m
}
@ -353,9 +357,6 @@ func TransFromDb(db *common.DB, gid string) *TransGlobal {
return nil
}
e2p(dbr.Error)
if m.Options != "" {
dtmcli.MustUnmarshalString(m.Options, &m.TransOptions)
}
return &m
}

53
dtmsvr/trans_saga.go

@ -50,9 +50,8 @@ type branchResult struct {
}
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error {
if !t.needProcess() {
return nil
}
// when saga tasks is fetched, it always need to process
dtmcli.Logf("status: %s timeout: %t", t.Status, t.isTimeout())
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(db, dtmcli.StatusAborting)
}
@ -63,23 +62,25 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
dtmcli.MustUnmarshalString(t.CustomData, &csc)
}
// resultStats
var rsActionToStart, rsActionStarted, rsActionDone, rsActionFailed, rsActionSucceed, rsCompensateToStart, rsCompensateDone, rsCompensateSucceed int
var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
if b.BranchType == dtmcli.BranchAction {
if b.Status == dtmcli.StatusPrepared || b.Status == dtmcli.StatusDoing {
rsActionToStart++
rsAToStart++
} else if b.Status == dtmcli.StatusFailed {
rsActionFailed++
rsAFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType}
}
isPreconditionsSucceed := func(current int) bool {
// if !csc.Concurrent,then check the branch in previous step is succeed
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed {
return false
}
// if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr
for _, pre := range csc.Orders[current/2] {
if branches[pre*2+1].Status != dtmcli.StatusSucceed {
return false
@ -88,14 +89,14 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
return true
}
stopChan := make(chan branchResult, n)
resultChan := make(chan branchResult, n)
asyncExecBranch := func(i int) {
var err error
defer func() {
if x := recover(); x != nil {
err = dtmcli.AsError(x)
}
stopChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType}
resultChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType}
if err != nil {
dtmcli.LogRedf("exec branch error: %v", err)
}
@ -133,18 +134,18 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
OnConstraint: "trans_branch_pkey",
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(updates)
} else if branchType == dtmcli.BranchCompensate {
} else if branchType == dtmcli.BranchCompensate && len(toRun) > 0 {
// when not concurrent, then may add one more branch, in case the newest branch state not saved and timeout
if !csc.Concurrent && len(toRun) < n/2 && branchResults[len(toRun)*2+1].status != dtmcli.StatusFailed {
toRun = append(toRun, len(toRun)*2+2)
}
rsCompensateToStart = len(toRun)
rsCToStart = len(toRun)
}
dtmcli.Logf("toRun picked for %s is: %v", branchType, toRun)
for _, b := range toRun {
branchResults[b].started = true
if branchType == dtmcli.BranchAction {
rsActionStarted++
rsAStarted++
}
go asyncExecBranch(b)
}
@ -152,22 +153,22 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
processorTimeout := func() bool {
return time.Since(t.processStarted)+NowForwardDuration > time.Duration(t.getRetryInterval()-3)*time.Second
}
waitOnceForDone := func() {
waitDoneOnce := func() {
select {
case r := <-stopChan:
case r := <-resultChan:
br := &branchResults[r.index]
br.status = r.status
if r.branchType == dtmcli.BranchAction {
rsActionDone++
rsADone++
if r.status == dtmcli.StatusFailed {
rsActionFailed++
rsAFailed++
} else if r.status == dtmcli.StatusSucceed {
rsActionSucceed++
rsASucceed++
}
} else {
rsCompensateDone++
rsCDone++
if r.status == dtmcli.StatusSucceed {
rsCompensateSucceed++
rsCSucceed++
}
}
dtmcli.Logf("branch done: %v", r)
@ -176,27 +177,27 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
}
}
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsActionFailed == 0 && rsActionDone != rsActionToStart && !processorTimeout() {
for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart && !processorTimeout() {
pickAndRun(dtmcli.BranchAction)
if rsActionDone == rsActionStarted { // no branch is running, so break
if rsADone == rsAStarted { // no branch is running, so break
break
}
waitOnceForDone()
waitDoneOnce()
}
if t.Status == dtmcli.StatusSubmitted && rsActionFailed == 0 && rsActionToStart == rsActionSucceed {
if t.Status == dtmcli.StatusSubmitted && rsAFailed == 0 && rsAToStart == rsASucceed {
t.changeStatus(db, dtmcli.StatusSucceed)
return nil
}
if t.Status == dtmcli.StatusSubmitted && (rsActionFailed > 0 || t.isTimeout()) {
if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) {
t.changeStatus(db, dtmcli.StatusAborting)
}
if t.Status == dtmcli.StatusAborting {
pickAndRun(dtmcli.BranchCompensate)
for rsCompensateDone != rsCompensateToStart && !processorTimeout() {
waitOnceForDone()
for rsCDone != rsCToStart && !processorTimeout() {
waitDoneOnce()
}
}
if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsActionFailed > 0 && rsCompensateToStart == rsCompensateSucceed {
if (t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting) && rsAFailed > 0 && rsCToStart == rsCSucceed {
t.changeStatus(db, dtmcli.StatusFailed)
}
return nil

33
test/saga_test.go

@ -9,9 +9,11 @@ import (
)
func TestSaga(t *testing.T) {
sagaNormal(t)
sagaCommittedOngoing(t)
sagaRollback(t)
// sagaNormal(t)
// sagaCommittedOngoing(t)
// sagaRollback(t)
sagaRollback2(t)
// sagaTimeout(t)
}
func sagaNormal(t *testing.T) {
@ -37,7 +39,7 @@ func sagaCommittedOngoing(t *testing.T) {
}
func sagaRollback(t *testing.T) {
saga := genSaga("gid-rollbackSaga2", false, true)
saga := genSaga("gid-rollback-saga", false, true)
examples.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing)
err := saga.Submit()
assert.Nil(t, err)
@ -50,6 +52,29 @@ func sagaRollback(t *testing.T) {
assert.Error(t, err)
}
func sagaRollback2(t *testing.T) {
saga := genSaga("gid-rollback-saga2", false, false)
saga.TimeoutToFail = 1800
examples.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
err := saga.Submit()
assert.Nil(t, err)
WaitTransProcessed(saga.Gid)
cronTransOnceForwardNow(3600)
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
}
func sagaTimeout(t *testing.T) {
saga := genSaga("gid-timeout-saga", false, false)
saga.TimeoutToFail = 1800
examples.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(saga.Gid))
cronTransOnceForwardNow(3600)
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
}
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
dtmcli.Logf("beginning a saga test ---------------- %s", gid)
saga := dtmcli.NewSaga(examples.DtmServer, gid)

17
test/xa_test.go

@ -16,6 +16,7 @@ func TestXa(t *testing.T) {
xaNormal(t)
xaDuplicate(t)
xaRollback(t)
xaTimeout(t)
}
func xaLocalError(t *testing.T) {
@ -75,3 +76,19 @@ func xaRollback(t *testing.T) {
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusPrepared}, getBranchesStatus(gid))
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
func xaTimeout(t *testing.T) {
xc := examples.XaClient
gid := "xaTimeout"
timeoutChan := make(chan int, 1)
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
go func() {
cronTransOnceForwardNow(1)
cronTransOnceForwardNow(300)
timeoutChan <- 0
}()
_ = <-timeoutChan
return nil, nil
})
assert.Error(t, err)
}

Loading…
Cancel
Save