Browse Source

Merge pull request #209 from dtm-labs/alpha

handle extreme case when msg DoAndSubmit Call is after QueryPrepare
pull/210/head v1.12.1
yedf2 4 years ago
committed by GitHub
parent
commit
09da7babd5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dtmcli/barrier.go
  2. 9
      dtmcli/barrier_mongo.go
  3. 11
      dtmcli/barrier_redis.go
  4. 4
      dtmcli/consts.go
  5. 4
      dtmcli/dtmimp/vars.go
  6. 25
      test/msg_barrier_mongo_test.go
  7. 23
      test/msg_barrier_redis_test.go
  8. 33
      test/msg_barrier_test.go

24
dtmcli/barrier.go

@ -27,10 +27,17 @@ type BranchBarrier struct {
BarrierID int
}
const opMsg = "msg"
func (bb *BranchBarrier) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op)
}
func (bb *BranchBarrier) newBarrierID() string {
bb.BarrierID++
return fmt.Sprintf("%02d", bb.BarrierID)
}
// BarrierFromQuery construct transaction info from request
func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) {
return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("op"))
@ -62,27 +69,30 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri
// tx: 本地数据库的事务对象,允许子事务屏障进行事务操作
// busiCall: 业务函数,仅在必要时被调用
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
bb.BarrierID = bb.BarrierID + 1
bid := fmt.Sprintf("%02d", bb.BarrierID)
bid := bb.newBarrierID()
defer dtmimp.DeferDo(&rerr, func() error {
return tx.Commit()
}, func() error {
return tx.Rollback()
})
ti := bb
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[ti.Op]
}[bb.Op]
originAffected, oerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originOp, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
originAffected, oerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
if rerr == nil {
rerr = oerr
}
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // null compensate
if (bb.Op == BranchCancel || bb.Op == BranchCompensate) && originAffected > 0 || // null compensate
currentAffected == 0 { // repeated request or dangled request
return
}

9
dtmcli/barrier_mongo.go

@ -2,7 +2,6 @@ package dtmcli
import (
"context"
"fmt"
"strings"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -14,8 +13,7 @@ import (
// MongoCall sub-trans barrier for mongo. see http://dtm.pub/practice/barrier
// experimental
func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.SessionContext) error) (rerr error) {
bb.BarrierID = bb.BarrierID + 1
bid := fmt.Sprintf("%02d", bb.BarrierID)
bid := bb.newBarrierID()
return mc.UseSession(context.Background(), func(sc mongo.SessionContext) (rerr error) {
rerr = sc.StartTransaction()
if rerr != nil {
@ -34,6 +32,11 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
originAffected, oerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op)
currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
if rerr == nil {
rerr = oerr
}

11
dtmcli/barrier_redis.go

@ -9,13 +9,13 @@ import (
// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bb.BarrierID = bb.BarrierID + 1
bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
bid := bb.newBarrierID()
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, bb.Op, bid)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID)
bkey2 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, bb.BranchID, originOp, bid)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
@ -25,7 +25,7 @@ if v == false or v + ARGV[1] < 0 then
end
if e1 ~= false then
return
return 'DUPLICATE'
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
@ -43,6 +43,9 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
if err == redis.Nil {
err = nil
}
if err == nil && bb.Op == opMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
return ErrDuplicated
}
if err == nil && v == ResultFailure {
err = ErrFailure
}

4
dtmcli/consts.go

@ -61,3 +61,7 @@ var ErrFailure = dtmimp.ErrFailure
// ErrOngoing error for returned ongoing
var ErrOngoing = dtmimp.ErrOngoing
// ErrDuplicated error of DUPLICATED for only msg
// if QueryPrepared executed before call. then DoAndSubmit return this error
var ErrDuplicated = dtmimp.ErrDuplicated

4
dtmcli/dtmimp/vars.go

@ -19,6 +19,10 @@ var ErrFailure = errors.New("FAILURE")
// ErrOngoing error of ONGOING
var ErrOngoing = errors.New("ONGOING")
// ErrDuplicated error of DUPLICATED for only msg
// if QueryPrepared executed before call. then DoAndSubmit return this error
var ErrDuplicated = errors.New("DUPLICATED")
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
var XaSQLTimeoutMs = 15000

25
test/msg_barrier_mongo_test.go

@ -42,6 +42,31 @@ func TestMsgMongoDoBusiFailed(t *testing.T) {
assertSameBalance(t, before, "mongo")
}
func TestMsgMongoDoBusiLater(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
}).
SetBody(req).Get(Busi + "/MongoQueryPrepared")
assert.Nil(t, err)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaMongoTransIn", req)
err = msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.MongoCall(busi.MongoGet(), func(sc mongo.SessionContext) error {
return busi.SagaMongoAdjustBalance(sc, sc.Client(), busi.TransOutUID, -30, "")
})
})
assert.Error(t, err, dtmcli.ErrDuplicated)
assertSameBalance(t, before, "mongo")
}
func TestMsgMongoDoCommitFailed(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()

23
test/msg_barrier_redis_test.go

@ -39,6 +39,29 @@ func TestMsgRedisDoBusiFailed(t *testing.T) {
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoBusiLater(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
}).
SetBody(req).Get(Busi + "/RedisQueryPrepared")
assert.Nil(t, err)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err = msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Error(t, err, dtmcli.ErrDuplicated)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()

33
test/msg_barrier_test.go

@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestMsgPrepareAndSubmit(t *testing.T) {
func TestMsgDoAndSubmit(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
@ -30,7 +30,7 @@ func TestMsgPrepareAndSubmit(t *testing.T) {
assertNotSameBalance(t, before, "mysql")
}
func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) {
func TestMsgDoAndSubmitBusiFailed(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
@ -43,7 +43,30 @@ func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) {
assertSameBalance(t, before, "mysql")
}
func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) {
func TestMsgDoAndSubmitBusiLater(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
}).
SetBody(req).Get(Busi + "/QueryPreparedB")
assert.Nil(t, err)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err = msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return nil
})
assert.Error(t, err, dtmcli.ErrDuplicated)
assertSameBalance(t, before, "mysql")
}
func TestMsgDoAndSubmitPrepareFailed(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
@ -56,7 +79,7 @@ func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) {
assertSameBalance(t, before, "mysql")
}
func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) {
func TestMsgDoAndSubmitCommitFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
@ -79,7 +102,7 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) {
assertSameBalance(t, before, "mysql")
}
func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) {
func TestMsgDoAndSubmitCommitAfterFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}

Loading…
Cancel
Save