Browse Source

def const and replace

pull/239/head
stulzq 4 years ago
parent
commit
a18a75e011
  1. 9
      dtmcli/barrier.go
  2. 11
      dtmcli/barrier_mongo.go
  3. 5
      dtmcli/barrier_redis.go
  4. 5
      dtmcli/msg.go
  5. 3
      dtmgrpc/msg.go
  6. 7
      dtmutil/consts.go
  7. 9
      test/msg_barrier_mongo_test.go
  8. 9
      test/msg_barrier_redis_test.go
  9. 9
      test/msg_barrier_test.go

9
dtmcli/barrier.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
)
// BarrierBusiFunc type for busi func
@ -27,8 +28,6 @@ type BranchBarrier struct {
BarrierID int
}
const opMsg = "msg"
func (bb *BranchBarrier) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op)
}
@ -84,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
@ -113,11 +112,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
// QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback")
_, err := insertBarrier(db, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback")
var reason string
if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, "00", "msg", "01").Scan(&reason)
err = db.QueryRow(sql, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01).Scan(&reason)
}
if reason == "rollback" {
return ErrFailure

11
dtmcli/barrier_mongo.go

@ -6,6 +6,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
@ -33,7 +34,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == opMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
@ -54,16 +55,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
// MongoQueryPrepared query prepared for redis
// experimental
func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, "00", "msg", "01", "rollback")
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback")
var result bson.M
if err == nil {
fs := strings.Split(dtmimp.BarrierTableName, ".")
barrier := mc.Database(fs[0]).Collection(fs[1])
err = barrier.FindOne(context.Background(), bson.D{
{Key: "gid", Value: bb.Gid},
{Key: "branch_id", Value: "00"},
{Key: "op", Value: "msg"},
{Key: "barrier_id", Value: "01"},
{Key: "branch_id", Value: dtmutil.BranchId00},
{Key: "op", Value: dtmutil.BarrierOpMsg},
{Key: "barrier_id", Value: dtmutil.BranchId01},
}).Decode(&result)
}
var reason string

5
dtmcli/barrier_redis.go

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/go-redis/redis/v8"
)
@ -43,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
if err == redis.Nil {
err = nil
}
if err == nil && bb.Op == opMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
if err == nil && bb.Op == dtmutil.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
return ErrDuplicated
}
if err == nil && v == ResultFailure {
@ -54,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01")
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01)
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then

5
dtmcli/msg.go

@ -11,6 +11,7 @@ import (
"errors"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
)
// Msg reliable msg type
@ -21,7 +22,7 @@ type Msg struct {
// NewMsg create new msg
func NewMsg(server string, gid string) *Msg {
return &Msg{TransBase: *dtmimp.NewTransBase(gid, "msg", server, "")}
return &Msg{TransBase: *dtmimp.NewTransBase(gid, dtmutil.BarrierOpMsg, server, "")}
}
// Add add a new step
@ -61,7 +62,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
bb, err := BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}

3
dtmgrpc/msg.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmutil"
"google.golang.org/protobuf/proto"
)
@ -63,7 +64,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}

7
dtmutil/consts.go

@ -14,3 +14,10 @@ const (
// DefaultGrpcServer default url for grpc server. used by test and examples
DefaultGrpcServer = "localhost:36790"
)
// barrier
const (
BranchId00 = "00"
BranchId01 = "01"
BarrierOpMsg = "msg"
)

9
test/msg_barrier_mongo_test.go

@ -6,6 +6,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/mongo"
@ -48,11 +49,11 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"trans_type": dtmutil.BarrierOpMsg,
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
"branch_id": dtmutil.BranchId00,
"op": dtmutil.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01,
}).
SetBody(req).Get(Busi + "/MongoQueryPrepared")
assert.Nil(t, err)

9
test/msg_barrier_redis_test.go

@ -6,6 +6,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
@ -45,11 +46,11 @@ func TestMsgRedisDoBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"trans_type": dtmutil.BarrierOpMsg,
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
"branch_id": dtmutil.BranchId00,
"op": dtmutil.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01,
}).
SetBody(req).Get(Busi + "/RedisQueryPrepared")
assert.Nil(t, err)

9
test/msg_barrier_test.go

@ -10,6 +10,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
@ -49,11 +50,11 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
"trans_type": dtmutil.BarrierOpMsg,
"gid": gid,
"branch_id": "00",
"op": "msg",
"barrier_id": "01",
"branch_id": dtmutil.BranchId00,
"op": dtmutil.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01,
}).
SetBody(req).Get(Busi + "/QueryPreparedB")
assert.Nil(t, err)

Loading…
Cancel
Save