diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 0c37d21..492296d 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -82,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -111,11 +111,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error // QueryPrepared queries prepared data func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { - _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback") + _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback") var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) - err = db.QueryRow(sql, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01).Scan(&reason) + err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason) } if reason == "rollback" { return ErrFailure diff --git a/dtmcli/barrier_mongo.go b/dtmcli/barrier_mongo.go index e886e13..5604eeb 100644 --- a/dtmcli/barrier_mongo.go +++ b/dtmcli/barrier_mongo.go @@ -33,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -54,16 +54,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session // MongoQueryPrepared query prepared for redis // experimental func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error { - _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback") + _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, "rollback") var result bson.M if err == nil { fs := strings.Split(dtmimp.BarrierTableName, ".") barrier := mc.Database(fs[0]).Collection(fs[1]) err = barrier.FindOne(context.Background(), bson.D{ {Key: "gid", Value: bb.Gid}, - {Key: "branch_id", Value: dtmimp.BranchId00}, - {Key: "op", Value: dtmimp.BarrierOpMsg}, - {Key: "barrier_id", Value: dtmimp.BarrierID01}, + {Key: "branch_id", Value: dtmimp.MsgDoBranch0}, + {Key: "op", Value: dtmimp.MsgDoOp}, + {Key: "barrier_id", Value: dtmimp.MsgDoBarrier1}, }).Decode(&result) } var reason string diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go index 1df79dd..1ff1591 100644 --- a/dtmcli/barrier_redis.go +++ b/dtmcli/barrier_redis.go @@ -44,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) if err == redis.Nil { err = nil } - if err == nil && bb.Op == dtmimp.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate + if err == nil && bb.Op == dtmimp.MsgDoOp && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate return ErrDuplicated } if err == nil && v == ResultFailure { @@ -55,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) // RedisQueryPrepared query prepared for redis func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { - bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01) + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1) v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared local v = redis.call('GET', KEYS[1]) if v == false then diff --git a/dtmcli/dtmimp/consts.go b/dtmcli/dtmimp/consts.go index cd68361..f17bccb 100644 --- a/dtmcli/dtmimp/consts.go +++ b/dtmcli/dtmimp/consts.go @@ -27,9 +27,10 @@ const ( // JrpcCodeOngoing const for json-rpc ongoing JrpcCodeOngoing = -32902 - BranchId00 = "00" - - BarrierID01 = "01" - - BarrierOpMsg = "msg" + // MsgDoBranch0 const for DoAndSubmit barrier branch + MsgDoBranch0 = "00" + // MsgDoBarrier1 const for DoAndSubmit barrier barrierID + MsgDoBarrier1 = "01" + // MsgDoOp const for DoAndSubmit barrier op + MsgDoOp = "msg" ) diff --git a/dtmcli/msg.go b/dtmcli/msg.go index 076c6de..e0e828e 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -61,7 +61,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { - bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared + bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index 3cfc323..82b504d 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -63,7 +63,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { - bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared + bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index b4fd018..614226b 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -50,9 +50,9 @@ func TestMsgMongoDoBusiLater(t *testing.T) { SetQueryParams(map[string]string{ "trans_type": "msg", "gid": gid, - "branch_id": dtmimp.BranchId00, - "op": dtmimp.BarrierOpMsg, - "barrier_id": dtmimp.BarrierID01, + "branch_id": dtmimp.MsgDoBranch0, + "op": dtmimp.MsgDoOp, + "barrier_id": dtmimp.MsgDoBarrier1, }). SetBody(req).Get(Busi + "/MongoQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go index 3e07233..9d3ff7e 100644 --- a/test/msg_barrier_redis_test.go +++ b/test/msg_barrier_redis_test.go @@ -47,9 +47,9 @@ func TestMsgRedisDoBusiLater(t *testing.T) { SetQueryParams(map[string]string{ "trans_type": "msg", "gid": gid, - "branch_id": dtmimp.BranchId00, - "op": dtmimp.BarrierOpMsg, - "barrier_id": dtmimp.BarrierID01, + "branch_id": dtmimp.MsgDoBranch0, + "op": dtmimp.MsgDoOp, + "barrier_id": dtmimp.MsgDoBarrier1, }). SetBody(req).Get(Busi + "/RedisQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index 1acdf7c..eba5db4 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -51,9 +51,9 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) { SetQueryParams(map[string]string{ "trans_type": "msg", "gid": gid, - "branch_id": dtmimp.BranchId00, - "op": dtmimp.BarrierOpMsg, - "barrier_id": dtmimp.BarrierID01, + "branch_id": dtmimp.MsgDoBranch0, + "op": dtmimp.MsgDoOp, + "barrier_id": dtmimp.MsgDoBarrier1, }). SetBody(req).Get(Busi + "/QueryPreparedB") assert.Nil(t, err)