Browse Source

Merge pull request #159 from dtm-labs/alpha

Add msg.PrepareAndSubmit
pull/162/head
yedf2 4 years ago
committed by GitHub
parent
commit
46ef4afe12
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dtmcli/barrier.go
  2. 23
      dtmcli/msg.go
  3. 5
      dtmgrpc/dtmgimp/grpc_clients.go
  4. 4
      dtmsvr/trans_type_msg.go
  5. 2
      dtmutil/utils.go
  6. 1
      go.mod
  7. 2
      go.sum
  8. 2
      helper/test-cover.sh
  9. 32
      test/busi/barrier.go
  10. 10
      test/busi/base_grpc.go
  11. 18
      test/busi/base_http.go
  12. 13
      test/busi/base_types.go
  13. 6
      test/busi/busi.go
  14. 178
      test/busi/busi.pb.go
  15. 3
      test/busi/busi.proto
  16. 108
      test/busi/busi_grpc.pb.go
  17. 10
      test/main_test.go
  18. 105
      test/msg_barrier_test.go
  19. 8
      test/msg_grpc_test.go
  20. 2
      test/msg_options_test.go
  21. 11
      test/msg_test.go
  22. 4
      test/store_test.go
  23. 6
      test/tcc_barrier_test.go
  24. 22
      test/types.go

15
dtmcli/barrier.go

@ -72,7 +72,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
} else if rerr != nil {
tx.Rollback()
} else {
tx.Commit()
rerr = tx.Commit()
}
}()
ti := bb
@ -100,3 +100,16 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
}
return bb.Call(tx, busiCall)
}
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback")
var reason string
if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
err = db.QueryRow(sql, bb.Gid, "00", "msg", "01").Scan(&reason)
}
if reason == "rollback" {
return ErrFailure
}
return err
}

23
dtmcli/msg.go

@ -6,7 +6,11 @@
package dtmcli
import "github.com/dtm-labs/dtm/dtmcli/dtmimp"
import (
"database/sql"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
)
// Msg reliable msg type
type Msg struct {
@ -35,3 +39,20 @@ func (s *Msg) Prepare(queryPrepared string) error {
func (s *Msg) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = bb.CallWithDB(db, func(tx *sql.Tx) error {
err := busiCall(tx)
if err == nil {
err = s.Prepare(queryPrepared)
}
return err
})
}
if err == nil {
err = s.Submit()
}
return err
}

5
dtmgrpc/dtmgimp/grpc_clients.go

@ -43,11 +43,6 @@ func MustGetDtmClient(grpcServer string) dtmgpb.DtmClient {
return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, false))
}
// MustGetRawDtmClient must get raw codec grpc conn
func MustGetRawDtmClient(grpcServer string) dtmgpb.DtmClient {
return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, true))
}
// GetGrpcConn 1
func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr error) {
clients := &normalClients

4
dtmsvr/trans_type_msg.go

@ -42,7 +42,7 @@ func (t *TransGlobal) mayQueryPrepared() {
if !t.needProcess() || t.Status == dtmcli.StatusSubmitted {
return
}
body, err := t.getURLResult(t.QueryPrepared, "", "", nil)
body, err := t.getURLResult(t.QueryPrepared, "00", "msg", nil)
if strings.Contains(body, dtmcli.ResultSuccess) {
t.changeStatus(dtmcli.StatusSubmitted)
} else if strings.Contains(body, dtmcli.ResultFailure) {
@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if strings.Contains(body, dtmcli.ResultOngoing) {
t.touchCronTime(cronReset)
} else {
logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error())
logger.Errorf("getting result failed for %s. error: %v body %s", t.QueryPrepared, err, body)
t.touchCronTime(cronBackoff)
}
}

2
dtmutil/utils.go

@ -38,7 +38,7 @@ func GetGinApp() *gin.Engine {
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb))
}
}
logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
logger.Debugf("begin %s %s body: %s", c.Request.Method, c.Request.URL, body)
c.Next()
})
app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) })

1
go.mod

