Browse Source

fix

pull/239/head
stulzq 4 years ago
parent
commit
548343e5d7
  1. 7
      dtmcli/barrier.go
  2. 11
      dtmcli/barrier_mongo.go
  3. 6
      dtmcli/barrier_redis.go
  4. 6
      dtmcli/dtmimp/consts.go
  5. 3
      dtmcli/msg.go
  6. 3
      dtmgrpc/msg.go
  7. 7
      dtmutil/consts.go
  8. 9
      test/msg_barrier_mongo_test.go
  9. 9
      test/msg_barrier_redis_test.go
  10. 9
      test/msg_barrier_test.go

7
dtmcli/barrier.go

@ -13,7 +13,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
) )
// BarrierBusiFunc type for busi func // BarrierBusiFunc type for busi func
@ -83,7 +82,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) currentAffected, rerr := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated return ErrDuplicated
} }
@ -112,11 +111,11 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
// QueryPrepared queries prepared data // QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") _, err := insertBarrier(db, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
var reason string var reason string
if err == nil { if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01).Scan(&reason) err = db.QueryRow(sql, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01).Scan(&reason)
} }
if reason == "rollback" { if reason == "rollback" {
return ErrFailure return ErrFailure

11
dtmcli/barrier_mongo.go

@ -6,7 +6,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
) )
@ -34,7 +33,7 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) currentAffected, rerr := mongoInsertBarrier(sc, mc, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if rerr == nil && bb.Op == dtmutil.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. if rerr == nil && bb.Op == dtmimp.BarrierOpMsg && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated return ErrDuplicated
} }
@ -55,16 +54,16 @@ func (bb *BranchBarrier) MongoCall(mc *mongo.Client, busiCall func(mongo.Session
// MongoQueryPrepared query prepared for redis // MongoQueryPrepared query prepared for redis
// experimental // experimental
func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error { func (bb *BranchBarrier) MongoQueryPrepared(mc *mongo.Client) error {
_, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01, "rollback") _, err := mongoInsertBarrier(context.Background(), mc, bb.TransType, bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01, "rollback")
var result bson.M var result bson.M
if err == nil { if err == nil {
fs := strings.Split(dtmimp.BarrierTableName, ".") fs := strings.Split(dtmimp.BarrierTableName, ".")
barrier := mc.Database(fs[0]).Collection(fs[1]) barrier := mc.Database(fs[0]).Collection(fs[1])
err = barrier.FindOne(context.Background(), bson.D{ err = barrier.FindOne(context.Background(), bson.D{
{Key: "gid", Value: bb.Gid}, {Key: "gid", Value: bb.Gid},
{Key: "branch_id", Value: dtmutil.BranchId00}, {Key: "branch_id", Value: dtmimp.BranchId00},
{Key: "op", Value: dtmutil.BarrierOpMsg}, {Key: "op", Value: dtmimp.BarrierOpMsg},
{Key: "barrier_id", Value: dtmutil.BranchId01}, {Key: "barrier_id", Value: dtmimp.BarrierID01},
}).Decode(&result) }).Decode(&result)
} }
var reason string var reason string

6
dtmcli/barrier_redis.go

@ -3,8 +3,8 @@ package dtmcli
import ( import (
"fmt" "fmt"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -44,7 +44,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
if err == redis.Nil { if err == redis.Nil {
err = nil err = nil
} }
if err == nil && bb.Op == dtmutil.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate if err == nil && bb.Op == dtmimp.BarrierOpMsg && v == "DUPLICATE" { // msg DoAndSubmit should be rejected when duplicate
return ErrDuplicated return ErrDuplicated
} }
if err == nil && v == ResultFailure { if err == nil && v == ResultFailure {
@ -55,7 +55,7 @@ redis.call('INCRBY', KEYS[1], ARGV[1])
// RedisQueryPrepared query prepared for redis // RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg, dtmutil.BranchId01) bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg, dtmimp.BarrierID01)
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1]) local v = redis.call('GET', KEYS[1])
if v == false then if v == false then

6
dtmcli/dtmimp/consts.go

@ -26,4 +26,10 @@ const (
// JrpcCodeOngoing const for json-rpc ongoing // JrpcCodeOngoing const for json-rpc ongoing
JrpcCodeOngoing = -32902 JrpcCodeOngoing = -32902
BranchId00 = "00"
BarrierID01 = "01"
BarrierOpMsg = "msg"
) )

3
dtmcli/msg.go

@ -11,7 +11,6 @@ import (
"errors" "errors"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
) )
// Msg reliable msg type // Msg reliable msg type
@ -62,7 +61,7 @@ func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBu
// if busiCall return ErrFailure, then abort is called directly // if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared bb, err := BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil { if err == nil {
err = s.Prepare(queryPrepared) err = s.Prepare(queryPrepared)
} }

