Browse Source

barrier redis ok

pull/195/head
yedf2 4 years ago
parent
commit
3840e6d9e3
  1. 42
      dtmcli/barrier.go
  2. 73
      dtmcli/barrier_redis.go
  3. 12
      test/busi/barrier.go
  4. 5
      test/busi/base_http.go
  5. 4
      test/busi/base_types.go
  6. 1
      test/busi/startup.go
  7. 4
      test/busi/utils.go
  8. 82
      test/msg_barrier_redis_test.go

42
dtmcli/barrier.go

@ -13,7 +13,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)
// BarrierBusiFunc type for busi func
@ -115,44 +114,3 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
}
return err
}
// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}

73
dtmcli/barrier_redis.go

@ -0,0 +1,73 @@
package dtmcli
import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)
// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bb.BarrierID = bb.BarrierID + 1
bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}
// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01")
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then
redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1])
v = 'rollback'
end
if v == 'rollback' then
return 'FAILURE'
end
`, []string{bkey1}, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}

12
test/busi/barrier.go

@ -73,16 +73,16 @@ func init() {
})
}))
app.POST(BusiAPI+"/SagaRedisTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
req := reqFrom(c)
@ -90,7 +90,7 @@ func init() {
return dtmcli.String2DtmError(req.TransOutResult)
}
if req.Store == config.Redis {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), req.Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
}
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
@ -113,7 +113,7 @@ func init() {
func TccBarrierTransOutCancel(c *gin.Context) interface{} {
req := reqFrom(c)
if req.Store == "redis" {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -req.Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -req.Amount, 7*86400)
}
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount)

5
test/busi/base_http.go

@ -132,6 +132,11 @@ func BaseAddRoute(app *gin.Engine) {
db := dbGet().ToSQLDB()
return bb.QueryPrepared(db)
}))
app.GET(BusiAPI+"/RedisQueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
logger.Debugf("%s RedisQueryPrepared", c.Query("gid"))
bb := MustBarrierFromGin(c)
return bb.RedisQueryPrepared(RedisGet(), 86400)
}))
app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error {
return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)

4
test/busi/base_types.go

@ -39,7 +39,7 @@ func (*UserAccount) TableName() string {
func GetBalanceByUID(uid int, store string) int {
if store == "redis" {
rd := RedisGet()
accA, err := rd.Get(rd.Context(), getRedisAccountKey(uid)).Result()
accA, err := rd.Get(rd.Context(), GetRedisAccountKey(uid)).Result()
dtmimp.E2P(err)
return dtmimp.MustAtoi(accA)
}
@ -131,6 +131,6 @@ type mainSwitchType struct {
// MainSwitch controls busi success or fail
var MainSwitch mainSwitchType
func getRedisAccountKey(uid int) string {
func GetRedisAccountKey(uid int) string {
return fmt.Sprintf("{a}-redis-account-key-%d", uid)
}

1
test/busi/startup.go

@ -24,4 +24,5 @@ func PopulateDB(skipDrop bool) {
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
_, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear
dtmimp.E2P(err)
SetRedisBothAccount(10000, 10000)
}

4
test/busi/utils.go

@ -137,8 +137,8 @@ func RedisGet() *redis.Client {
// SetRedisBothAccount 1
func SetRedisBothAccount(accountA int, accountB int) {
rd := RedisGet()
_, err := rd.Set(rd.Context(), getRedisAccountKey(TransOutUID), accountA, 0).Result()
_, err := rd.Set(rd.Context(), GetRedisAccountKey(TransOutUID), accountA, 0).Result()
dtmimp.E2P(err)
_, err = rd.Set(rd.Context(), getRedisAccountKey(TransInUID), accountB, 0).Result()
_, err = rd.Set(rd.Context(), GetRedisAccountKey(TransInUID), accountB, 0).Result()
dtmimp.E2P(err)
}

82
test/msg_barrier_redis_test.go

@ -0,0 +1,82 @@
package test
import (
"errors"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgRedisDo(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before, "redis")
}
func TestMsgRedisDoBusiFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return errors.New("an error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoCommitFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return errors.New("after commit error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
dtmimp.E2P(err)
return errors.New("an error")
})
assert.Error(t, err)
waitTransProcessed(gid)
assertNotSameBalance(t, before, "redis")
}
Loading…
Cancel
Save