Browse Source

xa use barrier

pull/251/head
yedf2 4 years ago
parent
commit
28412bd757
  1. 22
      dtmcli/barrier.go
  2. 10
      dtmcli/barrier_mongo.go
  3. 4
      dtmcli/barrier_redis.go
  4. 15
      dtmcli/consts.go
  5. 19
      dtmcli/dtmimp/consts.go
  6. 10
      dtmcli/dtmimp/trans_xa_base.go
  7. 12
      dtmcli/dtmimp/utils.go
  8. 10
      dtmcli/tcc.go
  9. 2
      dtmcli/xa.go
  10. 6
      dtmsvr/api.go
  11. 2
      dtmsvr/trans_status.go
  12. 6
      dtmsvr/trans_type_msg.go
  13. 10
      dtmsvr/trans_type_saga.go
  14. 2
      dtmsvr/trans_type_tcc.go
  15. 2
      dtmsvr/trans_type_xa.go
  16. 2
      test/base_test.go
  17. 2
      test/busi/base_http.go
  18. 18
      test/tcc_barrier_test.go

22
dtmcli/barrier.go

@ -55,14 +55,6 @@ func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) {
return ti, nil
}
func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
if op == "" {
return 0, nil
}
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate(dtmimp.BarrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}
// Call see detail description in https://en.dtm.pub/practice/barrier.html
// tx: local transaction connection
// busiCall: busi func
@ -74,12 +66,12 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
return tx.Rollback()
})
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
dtmimp.OpCancel: dtmimp.OpTry,
dtmimp.OpCompensate: dtmimp.OpAction,
}[bb.Op]
originAffected, oerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op)
currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
@ -90,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
rerr = oerr
}
if (bb.Op == BranchCancel || bb.Op == BranchCompensate) && originAffected > 0 || // null compensate
if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate) && originAffected > 0 || // null compensate
currentAffected == 0 { // repeated request or dangled request
return
}
@ -111,13 +103,13 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
// QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback")
_, err := dtmimp.InsertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, dtmimp.OpRollback)
var reason string
if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason)
}
if reason == "rollback" {
if reason == dtmimp.OpRollback {
return ErrFailure
}
return err

10
dtmcli/barrier_mongo.go

@ -25,8 +25,8 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
return sc.AbortTransaction(sc)
})
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
dtmimp.OpCancel: dtmimp.OpTry,
dtmimp.OpCompensate: dtmimp.OpAction,
}[bb.Op]
originAffected, oerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op)
@ -40,7 +40,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
if rerr == nil {
rerr = oerr
}
if (bb.Op == BranchCancel || bb.Op == BranchCompensate) && originAffected > 0 || // null compensate
if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate) && originAffected > 0 || // null compensate
currentAffected == 0 { // repeated request or dangled request
return
}
@ -54,7 +54,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
// MongoQueryPrepared query prepared for redis
// experimental
func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback")
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, dtmimp.OpRollback)
var result bson.M
if err == nil {
fs := strings.Split(dtmimp.BarrierTableName, ".")
@ -70,7 +70,7 @@ func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
if err == nil {
reason, _ = result["reason"].(string)
}
if err == nil && reason == "rollback" {
if err == nil && reason == dtmimp.OpRollback {
return ErrFailure
}
return err

4
dtmcli/barrier_redis.go

@ -13,8 +13,8 @@ func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, am
bid := bb.newBarrierID()
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, bb.Op, bid)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
dtmimp.OpCancel: dtmimp.OpTry,
dtmimp.OpCompensate: dtmimp.OpAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, originOp, bid)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount

15
dtmcli/consts.go