@ -3,6 +3,7 @@ module github.com/dtm-labs/dtm
go 1.15
require (
bou.ke/monkey v1.0.2 // indirect
github.com/dtm-labs/dtmdriver v0.0.1
github.com/dtm-labs/dtmdriver-gozero v0.0.1
github.com/dtm-labs/dtmdriver-polaris v0.0.2

2
go.sum

@ -1,3 +1,5 @@
bou.ke/monkey v1.0.2 h1:kWcnsrCNUatbxncxR/ThdYqbytgOIArtYWqcQLQzKLI=
bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=

2
helper/test-cover.sh

@ -2,7 +2,7 @@ set -x
echo "" > coverage.txt
for store in redis mysql boltdb; do
for d in $(go list ./... | grep -v vendor); do
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
echo > profile.out

32
test/busi/barrier.go

@ -17,29 +17,29 @@ import (
)
func init() {
setupFuncs["TccBarrierSetup"] = func(app *gin.Engine) {
setupFuncs["BarrierSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaAdjustBalance(tx, transInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
app.POST(BusiAPI+"/SagaBTransInCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaAdjustBalance(tx, transInUID, -reqFrom(c).Amount, "")
return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
})
}))
app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaAdjustBalance(tx, transOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)
return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
}))
app.POST(BusiAPI+"/SagaBTransOutCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaAdjustBalance(tx, transOutUID, reqFrom(c).Amount, "")
return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "")
})
}))
app.POST(BusiAPI+"/SagaBTransOutGorm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
@ -47,7 +47,7 @@ func init() {
barrier := MustBarrierFromGin(c)
tx := dbGet().DB.Begin()
return dtmcli.MapSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(tx1 *sql.Tx) error {
return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, transOutUID).Error
return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, TransOutUID).Error
})
}))
@ -57,17 +57,17 @@ func init() {
return req.TransInResult, nil
}
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, transInUID, req.Amount)
return tccAdjustTrading(tx, TransInUID, req.Amount)
})
}))
app.POST(BusiAPI+"/TccBTransInConfirm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustBalance(tx, transInUID, reqFrom(c).Amount)
return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount)
})
}))
app.POST(BusiAPI+"/TccBTransInCancel", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, transInUID, -reqFrom(c).Amount)
return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount)
})
}))
app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
@ -76,12 +76,12 @@ func init() {
return req.TransOutResult, nil
}
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, transOutUID, -req.Amount)
return tccAdjustTrading(tx, TransOutUID, -req.Amount)
})
}))
app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustBalance(tx, transOutUID, -reqFrom(c).Amount)
return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount)
})
}))
app.POST(BusiAPI+"/TccBTransOutCancel", dtmutil.WrapHandler(TccBarrierTransOutCancel))
@ -91,34 +91,34 @@ func init() {
// TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, transOutUID, reqFrom(c).Amount)
return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount)
})
}
func (s *busiServer) TransInBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcAdjustBalance(tx, transInUID, in.Amount, in.TransInResult)
return sagaGrpcAdjustBalance(tx, TransInUID, in.Amount, in.TransInResult)
})
}
func (s *busiServer) TransOutBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcAdjustBalance(tx, transOutUID, -in.Amount, in.TransOutResult)
return sagaGrpcAdjustBalance(tx, TransOutUID, -in.Amount, in.TransOutResult)
})
}
func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcAdjustBalance(tx, transInUID, -in.Amount, "")
return sagaGrpcAdjustBalance(tx, TransInUID, -in.Amount, "")
})
}
func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcAdjustBalance(tx, transOutUID, in.Amount, "")
return sagaGrpcAdjustBalance(tx, TransOutUID, in.Amount, "")
})
}

10
test/busi/base_grpc.go