3
dtmgrpc/msg.go

@ -13,7 +13,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmutil"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -64,7 +63,7 @@ func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcl
// if busiCall return ErrFailure, then abort is called directly // if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result // if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmutil.BranchId00, dtmutil.BarrierOpMsg) // a special barrier for msg QueryPrepared bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, dtmimp.BranchId00, dtmimp.BarrierOpMsg) // a special barrier for msg QueryPrepared
if err == nil { if err == nil {
err = s.Prepare(queryPrepared) err = s.Prepare(queryPrepared)
} }

7
dtmutil/consts.go

@ -14,10 +14,3 @@ const (
// DefaultGrpcServer default url for grpc server. used by test and examples // DefaultGrpcServer default url for grpc server. used by test and examples
DefaultGrpcServer = "localhost:36790" DefaultGrpcServer = "localhost:36790"
) )
// barrier
const (
BranchId00 = "00"
BranchId01 = "01"
BarrierOpMsg = "msg"
)

9
test/msg_barrier_mongo_test.go

@ -6,7 +6,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi" "github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
@ -49,11 +48,11 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false) req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R(). _, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": dtmutil.BarrierOpMsg, "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": dtmutil.BranchId00, "branch_id": dtmimp.BranchId00,
"op": dtmutil.BarrierOpMsg, "op": dtmimp.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01, "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/MongoQueryPrepared") SetBody(req).Get(Busi + "/MongoQueryPrepared")
assert.Nil(t, err) assert.Nil(t, err)

9
test/msg_barrier_redis_test.go

@ -6,7 +6,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi" "github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -46,11 +45,11 @@ func TestMsgRedisDoBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false) req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R(). _, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": dtmutil.BarrierOpMsg, "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": dtmutil.BranchId00, "branch_id": dtmimp.BranchId00,
"op": dtmutil.BarrierOpMsg, "op": dtmimp.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01, "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/RedisQueryPrepared") SetBody(req).Get(Busi + "/RedisQueryPrepared")
assert.Nil(t, err) assert.Nil(t, err)

9
test/msg_barrier_test.go

@ -10,7 +10,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi" "github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -50,11 +49,11 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) {
req := busi.GenTransReq(30, false, false) req := busi.GenTransReq(30, false, false)
_, err := dtmcli.GetRestyClient().R(). _, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
"trans_type": dtmutil.BarrierOpMsg, "trans_type": "msg",
"gid": gid, "gid": gid,
"branch_id": dtmutil.BranchId00, "branch_id": dtmimp.BranchId00,
"op": dtmutil.BarrierOpMsg, "op": dtmimp.BarrierOpMsg,
"barrier_id": dtmutil.BranchId01, "barrier_id": dtmimp.BarrierID01,
}). }).
SetBody(req).Get(Busi + "/QueryPreparedB") SetBody(req).Get(Busi + "/QueryPreparedB")
assert.Nil(t, err) assert.Nil(t, err)

Loading…
Cancel
Save