Browse Source

rename const

pull/251/head
yedf2 4 years ago
parent
commit
1002372fdd
  1. 6
      dtmcli/barrier.go
  2. 10
      dtmcli/barrier_mongo.go
  3. 4
      dtmcli/barrier_redis.go
  4. 11
      dtmcli/dtmimp/consts.go
  5. 2
      dtmcli/msg.go
  6. 2
      dtmgrpc/msg.go
  7. 6
      test/msg_barrier_mongo_test.go
  8. 6
      test/msg_barrier_redis_test.go
  9. 6
      test/msg_barrier_test.go

6
dtmcli/barrier.go

@ -82,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
@ -111,11 +111,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
// QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
_, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback")
var reason string
if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01).Scan(&reason)
err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason)
}
if reason == "rollback" {
return ErrFailure

10
dtmcli/barrier_mongo.go

@ -33,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
@ -54,16 +54,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
// MongoQueryPrepared query prepared for redis
// experimental
func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback")
var result bson.M
if err == nil {
fs := strings.Split(dtmimp.BarrierTableName, ".")
barrier := mc.Database(fs[0]).Collection(fs[1])
err = barrier.FindOne(context.Background(), bson.D{
{Key: "gid", Value: bb.Gid},
{Key: "branch_id", Value: dtmimp.BranchId00},
{Key: "op", Value: dtmimp.BarrierOpMsg},
{Key: "barrier_id", Value: dtmimp.BarrierID01},
{Key: "branch_id", Value: dtmimp.MsgDoBranch0},
{Key: "op", Value: dtmimp.MsgDoOp},
{Key: "barrier_id", Value: dtmimp.MsgDoBarrier1},
}).Decode(&result)
}
var reason string

4
dtmcli/barrier_redis.go

@ -44,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
if err == redis.Nil {
err = nil
}
if err == nil && bb.Op == dtmimp.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
if err == nil && bb.Op == dtmimp.MsgDoOp && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
return ErrDuplicated
}
if err == nil && v == ResultFailure {
@ -55,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01)
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1)
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then

11
dtmcli/dtmimp/consts.go

@ -27,9 +27,10 @@ const (
// JrpcCodeOngoing const for json-rpc ongoing
JrpcCodeOngoing = -32902
BranchId00 = "00"
BarrierID01 = "01"
BarrierOpMsg = "msg"
// MsgDoBranch0 const for DoAndSubmit barrier branch
MsgDoBranch0 = "00"
// MsgDoBarrier1 const for DoAndSubmit barrier barrierID
MsgDoBarrier1 = "01"
// MsgDoOp const for DoAndSubmit barrier op
MsgDoOp = "msg"
)

2
dtmcli/msg.go

@ -61,7 +61,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp) // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}

2
dtmgrpc/msg.go

@ -63,7 +63,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp) // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}

6
test/msg_barrier_mongo_test.go

@ -50,9 +50,9 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": dtmimp.BranchId00,
"op": dtmimp.BarrierOpMsg,
"barrier_id": dtmimp.BarrierID01,
"branch_id": dtmimp.MsgDoBranch0,
"op": dtmimp.MsgDoOp,
"barrier_id": dtmimp.MsgDoBarrier1,
}).
SetBody(req).Get(Busi + "/MongoQueryPrepared")
assert.Nil(t, err)

6
test/msg_barrier_redis_test.go

@ -47,9 +47,9 @@ func TestMsgRedisDoBusiLater(t *testing.T) {
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": dtmimp.BranchId00,
"op": dtmimp.BarrierOpMsg,
"barrier_id": dtmimp.BarrierID01,
"branch_id": dtmimp.MsgDoBranch0,
"op": dtmimp.MsgDoOp,
"barrier_id": dtmimp.MsgDoBarrier1,
}).
SetBody(req).Get(Busi + "/RedisQueryPrepared")
assert.Nil(t, err)

6
test/msg_barrier_test.go

@ -51,9 +51,9 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) {
SetQueryParams(map[string]string{
"trans_type": "msg",
"gid": gid,
"branch_id": dtmimp.BranchId00,
"op": dtmimp.BarrierOpMsg,
"barrier_id": dtmimp.BarrierID01,
"branch_id": dtmimp.MsgDoBranch0,
"op": dtmimp.MsgDoOp,
"barrier_id": dtmimp.MsgDoBarrier1,
}).
SetBody(req).Get(Busi + "/QueryPreparedB")
assert.Nil(t, err)

Loading…
Cancel
Save