@ -24,21 +24,6 @@ const (
// StatusAborting status for global trans status.
StatusAborting = "aborting"
// BranchTry branch type for TCC
BranchTry = "try"
// BranchConfirm branch type for TCC
BranchConfirm = "confirm"
// BranchCancel branch type for TCC
BranchCancel = "cancel"
// BranchAction branch type for message, SAGA, XA
BranchAction = "action"
// BranchCompensate branch type for SAGA
BranchCompensate = "compensate"
// BranchCommit branch type for XA
BranchCommit = "commit"
// BranchRollback branch type for XA
BranchRollback = "rollback"
// ResultSuccess for result of a trans/trans branch
ResultSuccess = dtmimp.ResultSuccess
// ResultFailure for result of a trans/trans branch

19
dtmcli/dtmimp/consts.go

@ -16,6 +16,22 @@ const (
// ResultOngoing for result of a trans/trans branch
// Same as HTTP status 425 and GRPC code 9
ResultOngoing = "ONGOING"
// OpTry branch type for TCC
OpTry = "try"
// OpConfirm branch type for TCC
OpConfirm = "confirm"
// OpCancel branch type for TCC
OpCancel = "cancel"
// OpAction branch type for message, SAGA, XA
OpAction = "action"
// OpCompensate branch type for SAGA
OpCompensate = "compensate"
// OpCommit branch type for XA
OpCommit = "commit"
// OpRollback branch type for XA
OpRollback = "rollback"
// DBTypeMysql const for driver mysql
DBTypeMysql = "mysql"
// DBTypePostgres const for driver postgres
@ -36,4 +52,7 @@ const (
MsgDoBarrier1 = "01"
// MsgDoOp const for DoAndSubmit barrier op
MsgDoOp = "msg"
// XaBarrier1 const for xa barrier id
XaBarrier1 = "01"
)

10
dtmcli/dtmimp/trans_xa_base.go

@ -33,6 +33,10 @@ func (xc *XaClientBase) HandleCallback(gid string, branchID string, action strin
(strings.Contains(err.Error(), "XAER_NOTA") || strings.Contains(err.Error(), "does not exist")) { // Repeat commit/rollback with the same id, report this error, ignore
err = nil
}
if action == OpRollback && err == nil {
// rollback insert a row after prepare. no-error means prepare has finished.
_, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, action)
}
return err
}
@ -57,7 +61,11 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error)
defer func() {
_, _ = DBExec(db, GetDBSpecial().GetXaSQL("end", xaBranch))
}()
rerr = cb(db)
// prepare and rollback both insert a row
_, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction)
if rerr == nil {
rerr = cb(db)
}
return
}

12
dtmcli/dtmimp/utils.go

@ -242,10 +242,20 @@ func Escape(input string) string {
v := strings.Replace(input, "\n", "", -1)
v = strings.Replace(v, "\r", "", -1)
v = strings.Replace(v, ";", "", -1)
return strings.Replace(v, "'", "", -1)
// v = strings.Replace(v, "'", "", -1)
return v
}
// EscapeGet escape get
func EscapeGet(qs url.Values, key string) string {
return Escape(qs.Get(key))
}
// InsertBarrier insert a record to barrier
func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
if op == "" {
return 0, nil
}
sql := GetDBSpecial().GetInsertIgnoreTemplate(BarrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}

10
dtmcli/tcc.go

@ -60,13 +60,13 @@ func TccFromQuery(qs url.Values) (*Tcc, error) {
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
branchID := t.NewSubBranchID()
err := dtmimp.TransRegisterBranch(&t.TransBase, map[string]string{
"data": dtmimp.MustMarshalString(body),
"branch_id": branchID,
BranchConfirm: confirmURL,
BranchCancel: cancelURL,
"data": dtmimp.MustMarshalString(body),
"branch_id": branchID,
dtmimp.OpConfirm: confirmURL,
dtmimp.OpCancel: cancelURL,
}, "registerBranch")
if err != nil {
return nil, err
}
return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, BranchTry, tryURL)
return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, dtmimp.OpTry, tryURL)
}

2
dtmcli/xa.go

@ -101,5 +101,5 @@ func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc Xa
// CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewSubBranchID()
return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, BranchAction, url)
return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, dtmimp.OpAction, url)
}

6
dtmsvr/api.go

