Browse Source

Merge pull request #239 from stulzq/main

replace string to const on barrier
pull/251/head
yedf2 4 years ago
committed by GitHub
parent
commit
f480b46df9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dtmcli/barrier.go
  2. 10
      dtmcli/barrier_mongo.go
  3. 5
      dtmcli/barrier_redis.go
  4. 6
      dtmcli/dtmimp/consts.go
  5. 2
      dtmcli/msg.go
  6. 2
      dtmgrpc/msg.go
  7. 6
      test/msg_barrier_mongo_test.go
  8. 6
      test/msg_barrier_redis_test.go
  9. 6
      test/msg_barrier_test.go

8
dtmcli/barrier.go

@ -27,8 +27,6 @@ type BranchBarrier struct {
BarrierID int BarrierID int
} }
const opMsg = "msg"
func (bb *BranchBarrier) String() string { func (bb *BranchBarrier) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op) return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op)
} }
@ -84,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated return ErrDuplicated
} }
@ -113,11 +111,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
// QueryPrepared queries prepared data // QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
var reason string var reason string
if err == nil { if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, "00", "msg", "01").Scan(&reason) err = db.QueryRow(sql, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01).Scan(&reason)
} }
if reason == "rollback" { if reason == "rollback" {
return ErrFailure return ErrFailure

10
dtmcli/barrier_mongo.go

@ -33,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated return ErrDuplicated
} }
@ -54,16 +54,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
// MongoQueryPrepared query prepared for redis // MongoQueryPrepared query prepared for redis
// experimental // experimental
func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error { func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
var result bson.M var result bson.M
if err == nil { if err == nil {
fs := strings.Split(dtmimp.BarrierTableName, ".") fs := strings.Split(dtmimp.BarrierTableName, ".")
barrier := mc.Database(fs[0]).Collection(fs[1]) barrier := mc.Database(fs[0]).Collection(fs[1])
err = barrier.FindOne(context.Background(), bson.D{ err = barrier.FindOne(context.Background(), bson.D{
{Key: "gid", Value: bb.Gid}, {Key: "gid", Value: bb.Gid},
{Key: "branch_id", Value: "00"}, {Key: "branch_id", Value: dtmimp.BranchId00},
{Key: "op", Value: "msg"}, {Key: "op", Value: dtmimp.BarrierOpMsg},
{Key: "barrier_id", Value: "01"}, {Key: "barrier_id", Value: dtmimp.BarrierID01},
}).Decode(&result) }).Decode(&result)
} }
var reason string var reason string

5
dtmcli/barrier_redis.go

@ -3,6 +3,7 @@ package dtmcli
import ( import (
"fmt" "fmt"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -43,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
if err == redis.Nil { if err == redis.Nil {
err = nil err = nil
} }
if err == nil && bb.Op == opMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate if err == nil && bb.Op == dtmimp.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
return ErrDuplicated return ErrDuplicated
} }
if err == nil && v == ResultFailure { if err == nil && v == ResultFailure {
@ -54,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
// RedisQueryPrepared query prepared for redis // RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01") bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01)
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1]) local v = redis.call('GET', KEYS[1])
if v == false then if v == false then

6
dtmcli/dtmimp/consts.go

@ -26,4 +26,10 @@ const (
// JrpcCodeOngoing const for json-rpc ongoing // JrpcCodeOngoing const for json-rpc ongoing
JrpcCodeOngoing = -32902 JrpcCodeOngoing = -32902
BranchId00 = "00"
BarrierID01 = "01"
BarrierOpMsg = "msg"
) )

2
dtmcli/msg.go

@ -61,7 +61,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu
// if busiCall return ErrFailure, then abort is called directly // if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil { if err == nil {
err = s.Prepare(queryPrepared) err = s.Prepare(queryPrepared)
} }

2
dtmgrpc/msg.go

@ -63,7 +63,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl
// if busiCall return ErrFailure, then abort is called directly // if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil { if err == nil {
err = s.Prepare(queryPrepared) err = s.Prepare(queryPrepared)
} }

6
test/msg_barrier_mongo_test.go

@ -50,9 +50,9 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": "msg", "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": "00", "branch_id": dtmimp.BranchId00,
"op": "msg", "op": dtmimp.BarrierOpMsg,
"barrier_id": "01", "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/MongoQueryPrepared") SetBody(req).Get(Busi + "/MongoQueryPrepared")
assert.Nil(t, err) assert.Nil(t, err)

6
test/msg_barrier_redis_test.go

@ -47,9 +47,9 @@ func TestMsgRedisDoBusiLater(t *testing.T) {
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": "msg", "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": "00", "branch_id": dtmimp.BranchId00,
"op": "msg", "op": dtmimp.BarrierOpMsg,
"barrier_id": "01", "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/RedisQueryPrepared") SetBody(req).Get(Busi + "/RedisQueryPrepared")
assert.Nil(t, err) assert.Nil(t, err)

6
test/msg_barrier_test.go

@ -51,9 +51,9 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) {
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": "msg", "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": "00", "branch_id": dtmimp.BranchId00,
"op": "msg", "op": dtmimp.BarrierOpMsg,
"barrier_id": "01", "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/QueryPreparedB") SetBody(req).Get(Busi + "/QueryPreparedB")
assert.Nil(t, err) assert.Nil(t, err)

Loading…
Cancel
Save