From 548343e5d7f31a2c12e97ea019249a12ebbcfb61 Mon Sep 17 00:00:00 2001 From: stulzq Date: Wed, 2 Mar 2022 16:43:51 +0800 Subject: [PATCH] fix --- dtmcli/barrier.go | 7 +++---- dtmcli/barrier_mongo.go | 11 +++++------ dtmcli/barrier_redis.go | 6 +++--- dtmcli/dtmimp/consts.go | 6 ++++++ dtmcli/msg.go | 3 +-- dtmgrpc/msg.go | 3 +-- dtmutil/consts.go | 7 ------- test/msg_barrier_mongo_test.go | 9 ++++----- test/msg_barrier_redis_test.go | 9 ++++----- test/msg_barrier_test.go | 9 ++++----- 10 files changed, 31 insertions(+), 39 deletions(-) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 7292966..0c37d21 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -13,7 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmutil" ) // BarrierBusiFunc type for busi func @@ -83,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -112,11 +111,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error // QueryPrepared queries prepared data func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { - _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") + _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback") var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) - err = db.QueryRow(sql, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01).Scan(&reason) + err = db.QueryRow(sql, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01).Scan(&reason) } if reason == "rollback" { return ErrFailure diff --git a/dtmcli/barrier_mongo.go b/dtmcli/barrier_mongo.go index 3cd28ff..e886e13 100644 --- a/dtmcli/barrier_mongo.go +++ b/dtmcli/barrier_mongo.go @@ -6,7 +6,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmutil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -34,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. + if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. return ErrDuplicated } @@ -55,16 +54,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session // MongoQueryPrepared query prepared for redis // experimental func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error { - _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") + _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback") var result bson.M if err == nil { fs := strings.Split(dtmimp.BarrierTableName, ".") barrier := mc.Database(fs[0]).Collection(fs[1]) err = barrier.FindOne(context.Background(), bson.D{ {Key: "gid", Value: bb.Gid}, - {Key: "branch_id", Value: dtmutil.BranchId00}, - {Key: "op", Value: dtmutil.BarrierOpMsg}, - {Key: "barrier_id", Value: dtmutil.BranchId01}, + {Key: "branch_id", Value: dtmimp.BranchId00}, + {Key: "op", Value: dtmimp.BarrierOpMsg}, + {Key: "barrier_id", Value: dtmimp.BarrierID01}, }).Decode(&result) } var reason string diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go index 9bf20a4..1df79dd 100644 --- a/dtmcli/barrier_redis.go +++ b/dtmcli/barrier_redis.go @@ -3,8 +3,8 @@ package dtmcli import ( "fmt" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmutil" "github.com/go-redis/redis/v8" ) @@ -44,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) if err == redis.Nil { err = nil } - if err == nil && bb.Op == dtmutil.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate + if err == nil && bb.Op == dtmimp.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate return ErrDuplicated } if err == nil && v == ResultFailure { @@ -55,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1]) // RedisQueryPrepared query prepared for redis func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { - bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01) + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01) v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared local v = redis.call('GET', KEYS[1]) if v == false then diff --git a/dtmcli/dtmimp/consts.go b/dtmcli/dtmimp/consts.go index d029552..cd68361 100644 --- a/dtmcli/dtmimp/consts.go +++ b/dtmcli/dtmimp/consts.go @@ -26,4 +26,10 @@ const ( // JrpcCodeOngoing const for json-rpc ongoing JrpcCodeOngoing = -32902 + + BranchId00 = "00" + + BarrierID01 = "01" + + BarrierOpMsg = "msg" ) diff --git a/dtmcli/msg.go b/dtmcli/msg.go index 492906e..076c6de 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -11,7 +11,6 @@ import ( "errors" "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmutil" ) // Msg reliable msg type @@ -62,7 +61,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { - bb, err := BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared + bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index f9dd387..3cfc323 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -13,7 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" - "github.com/dtm-labs/dtm/dtmutil" "google.golang.org/protobuf/proto" ) @@ -64,7 +63,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl // if busiCall return ErrFailure, then abort is called directly // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { - bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared + bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } diff --git a/dtmutil/consts.go b/dtmutil/consts.go index ae86360..d8c4345 100644 --- a/dtmutil/consts.go +++ b/dtmutil/consts.go @@ -14,10 +14,3 @@ const ( // DefaultGrpcServer default url for grpc server. used by test and examples DefaultGrpcServer = "localhost:36790" ) - -// barrier -const ( - BranchId00 = "00" - BranchId01 = "01" - BarrierOpMsg = "msg" -) diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index d7b9d69..b4fd018 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -6,7 +6,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" "go.mongodb.org/mongo-driver/mongo" @@ -49,11 +48,11 @@ func TestMsgMongoDoBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": dtmutil.BarrierOpMsg, + "trans_type": "msg", "gid": gid, - "branch_id": dtmutil.BranchId00, - "op": dtmutil.BarrierOpMsg, - "barrier_id": dtmutil.BranchId01, + "branch_id": dtmimp.BranchId00, + "op": dtmimp.BarrierOpMsg, + "barrier_id": dtmimp.BarrierID01, }). SetBody(req).Get(Busi + "/MongoQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go index 3f6b787..3e07233 100644 --- a/test/msg_barrier_redis_test.go +++ b/test/msg_barrier_redis_test.go @@ -6,7 +6,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) @@ -46,11 +45,11 @@ func TestMsgRedisDoBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": dtmutil.BarrierOpMsg, + "trans_type": "msg", "gid": gid, - "branch_id": dtmutil.BranchId00, - "op": dtmutil.BarrierOpMsg, - "barrier_id": dtmutil.BranchId01, + "branch_id": dtmimp.BranchId00, + "op": dtmimp.BarrierOpMsg, + "barrier_id": dtmimp.BarrierID01, }). SetBody(req).Get(Busi + "/RedisQueryPrepared") assert.Nil(t, err) diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index a6920d7..1acdf7c 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -10,7 +10,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) @@ -50,11 +49,11 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) { req := busi.GenTransReq(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ - "trans_type": dtmutil.BarrierOpMsg, + "trans_type": "msg", "gid": gid, - "branch_id": dtmutil.BranchId00, - "op": dtmutil.BarrierOpMsg, - "barrier_id": dtmutil.BranchId01, + "branch_id": dtmimp.BranchId00, + "op": dtmimp.BarrierOpMsg, + "barrier_id": dtmimp.BarrierID01, }). SetBody(req).Get(Busi + "/QueryPreparedB") assert.Nil(t, err)