@ -63,9 +63,9 @@ type busiServer struct {
UnimplementedBusiServer
}
func (s *busiServer) CanSubmit(ctx context.Context, in *BusiReq) (*BusiReply, error) {
res := MainSwitch.CanSubmitResult.Fetch()
return &BusiReply{Message: "a sample"}, dtmgimp.Result2Error(res, nil)
func (s *busiServer) QueryPrepared(ctx context.Context, in *BusiReq) (*BusiReply, error) {
res := MainSwitch.QueryPreparedResult.Fetch()
return &BusiReply{Message: "a sample data"}, dtmgimp.Result2Error(res, nil)
}
func (s *busiServer) TransIn(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
@ -102,13 +102,13 @@ func (s *busiServer) TransOutTcc(ctx context.Context, in *BusiReq) (*emptypb.Emp
func (s *busiServer) TransInXa(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
return &emptypb.Empty{}, XaGrpcClient.XaLocalTransaction(ctx, in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error {
return sagaGrpcAdjustBalance(db, transInUID, in.Amount, in.TransInResult)
return sagaGrpcAdjustBalance(db, TransInUID, in.Amount, in.TransInResult)
})
}
func (s *busiServer) TransOutXa(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
return &emptypb.Empty{}, XaGrpcClient.XaLocalTransaction(ctx, in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error {
return sagaGrpcAdjustBalance(db, transOutUID, in.Amount, in.TransOutResult)
return sagaGrpcAdjustBalance(db, TransOutUID, in.Amount, in.TransOutResult)
})
}

18
test/busi/base_http.go

@ -99,19 +99,25 @@ func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/TransOutRevert", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert")
}))
app.GET(BusiAPI+"/CanSubmit", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
logger.Debugf("%s CanSubmit", c.Query("gid"))
return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil
app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
logger.Debugf("%s QueryPrepared", c.Query("gid"))
return dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess), nil
}))
app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
logger.Debugf("%s QueryPreparedB", c.Query("gid"))
bb := MustBarrierFromGin(c)
db := dbGet().ToSQLDB()
return error2Resp(bb.QueryPrepared(db))
}))
app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error {
return sagaAdjustBalance(db, transInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
return error2Resp(err)
}))
app.POST(BusiAPI+"/TransOutXa", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error {
return sagaAdjustBalance(db, transOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult)
return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
return error2Resp(err)
}))
@ -137,7 +143,7 @@ func BaseAddRoute(app *gin.Engine) {
if err != nil {
return err
}
dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, transOutUID)
dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, TransOutUID)
return dbr.Error
})
return error2Resp(err)

13
test/busi/base_types.go

@ -32,15 +32,10 @@ func (*UserAccount) TableName() string {
return "dtm_busi.user_account"
}
func GetUserAccountByUid(uid int) *UserAccount {
func GetBalanceByUid(uid int) int {
ua := UserAccount{}
dbr := dbGet().Must().Model(&ua).Where("user_id=?", uid).First(&ua)
dtmimp.E2P(dbr.Error)
return &ua
}
func IsEqual(ua1, ua2 *UserAccount) bool {
return ua1.UserId == ua2.UserId && ua1.Balance == ua2.Balance && ua1.TradingBalance == ua2.TradingBalance
_ = dbGet().Must().Model(&ua).Where("user_id=?", uid).First(&ua)
return dtmimp.MustAtoi(ua.Balance[:len(ua.Balance)-3])
}
// TransReq transaction request payload
@ -118,7 +113,7 @@ type mainSwitchType struct {
TransOutConfirmResult AutoEmptyString
TransInRevertResult AutoEmptyString
TransOutRevertResult AutoEmptyString
CanSubmitResult AutoEmptyString
QueryPreparedResult AutoEmptyString
NextResult AutoEmptyString
}

6
test/busi/busi.go

@ -13,8 +13,8 @@ import (
status "google.golang.org/grpc/status"
)
const transOutUID = 1
const transInUID = 2
const TransOutUID = 1
const TransInUID = 2
func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error {
res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess)
@ -59,7 +59,7 @@ func sagaGrpcAdjustBalance(db dtmcli.DB, uid int, amount int64, result string) e
}
func sagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
if strings.Contains(result, dtmcli.ResultFailure) {
return dtmcli.ErrFailure
}

178
test/busi/busi.pb.go

