From 3840e6d9e3bafbb118ec2e3221c9a09d5a39cf95 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Mon, 24 Jan 2022 11:40:40 +0800 Subject: [PATCH] barrier redis ok --- dtmcli/barrier.go | 42 ----------------- dtmcli/barrier_redis.go | 73 ++++++++++++++++++++++++++++++ test/busi/barrier.go | 12 ++--- test/busi/base_http.go | 5 +++ test/busi/base_types.go | 4 +- test/busi/startup.go | 1 + test/busi/utils.go | 4 +- test/msg_barrier_redis_test.go | 82 ++++++++++++++++++++++++++++++++++ 8 files changed, 171 insertions(+), 52 deletions(-) create mode 100644 dtmcli/barrier_redis.go create mode 100644 test/msg_barrier_redis_test.go diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index c2b66d1..497ab63 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -13,7 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/go-redis/redis/v8" ) // BarrierBusiFunc type for busi func @@ -115,44 +114,3 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { } return err } - -// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount -func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error { - bkey1 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, bb.Op, bb.BarrierID) - originOp := map[string]string{ - BranchCancel: BranchTry, - BranchCompensate: BranchAction, - }[bb.Op] - bkey2 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, originOp, bb.BarrierID) - v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount -local v = redis.call('GET', KEYS[1]) -local e1 = redis.call('GET', KEYS[2]) - -if v == false or v + ARGV[1] < 0 then - return 'FAILURE' -end - -if e1 ~= false then - return -end - -redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3]) - -if ARGV[2] ~= '' then - local e2 = redis.call('GET', KEYS[3]) - if e2 == false then - redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3]) - return - end -end -redis.call('INCRBY', KEYS[1], ARGV[1]) -`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result() - logger.Debugf("lua return v: %v err: %v", v, err) - if err == redis.Nil { - err = nil - } - if err == nil && v == ResultFailure { - err = ErrFailure - } - return err -} diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go new file mode 100644 index 0000000..9babfdf --- /dev/null +++ b/dtmcli/barrier_redis.go @@ -0,0 +1,73 @@ +package dtmcli + +import ( + "fmt" + + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/go-redis/redis/v8" +) + +// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount +func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error { + bb.BarrierID = bb.BarrierID + 1 + bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID) + originOp := map[string]string{ + BranchCancel: BranchTry, + BranchCompensate: BranchAction, + }[bb.Op] + bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID) + v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount +local v = redis.call('GET', KEYS[1]) +local e1 = redis.call('GET', KEYS[2]) + +if v == false or v + ARGV[1] < 0 then + return 'FAILURE' +end + +if e1 ~= false then + return +end + +redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3]) + +if ARGV[2] ~= '' then + local e2 = redis.call('GET', KEYS[3]) + if e2 == false then + redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3]) + return + end +end +redis.call('INCRBY', KEYS[1], ARGV[1]) +`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result() + logger.Debugf("lua return v: %v err: %v", v, err) + if err == redis.Nil { + err = nil + } + if err == nil && v == ResultFailure { + err = ErrFailure + } + return err +} + +// RedisQueryPrepared query prepared for redis +func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01") + v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared +local v = redis.call('GET', KEYS[1]) +if v == false then + redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1]) + v = 'rollback' +end +if v == 'rollback' then + return 'FAILURE' +end +`, []string{bkey1}, barrierExpire).Result() + logger.Debugf("lua return v: %v err: %v", v, err) + if err == redis.Nil { + err = nil + } + if err == nil && v == ResultFailure { + err = ErrFailure + } + return err +} diff --git a/test/busi/barrier.go b/test/busi/barrier.go index 63c24c2..7a44ed6 100644 --- a/test/busi/barrier.go +++ b/test/busi/barrier.go @@ -73,16 +73,16 @@ func init() { }) })) app.POST(BusiAPI+"/SagaRedisTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { req := reqFrom(c) @@ -90,7 +90,7 @@ func init() { return dtmcli.String2DtmError(req.TransOutResult) } if req.Store == config.Redis { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), req.Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400) } return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { @@ -113,7 +113,7 @@ func init() { func TccBarrierTransOutCancel(c *gin.Context) interface{} { req := reqFrom(c) if req.Store == "redis" { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -req.Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -req.Amount, 7*86400) } return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount) diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 0fe8f51..5822aef 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -132,6 +132,11 @@ func BaseAddRoute(app *gin.Engine) { db := dbGet().ToSQLDB() return bb.QueryPrepared(db) })) + app.GET(BusiAPI+"/RedisQueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + logger.Debugf("%s RedisQueryPrepared", c.Query("gid")) + bb := MustBarrierFromGin(c) + return bb.RedisQueryPrepared(RedisGet(), 86400) + })) app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error { return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 550c5bf..2d24948 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -39,7 +39,7 @@ func (*UserAccount) TableName() string { func GetBalanceByUID(uid int, store string) int { if store == "redis" { rd := RedisGet() - accA, err := rd.Get(rd.Context(), getRedisAccountKey(uid)).Result() + accA, err := rd.Get(rd.Context(), GetRedisAccountKey(uid)).Result() dtmimp.E2P(err) return dtmimp.MustAtoi(accA) } @@ -131,6 +131,6 @@ type mainSwitchType struct { // MainSwitch controls busi success or fail var MainSwitch mainSwitchType -func getRedisAccountKey(uid int) string { +func GetRedisAccountKey(uid int) string { return fmt.Sprintf("{a}-redis-account-key-%d", uid) } diff --git a/test/busi/startup.go b/test/busi/startup.go index 2b40a5c..7e02949 100644 --- a/test/busi/startup.go +++ b/test/busi/startup.go @@ -24,4 +24,5 @@ func PopulateDB(skipDrop bool) { dtmutil.RunSQLScript(BusiConf, file, skipDrop) _, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear dtmimp.E2P(err) + SetRedisBothAccount(10000, 10000) } diff --git a/test/busi/utils.go b/test/busi/utils.go index 98c9609..d77d668 100644 --- a/test/busi/utils.go +++ b/test/busi/utils.go @@ -137,8 +137,8 @@ func RedisGet() *redis.Client { // SetRedisBothAccount 1 func SetRedisBothAccount(accountA int, accountB int) { rd := RedisGet() - _, err := rd.Set(rd.Context(), getRedisAccountKey(TransOutUID), accountA, 0).Result() + _, err := rd.Set(rd.Context(), GetRedisAccountKey(TransOutUID), accountA, 0).Result() dtmimp.E2P(err) - _, err = rd.Set(rd.Context(), getRedisAccountKey(TransInUID), accountB, 0).Result() + _, err = rd.Set(rd.Context(), GetRedisAccountKey(TransInUID), accountB, 0).Result() dtmimp.E2P(err) } diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go new file mode 100644 index 0000000..5a5f4db --- /dev/null +++ b/test/msg_barrier_redis_test.go @@ -0,0 +1,82 @@ +package test + +import ( + "errors" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestMsgRedisDo(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Nil(t, err) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) + assertNotSameBalance(t, before, "redis") +} + +func TestMsgRedisDoBusiFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return errors.New("an error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoPrepareFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoCommitFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return errors.New("after commit error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoCommitAfterFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + dtmimp.E2P(err) + return errors.New("an error") + }) + assert.Error(t, err) + waitTransProcessed(gid) + assertNotSameBalance(t, before, "redis") +}