diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 558f9af..7292966 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" ) // BarrierBusiFunc type for busi func @@ -27,8 +28,6 @@ type BranchBarrier struct { BarrierID int } -const opMsg = "msg" - func (bb *BranchBarrier) String() string { return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op) } @@ -84,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -113,11 +112,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error // QueryPrepared queries prepared data func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { - _, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") + _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) - err = db.QueryRow(sql, bb.Gid, "00", "msg", "01").Scan(&reason) + err = db.QueryRow(sql, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01).Scan(&reason) } if reason == "rollback" { return ErrFailure diff --git a/dtmcli/barrier_mongo.go b/dtmcli/barrier_mongo.go index 98c0f67..3cd28ff 100644 --- a/dtmcli/barrier_mongo.go +++ b/dtmcli/barrier_mongo.go @@ -6,6 +6,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -33,7 +34,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -54,16 +55,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session // MongoQueryPrepared query prepared for redis // experimental func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error { - _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") + _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") var result bson.M if err == nil { fs := strings.Split(dtmimp.BarrierTableName, ".") barrier := mc.Database(fs[0]).Collection(fs[1]) err = barrier.FindOne(context.Background(), bson.D{ {Key: "gid", Value: bb.Gid}, - {Key: "branch_id", Value: "00"}, - {Key: "op", Value: "msg"}, - {Key: "barrier_id", Value: "01"}, + {Key: "branch_id", Value: dtmutil.BranchId00}, + {Key: "op", Value: dtmutil.BarrierOpMsg}, + {Key: "barrier_id", Value: dtmutil.BranchId01}, }).Decode(&result) } var reason string diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go index 6fcc0ab..9bf20a4 100644 --- a/dtmcli/barrier_redis.go +++ b/dtmcli/barrier_redis.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" "github.com/go-redis/redis/v8" ) @@ -43,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) if err == redis.Nil { err = nil } - if err == nil && bb.Op == opMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate + if err == nil && bb.Op == dtmutil.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate return ErrDuplicated } if err == nil && v == ResultFailure { @@ -54,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) // RedisQueryPrepared query prepared for redis func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { - bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01") + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01) v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared local v = redis.call('GET', KEYS[1]) if v == false then diff --git a/dtmcli/msg.go b/dtmcli/msg.go index 0bc6dc6..7a511b3 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -11,6 +11,7 @@ import ( "errors" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmutil" ) // Msg reliable msg type @@ -21,7 +22,7 @@ type Msg struct { // NewMsg create new msg func NewMsg(server string, gid string) *Msg { - return &Msg{TransBase: *dtmimp.NewTransBase(gid, "msg", server, "")} + return &Msg{TransBase: *dtmimp.NewTransBase(gid, dtmutil.BarrierOpMsg, server, "")} } // Add add a new step @@ -61,7 +62,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { - bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared + bb, err := BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index be5b304..f9dd387 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmutil" "google.golang.org/protobuf/proto" ) @@ -63,7 +64,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { - bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared + bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/dtmutil/consts.go b/dtmutil/consts.go index d8c4345..ae86360 100644 --- a/dtmutil/consts.go +++ b/dtmutil/consts.go @@ -14,3 +14,10 @@ const ( // DefaultGrpcServer default url for grpc server. used by test and examples DefaultGrpcServer = "localhost:36790" ) + +// barrier +const ( + BranchId00 = "00" + BranchId01 = "01" + BarrierOpMsg = "msg" +) diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index 6d8dbfc..d7b9d69 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -6,6 +6,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" "go.mongodb.org/mongo-driver/mongo" @@ -48,11 +49,11 @@ func TestMsgMongoDoBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": "msg", + "trans_type": dtmutil.BarrierOpMsg, "gid": gid, - "branch_id": "00", - "op": "msg", - "barrier_id": "01", + "branch_id": dtmutil.BranchId00, + "op": dtmutil.BarrierOpMsg, + "barrier_id": dtmutil.BranchId01, }). SetBody(req).Get(Busi + "/MongoQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go index 06b099f..3f6b787 100644 --- a/test/msg_barrier_redis_test.go +++ b/test/msg_barrier_redis_test.go @@ -6,6 +6,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) @@ -45,11 +46,11 @@ func TestMsgRedisDoBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": "msg", + "trans_type": dtmutil.BarrierOpMsg, "gid": gid, - "branch_id": "00", - "op": "msg", - "barrier_id": "01", + "branch_id": dtmutil.BranchId00, + "op": dtmutil.BarrierOpMsg, + "barrier_id": dtmutil.BranchId01, }). SetBody(req).Get(Busi + "/RedisQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index 828914a..a6920d7 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -10,6 +10,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) @@ -49,11 +50,11 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": "msg", + "trans_type": dtmutil.BarrierOpMsg, "gid": gid, - "branch_id": "00", - "op": "msg", - "barrier_id": "01", + "branch_id": dtmutil.BranchId00, + "op": dtmutil.BarrierOpMsg, + "barrier_id": dtmutil.BranchId01, }). SetBody(req).Get(Busi + "/QueryPreparedB") assert.Nil(t, err)