@ -148,77 +148,81 @@ var file_test_busi_busi_proto_rawDesc = []byte{
0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x32, 0xce, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61,
0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42,
0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69,
0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33, 0x0a,
0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76,
0x65, 0x72, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52,
0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12, 0x0d,
0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x49, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x43, 0x6f,
0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73,
0x65, 0x32, 0x8d, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72,
0x61, 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73,
0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c,
0x0a, 0x08, 0x58, 0x61, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x09,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x58, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x58, 0x61,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33,
0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65,
0x76, 0x65, 0x72, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69,
0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a,
0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12,
0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x49, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x3c, 0x0a, 0x08, 0x58, 0x61, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a,
0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x58, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x58,
0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72,
0x61, 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e,
0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x12, 0x36, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x54, 0x63, 0x63,
0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42,
0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00,
0x12, 0x36, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x54, 0x63, 0x63, 0x12,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x12, 0x0d, 0x2e,
0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49,
0x6e, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61,
0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12,
0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62,
0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e,
0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73,
0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38,
0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12,
0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d,
0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d,
0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x62,
0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75,
0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e,
0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f,
0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70,
0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69,
0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50,
0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e,
0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
@ -240,26 +244,26 @@ var file_test_busi_busi_proto_goTypes = []interface{}{
(*emptypb.Empty)(nil), // 2: google.protobuf.Empty
}
var file_test_busi_busi_proto_depIdxs = []int32{
0, // 0: busi.Busi.CanSubmit:input_type -> busi.BusiReq
0, // 1: busi.Busi.TransIn:input_type -> busi.BusiReq
0, // 2: busi.Busi.TransOut:input_type -> busi.BusiReq
0, // 3: busi.Busi.TransInRevert:input_type -> busi.BusiReq
0, // 4: busi.Busi.TransOutRevert:input_type -> busi.BusiReq
0, // 5: busi.Busi.TransInConfirm:input_type -> busi.BusiReq
0, // 6: busi.Busi.TransOutConfirm:input_type -> busi.BusiReq
2, // 7: busi.Busi.XaNotify:input_type -> google.protobuf.Empty
0, // 8: busi.Busi.TransInXa:input_type -> busi.BusiReq
0, // 9: busi.Busi.TransOutXa:input_type -> busi.BusiReq
0, // 10: busi.Busi.TransInTcc:input_type -> busi.BusiReq
0, // 11: busi.Busi.TransOutTcc:input_type -> busi.BusiReq
0, // 12: busi.Busi.TransInTccNested:input_type -> busi.BusiReq
0, // 13: busi.Busi.TransInBSaga:input_type -> busi.BusiReq
0, // 14: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq
0, // 15: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq
0, // 16: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq
0, // 17: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq
0, // 18: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq
1, // 19: busi.Busi.CanSubmit:output_type -> busi.BusiReply
0, // 0: busi.Busi.TransIn:input_type -> busi.BusiReq
0, // 1: busi.Busi.TransOut:input_type -> busi.BusiReq
0, // 2: busi.Busi.TransInRevert:input_type -> busi.BusiReq
0, // 3: busi.Busi.TransOutRevert:input_type -> busi.BusiReq
0, // 4: busi.Busi.TransInConfirm:input_type -> busi.BusiReq
0, // 5: busi.Busi.TransOutConfirm:input_type -> busi.BusiReq
2, // 6: busi.Busi.XaNotify:input_type -> google.protobuf.Empty
0, // 7: busi.Busi.TransInXa:input_type -> busi.BusiReq
0, // 8: busi.Busi.TransOutXa:input_type -> busi.BusiReq
0, // 9: busi.Busi.TransInTcc:input_type -> busi.BusiReq
0, // 10: busi.Busi.TransOutTcc:input_type -> busi.BusiReq
0, // 11: busi.Busi.TransInTccNested:input_type -> busi.BusiReq
0, // 12: busi.Busi.TransInBSaga:input_type -> busi.BusiReq
0, // 13: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq
0, // 14: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq
0, // 15: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq
0, // 16: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq
0, // 17: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq
0, // 18: busi.Busi.QueryPrepared:input_type -> busi.BusiReq
0, // 19: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq
2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty
2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty
2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty
@ -278,8 +282,10 @@ var file_test_busi_busi_proto_depIdxs = []int32{
2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty
2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty
19, // [19:38] is the sub-list for method output_type
0, // [0:19] is the sub-list for method input_type
1, // 38: busi.Busi.QueryPrepared:output_type -> busi.BusiReply
2, // 39: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty
20, // [20:40] is the sub-list for method output_type
0, // [0:20] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

3
test/busi/busi.proto

@ -17,7 +17,6 @@ message BusiReply {
}
// The dtm service definition.
service Busi {
rpc CanSubmit(BusiReq) returns (BusiReply) {}
rpc TransIn(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOut(BusiReq) returns (google.protobuf.Empty) {}
rpc TransInRevert(BusiReq) returns (google.protobuf.Empty) {}
@ -38,5 +37,7 @@ service Busi {
rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {}
rpc QueryPrepared(BusiReq) returns (BusiReply) {}
rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {}
}

108
test/busi/busi_grpc.pb.go

@ -19,7 +19,6 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type BusiClient interface {
CanSubmit(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error)
TransIn(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOut(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransInRevert(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
@ -38,6 +37,8 @@ type BusiClient interface {
TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error)
QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type busiClient struct {
@ -48,15 +49,6 @@ func NewBusiClient(cc grpc.ClientConnInterface) BusiClient {
return &busiClient{cc}
}
func (c *busiClient) CanSubmit(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) {
out := new(BusiReply)
err := c.cc.Invoke(ctx, "/busi.Busi/CanSubmit", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransIn(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransIn", in, out, opts...)
@ -219,11 +211,28 @@ func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...
return out, nil
}
func (c *busiClient) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) {
out := new(BusiReply)
err := c.cc.Invoke(ctx, "/busi.Busi/QueryPrepared", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/QueryPreparedB", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BusiServer is the server API for Busi service.
// All implementations must embed UnimplementedBusiServer
// for forward compatibility
type BusiServer interface {
CanSubmit(context.Context, *BusiReq) (*BusiReply, error)
TransIn(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOut(context.Context, *BusiReq) (*emptypb.Empty, error)
TransInRevert(context.Context, *BusiReq) (*emptypb.Empty, error)
@ -242,6 +251,8 @@ type BusiServer interface {
TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error)
QueryPrepared(context.Context, *BusiReq) (*BusiReply, error)
QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error)
mustEmbedUnimplementedBusiServer()
}
@ -249,9 +260,6 @@ type BusiServer interface {
type UnimplementedBusiServer struct {
}
func (UnimplementedBusiServer) CanSubmit(context.Context, *BusiReq) (*BusiReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method CanSubmit not implemented")
}
func (UnimplementedBusiServer) TransIn(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransIn not implemented")
}
@ -306,6 +314,12 @@ func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*em
func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented")
}
func (UnimplementedBusiServer) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryPrepared not implemented")
}
func (UnimplementedBusiServer) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedB not implemented")
}
func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {}
// UnsafeBusiServer may be embedded to opt out of forward compatibility for this service.
@ -319,24 +333,6 @@ func RegisterBusiServer(s grpc.ServiceRegistrar, srv BusiServer) {
s.RegisterService(&Busi_ServiceDesc, srv)
}
func _Busi_CanSubmit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).CanSubmit(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/CanSubmit",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).CanSubmit(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransIn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
@ -661,6 +657,42 @@ func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _Busi_QueryPrepared_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).QueryPrepared(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/QueryPrepared",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).QueryPrepared(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_QueryPreparedB_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).QueryPreparedB(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/QueryPreparedB",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).QueryPreparedB(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
// Busi_ServiceDesc is the grpc.ServiceDesc for Busi service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -668,10 +700,6 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
ServiceName: "busi.Busi",
HandlerType: (*BusiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CanSubmit",
Handler: _Busi_CanSubmit_Handler,
},
{
MethodName: "TransIn",
Handler: _Busi_TransIn_Handler,
@ -744,6 +772,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
MethodName: "TransOutHeaderNo",
Handler: _Busi_TransOutHeaderNo_Handler,
},
{
MethodName: "QueryPrepared",
Handler: _Busi_QueryPrepared_Handler,
},
{
MethodName: "QueryPreparedB",
Handler: _Busi_QueryPreparedB_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "test/busi/busi.proto",

10
test/main_test.go

@ -16,6 +16,7 @@ import (
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/dtm-labs/dtm/test/busi"
"github.com/go-resty/resty/v2"
)
@ -55,11 +56,16 @@ func TestMain(m *testing.M) {
conf.Store.Password = ""
conf.Store.Port = 6379
}
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
go dtmsvr.StartSvr()
busi.PopulateDB(false)
_ = busi.Startup()
exitIf(m.Run())
r := m.Run()
exitIf(r)
close(dtmsvr.TransProcessedTestChan)
gid, more := <-dtmsvr.TransProcessedTestChan
logger.FatalfIf(more, "extra gid: %s in test chan", gid)
}

105
test/msg_barrier_test.go

@ -0,0 +1,105 @@
package test
import (
"database/sql"
"errors"
"reflect"
"testing"
"bou.ke/monkey"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgPrepareAndSubmit(t *testing.T) {
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before)
}
func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) {
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return errors.New("an error")
})
assert.Error(t, err)
assertSameBalance(t, before)
}
func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) {
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
assert.Error(t, err)
assertSameBalance(t, before)
}
func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var g *monkey.PatchGuard
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
logger.Debugf("tx.Commit rollback and return error in test")
_ = tx.Rollback()
return errors.New("test error for patch")
})
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
g.Unpatch()
assert.Error(t, err)
cronTransOnceForwardNow(180)
assertSameBalance(t, before)
}
func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var guard *monkey.PatchGuard
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
guard.Unpatch()
_ = tx.Commit()
return errors.New("test error for patch")
})
return err
})
assert.Error(t, err)
cronTransOnceForwardNow(180)
assertNotSameBalance(t, before)
}

