From abda3b43b16899f8571612ada62d3378342f6729 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 10 Feb 2022 16:47:08 +0800 Subject: [PATCH 1/3] add msg barrier unique key conflict --- dtmcli/barrier.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 251611d..898f831 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -8,6 +8,7 @@ package dtmcli import ( "database/sql" + "errors" "fmt" "net/url" @@ -82,6 +83,10 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) rerr = oerr } + if bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + return errors.New("unique key conflict") + } + if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // null compensate currentAffected == 0 { // repeated request or dangled request return From 2f3949f5b91c21d152899d62fab49c665ee43b45 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 10 Feb 2022 17:48:51 +0800 Subject: [PATCH 2/3] handle extreme case in DoAndSubmit --- dtmcli/barrier.go | 27 +++++++++++++++------------ dtmcli/barrier_mongo.go | 9 ++++++--- dtmcli/barrier_redis.go | 11 +++++++---- dtmcli/consts.go | 4 ++++ dtmcli/dtmimp/vars.go | 4 ++++ test/msg_barrier_mongo_test.go | 25 +++++++++++++++++++++++++ test/msg_barrier_redis_test.go | 23 +++++++++++++++++++++++ test/msg_barrier_test.go | 33 ++++++++++++++++++++++++++++----- 8 files changed, 112 insertions(+), 24 deletions(-) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 898f831..dc0e59e 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -8,7 +8,6 @@ package dtmcli import ( "database/sql" - "errors" "fmt" "net/url" @@ -32,6 +31,11 @@ func (bb *BranchBarrier) String() string { return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op) } +func (bb *BranchBarrier) newBarrierID() string { + bb.BarrierID++ + return fmt.Sprintf("%02d", bb.BarrierID) +} + // BarrierFromQuery construct transaction info from request func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) { return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("op")) @@ -63,31 +67,30 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri // tx: 本地数据库的事务对象,允许子事务屏障进行事务操作 // busiCall: 业务函数,仅在必要时被调用 func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) { - bb.BarrierID = bb.BarrierID + 1 - bid := fmt.Sprintf("%02d", bb.BarrierID) + bid := bb.newBarrierID() defer dtmimp.DeferDo(&rerr, func() error { return tx.Commit() }, func() error { return tx.Rollback() }) - ti := bb originOp := map[string]string{ BranchCancel: BranchTry, BranchCompensate: BranchAction, - }[ti.Op] + }[bb.Op] - originAffected, oerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originOp, bid, ti.Op) - currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) + originAffected, oerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op) + currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil { - rerr = oerr + + if rerr == nil && bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + return ErrDuplicated } - if bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. - return errors.New("unique key conflict") + if rerr == nil { + rerr = oerr } - if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // null compensate + if (bb.Op == BranchCancel || bb.Op == BranchCompensate) && originAffected > 0 || // null compensate currentAffected == 0 { // repeated request or dangled request return } diff --git a/dtmcli/barrier_mongo.go b/dtmcli/barrier_mongo.go index c4902ac..8178b4a 100644 --- a/dtmcli/barrier_mongo.go +++ b/dtmcli/barrier_mongo.go @@ -2,7 +2,6 @@ package dtmcli import ( "context" - "fmt" "strings" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -14,8 +13,7 @@ import ( // MongoCall sub-trans barrier for mongo. see http://dtm.pub/practice/barrier // experimental func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.SessionContext) error) (rerr error) { - bb.BarrierID = bb.BarrierID + 1 - bid := fmt.Sprintf("%02d", bb.BarrierID) + bid := bb.newBarrierID() return mc.UseSession(context.Background(), func(sc mongo.SessionContext) (rerr error) { rerr = sc.StartTransaction() if rerr != nil { @@ -34,6 +32,11 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session originAffected, oerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op) currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) + + if rerr == nil && bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + return ErrDuplicated + } + if rerr == nil { rerr = oerr } diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go index 9babfdf..fa2ecea 100644 --- a/dtmcli/barrier_redis.go +++ b/dtmcli/barrier_redis.go @@ -9,13 +9,13 @@ import ( // RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error { - bb.BarrierID = bb.BarrierID + 1 - bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID) + bid := bb.newBarrierID() + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, bb.Op, bid) originOp := map[string]string{ BranchCancel: BranchTry, BranchCompensate: BranchAction, }[bb.Op] - bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID) + bkey2 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, originOp, bid) v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount local v = redis.call('GET', KEYS[1]) local e1 = redis.call('GET', KEYS[2]) @@ -25,7 +25,7 @@ if v == false or v + ARGV[1] < 0 then end if e1 ~= false then - return + return 'DUPLICATE' end redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3]) @@ -43,6 +43,9 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) if err == redis.Nil { err = nil } + if err == nil && bb.Op == "msg" && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate + return ErrDuplicated + } if err == nil && v == ResultFailure { err = ErrFailure } diff --git a/dtmcli/consts.go b/dtmcli/consts.go index b27b90a..814f17f 100644 --- a/dtmcli/consts.go +++ b/dtmcli/consts.go @@ -61,3 +61,7 @@ var ErrFailure = dtmimp.ErrFailure // ErrOngoing error for returned ongoing var ErrOngoing = dtmimp.ErrOngoing + +// ErrDuplicated error of DUPLICATED for only msg +// if QueryPrepared executed before call. then DoAndSubmit return this error +var ErrDuplicated = dtmimp.ErrDuplicated diff --git a/dtmcli/dtmimp/vars.go b/dtmcli/dtmimp/vars.go index 3cf31d3..3b30403 100644 --- a/dtmcli/dtmimp/vars.go +++ b/dtmcli/dtmimp/vars.go @@ -19,6 +19,10 @@ var ErrFailure = errors.New("FAILURE") // ErrOngoing error of ONGOING var ErrOngoing = errors.New("ONGOING") +// ErrDuplicated error of DUPLICATED for only msg +// if QueryPrepared executed before call. then DoAndSubmit return this error +var ErrDuplicated = errors.New("DUPLICATED") + // XaSQLTimeoutMs milliseconds for Xa sql to timeout var XaSQLTimeoutMs = 15000 diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index 1f36f6f..6d8dbfc 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -42,6 +42,31 @@ func TestMsgMongoDoBusiFailed(t *testing.T) { assertSameBalance(t, before, "mongo") } +func TestMsgMongoDoBusiLater(t *testing.T) { + before := getBeforeBalances("mongo") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + _, err := dtmcli.GetRestyClient().R(). + SetQueryParams(map[string]string{ + "trans_type": "msg", + "gid": gid, + "branch_id": "00", + "op": "msg", + "barrier_id": "01", + }). + SetBody(req).Get(Busi + "/MongoQueryPrepared") + assert.Nil(t, err) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaMongoTransIn", req) + err = msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.MongoCall(busi.MongoGet(), func(sc mongo.SessionContext) error { + return busi.SagaMongoAdjustBalance(sc, sc.Client(), busi.TransOutUID, -30, "") + }) + }) + assert.Error(t, err, dtmcli.ErrDuplicated) + assertSameBalance(t, before, "mongo") +} + func TestMsgMongoDoCommitFailed(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go index 5a5f4db..06b099f 100644 --- a/test/msg_barrier_redis_test.go +++ b/test/msg_barrier_redis_test.go @@ -39,6 +39,29 @@ func TestMsgRedisDoBusiFailed(t *testing.T) { assertSameBalance(t, before, "redis") } +func TestMsgRedisDoBusiLater(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + _, err := dtmcli.GetRestyClient().R(). + SetQueryParams(map[string]string{ + "trans_type": "msg", + "gid": gid, + "branch_id": "00", + "op": "msg", + "barrier_id": "01", + }). + SetBody(req).Get(Busi + "/RedisQueryPrepared") + assert.Nil(t, err) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err = msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Error(t, err, dtmcli.ErrDuplicated) + assertSameBalance(t, before, "redis") +} + func TestMsgRedisDoPrepareFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index 150b7a0..828914a 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMsgPrepareAndSubmit(t *testing.T) { +func TestMsgDoAndSubmit(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() req := busi.GenTransReq(30, false, false) @@ -30,7 +30,7 @@ func TestMsgPrepareAndSubmit(t *testing.T) { assertNotSameBalance(t, before, "mysql") } -func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) { +func TestMsgDoAndSubmitBusiFailed(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() req := busi.GenTransReq(30, false, false) @@ -43,7 +43,30 @@ func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) { assertSameBalance(t, before, "mysql") } -func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) { +func TestMsgDoAndSubmitBusiLater(t *testing.T) { + before := getBeforeBalances("mysql") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + _, err := dtmcli.GetRestyClient().R(). + SetQueryParams(map[string]string{ + "trans_type": "msg", + "gid": gid, + "branch_id": "00", + "op": "msg", + "barrier_id": "01", + }). + SetBody(req).Get(Busi + "/QueryPreparedB") + assert.Nil(t, err) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + err = msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return nil + }) + assert.Error(t, err, dtmcli.ErrDuplicated) + assertSameBalance(t, before, "mysql") +} + +func TestMsgDoAndSubmitPrepareFailed(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() req := busi.GenTransReq(30, false, false) @@ -56,7 +79,7 @@ func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) { assertSameBalance(t, before, "mysql") } -func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) { +func TestMsgDoAndSubmitCommitFailed(t *testing.T) { if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit return } @@ -79,7 +102,7 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) { assertSameBalance(t, before, "mysql") } -func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) { +func TestMsgDoAndSubmitCommitAfterFailed(t *testing.T) { if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit return } From 57f7408b03542b7a2e54e587f45dd254384b883f Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 10 Feb 2022 18:12:34 +0800 Subject: [PATCH 3/3] msg change to opMsg --- dtmcli/barrier.go | 4 +++- dtmcli/barrier_mongo.go | 2 +- dtmcli/barrier_redis.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index dc0e59e..558f9af 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -27,6 +27,8 @@ type BranchBarrier struct { BarrierID int } +const opMsg = "msg" + func (bb *BranchBarrier) String() string { return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op) } @@ -82,7 +84,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } diff --git a/dtmcli/barrier_mongo.go b/dtmcli/barrier_mongo.go index 8178b4a..98c0f67 100644 --- a/dtmcli/barrier_mongo.go +++ b/dtmcli/barrier_mongo.go @@ -33,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == "msg" && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go index fa2ecea..6fcc0ab 100644 --- a/dtmcli/barrier_redis.go +++ b/dtmcli/barrier_redis.go @@ -43,7 +43,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) if err == redis.Nil { err = nil } - if err == nil && bb.Op == "msg" && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate + if err == nil && bb.Op == opMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate return ErrDuplicated } if err == nil && v == ResultFailure {