@ -70,14 +70,14 @@ func svcForceStop(t *TransGlobal) interface{} {
func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error {
branches := []TransBranch{*branch, *branch}
if transType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
for i, b := range []string{dtmimp.OpCancel, dtmimp.OpConfirm} {
branches[i].Op = b
branches[i].URL = data[b]
}
} else if transType == "xa" {
branches[0].Op = dtmcli.BranchRollback
branches[0].Op = dtmimp.OpRollback
branches[0].URL = data["url"]
branches[1].Op = dtmcli.BranchCommit
branches[1].Op = dtmimp.OpCommit
branches[1].URL = data["url"]
} else {
return fmt.Errorf("unknow trans type: %s", transType)

2
dtmsvr/trans_status.go

@ -176,7 +176,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData)
if err == nil {
return dtmcli.StatusSucceed, nil
} else if t.TransType == "saga" && branch.Op == dtmcli.BranchAction && errors.Is(err, dtmcli.ErrFailure) {
} else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) {
return dtmcli.StatusFailed, nil
} else if errors.Is(err, dtmcli.ErrOngoing) {
return "", dtmcli.ErrOngoing

6
dtmsvr/trans_type_msg.go

@ -30,8 +30,8 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
Gid: t.Gid,
BranchID: fmt.Sprintf("%02d", i+1),
BinData: t.BinPayloads[i],
URL: step[dtmcli.BranchAction],
Op: dtmcli.BranchAction,
URL: step[dtmimp.OpAction],
Op: dtmimp.OpAction,
Status: dtmcli.StatusPrepared,
}
branches = append(branches, *b)
@ -79,7 +79,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
var err error
for i := range branches {
b := &branches[i]
if b.Op != dtmcli.BranchAction || b.Status != dtmcli.StatusPrepared {
if b.Op != dtmimp.OpAction || b.Status != dtmcli.StatusPrepared {
continue
}
if t.Concurrent {

10
dtmsvr/trans_type_saga.go

@ -30,7 +30,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
for i, step := range t.Steps {
branch := fmt.Sprintf("%02d", i+1)
for _, op := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} {
for _, op := range []string{dtmimp.OpCompensate, dtmimp.OpAction} {
branches = append(branches, TransBranch{
Gid: t.Gid,
BranchID: branch,
@ -82,7 +82,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
if b.Op == dtmcli.BranchAction {
if b.Op == dtmimp.OpAction {
if b.Status == dtmcli.StatusPrepared {
rsAToStart++
} else if b.Status == dtmcli.StatusFailed {
@ -163,7 +163,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
runBranches := func(toRun []int) {
for _, b := range toRun {
branchResults[b].started = true
if branchResults[b].op == dtmcli.BranchAction {
if branchResults[b].op == dtmimp.OpAction {
rsAStarted++
}
go asyncExecBranch(b)
@ -174,7 +174,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
case r := <-resultChan:
br := &branchResults[r.index]
br.status = r.status
if r.op == dtmcli.BranchAction {
if r.op == dtmimp.OpAction {
rsADone++
if r.status == dtmcli.StatusFailed {
rsAFailed++
@ -202,7 +202,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
if b.op == dtmimp.OpCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
}

2
dtmsvr/trans_type_tcc.go

@ -31,7 +31,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting)
}
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpConfirm, dtmimp.OpCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID)

2
dtmsvr/trans_type_xa.go

@ -30,7 +30,7 @@ func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting)
}
currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string)
currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpCommit, dtmimp.OpRollback).(string)
for i, branch := range branches {
if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
err := t.execBranch(&branch, i)

2
test/base_test.go

@ -35,7 +35,7 @@ func TestBaseSqlDB(t *testing.T) {
TransType: "saga",
Gid: "gid2",
BranchID: "branch_id2",
Op: dtmcli.BranchAction,
Op: dtmimp.OpAction,
BarrierID: 1,
}
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')")

2
test/busi/base_http.go

@ -194,7 +194,7 @@ func BaseAddRoute(app *gin.Engine) {
}
return nil
}))
app.POST(BusiAPI+"/TccBSleepCancel", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
app.POST(BusiAPI+"/SleepCancel", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return sleepCancelHandler(c)
}))
app.POST(BusiAPI+"/TransOutHeaderYes", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {

18
test/tcc_barrier_test.go

@ -73,7 +73,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
body := &busi.TransReq{Amount: 30, Store: store}
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"
cancelURL := Busi + "/TccBSleepCancel"
cancelURL := Busi + "/SleepCancel"
// refer to time diagram for barrier, here we simulate it
branchID := tcc.NewSubBranchID()
busi.SetSleepCancelHandler(func(c *gin.Context) interface{} {
@ -88,13 +88,13 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
// register tcc branch
resp, err := dtmimp.RestyClient.R().
SetBody(map[string]interface{}{
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"status": StatusPrepared,
"data": string(dtmimp.MustMarshal(body)),
dtmcli.BranchConfirm: confirmURL,
dtmcli.BranchCancel: cancelURL,
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"status": StatusPrepared,
"data": string(dtmimp.MustMarshal(body)),
dtmimp.OpConfirm: confirmURL,
dtmimp.OpCancel: cancelURL,
}).Post(fmt.Sprintf("%s/%s", tcc.Dtm, "registerBranch"))
assert.Nil(t, err)
assert.Contains(t, resp.String(), dtmcli.ResultSuccess)
@ -121,7 +121,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"op": dtmcli.BranchTry,
"op": dtmimp.OpTry,
}).
Post(tryURL)
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // dangle op, return success

Loading…
Cancel
Save