8
test/msg_grpc_test.go

@ -31,7 +31,7 @@ func TestMsgGrpcTimeoutSuccess(t *testing.T) {
msg := genGrpcMsg(dtmimp.GetFuncName())
err := msg.Prepare("")
assert.Nil(t, err)
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
@ -48,10 +48,10 @@ func TestMsgGrpcTimeoutFailed(t *testing.T) {
msg := genGrpcMsg(dtmimp.GetFuncName())
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
@ -62,6 +62,6 @@ func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc {
msg := dtmgrpc.NewMsgGrpc(dtmutil.DefaultGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransOut", req).
Add(busi.BusiGrpc+"/busi.Busi/TransIn", req)
msg.QueryPrepared = fmt.Sprintf("%s/busi.Busi/CanSubmit", busi.BusiGrpc)
msg.QueryPrepared = fmt.Sprintf("%s/busi.Busi/QueryPrepared", busi.BusiGrpc)
return msg
}

2
test/msg_options_test.go

@ -47,7 +47,7 @@ func TestMsgOptionsTimeoutFailed(t *testing.T) {
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
}

11
test/msg_test.go

@ -29,7 +29,7 @@ func TestMsgTimeoutSuccess(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
@ -45,10 +45,13 @@ func TestMsgTimeoutFailed(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing)
busi.MainSwitch.QueryPreparedResult.SetOnce("OTHER_ERROR")
cronTransOnceForwardNow(180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(360)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
@ -71,6 +74,6 @@ func genMsg(gid string) *dtmcli.Msg {
msg := dtmcli.NewMsg(dtmutil.DefaultHttpServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
msg.QueryPrepared = busi.Busi + "/CanSubmit"
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}

4
test/store_test.go

@ -88,10 +88,6 @@ func TestStoreLockTrans(t *testing.T) {
assert.Nil(t, g2)
}
func TestStoreWait(t *testing.T) {
registry.WaitStoreUp()
}
func TestUpdateBranches(t *testing.T) {
if !conf.Store.IsDB() {
_, err := registry.GetStore().UpdateBranches(nil, nil)

6
test/tcc_barrier_test.go

@ -51,8 +51,7 @@ func TestTccBarrierRollback(t *testing.T) {
}
func TestTccBarrierDisorder(t *testing.T) {
ua1 := busi.GetUserAccountByUid(1)
ua2 := busi.GetUserAccountByUid(2)
before := getBeforeBalances()
cancelFinishedChan := make(chan string, 2)
cancelCanReturnChan := make(chan string, 2)
gid := dtmimp.GetFuncName()
@ -123,8 +122,7 @@ func TestTccBarrierDisorder(t *testing.T) {
assert.Error(t, err, fmt.Errorf("a cancelled tcc"))
assert.Equal(t, []string{StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.True(t, busi.IsEqual(ua1, busi.GetUserAccountByUid(1)))
assert.True(t, busi.IsEqual(ua2, busi.GetUserAccountByUid(2)))
assertSameBalance(t, before)
}
func TestTccBarrierPanic(t *testing.T) {

22
test/types.go

@ -7,6 +7,7 @@
package test
import (
"testing"
"time"
"github.com/dtm-labs/dtm/dtmcli"
@ -16,6 +17,7 @@ import (
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
var conf = &config.Config
@ -80,3 +82,23 @@ const (
// StatusAborting status for global trans status.
StatusAborting = dtmcli.StatusAborting
)
func getBeforeBalances() []int {
b1 := busi.GetBalanceByUid(busi.TransOutUID)
b2 := busi.GetBalanceByUid(busi.TransInUID)
return []int{b1, b2}
}
func assertSameBalance(t *testing.T, before []int) {
b1 := busi.GetBalanceByUid(busi.TransOutUID)
b2 := busi.GetBalanceByUid(busi.TransInUID)
assert.Equal(t, before[0], b1)
assert.Equal(t, before[1], b2)
}
func assertNotSameBalance(t *testing.T, before []int) {
b1 := busi.GetBalanceByUid(busi.TransOutUID)
b2 := busi.GetBalanceByUid(busi.TransInUID)
assert.NotEqual(t, before[0], b1)
assert.Equal(t, before[0]+before[1], b1+b2)
}

Loading…
Cancel
Save