diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 62f59b2..5cffeba 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -72,7 +72,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) } else if rerr != nil { tx.Rollback() } else { - tx.Commit() + rerr = tx.Commit() } }() ti := bb @@ -100,3 +100,16 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error } return bb.Call(tx, busiCall) } + +func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { + _, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") + var reason string + if err == nil { + sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) + err = db.QueryRow(sql, bb.Gid, "00", "msg", "01").Scan(&reason) + } + if reason == "rollback" { + return ErrFailure + } + return err +} diff --git a/dtmcli/msg.go b/dtmcli/msg.go index a461ea2..815b97d 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -6,7 +6,11 @@ package dtmcli -import "github.com/dtm-labs/dtm/dtmcli/dtmimp" +import ( + "database/sql" + + "github.com/dtm-labs/dtm/dtmcli/dtmimp" +) // Msg reliable msg type type Msg struct { @@ -35,3 +39,20 @@ func (s *Msg) Prepare(queryPrepared string) error { func (s *Msg) Submit() error { return dtmimp.TransCallDtm(&s.TransBase, s, "submit") } + +func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error { + bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared + if err == nil { + err = bb.CallWithDB(db, func(tx *sql.Tx) error { + err := busiCall(tx) + if err == nil { + err = s.Prepare(queryPrepared) + } + return err + }) + } + if err == nil { + err = s.Submit() + } + return err +} diff --git a/dtmgrpc/dtmgimp/grpc_clients.go b/dtmgrpc/dtmgimp/grpc_clients.go index dc21e81..b299b9a 100644 --- a/dtmgrpc/dtmgimp/grpc_clients.go +++ b/dtmgrpc/dtmgimp/grpc_clients.go @@ -43,11 +43,6 @@ func MustGetDtmClient(grpcServer string) dtmgpb.DtmClient { return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, false)) } -// MustGetRawDtmClient must get raw codec grpc conn -func MustGetRawDtmClient(grpcServer string) dtmgpb.DtmClient { - return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, true)) -} - // GetGrpcConn 1 func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr error) { clients := &normalClients diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index 3f62f74..3435042 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -42,7 +42,7 @@ func (t *TransGlobal) mayQueryPrepared() { if !t.needProcess() || t.Status == dtmcli.StatusSubmitted { return } - body, err := t.getURLResult(t.QueryPrepared, "", "", nil) + body, err := t.getURLResult(t.QueryPrepared, "00", "msg", nil) if strings.Contains(body, dtmcli.ResultSuccess) { t.changeStatus(dtmcli.StatusSubmitted) } else if strings.Contains(body, dtmcli.ResultFailure) { @@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() { } else if strings.Contains(body, dtmcli.ResultOngoing) { t.touchCronTime(cronReset) } else { - logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error()) + logger.Errorf("getting result failed for %s. error: %v body %s", t.QueryPrepared, err, body) t.touchCronTime(cronBackoff) } } diff --git a/dtmutil/utils.go b/dtmutil/utils.go index 4fbbc7d..0b08915 100644 --- a/dtmutil/utils.go +++ b/dtmutil/utils.go @@ -38,7 +38,7 @@ func GetGinApp() *gin.Engine { c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) } } - logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + logger.Debugf("begin %s %s body: %s", c.Request.Method, c.Request.URL, body) c.Next() }) app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) }) diff --git a/go.mod b/go.mod index 306fad3..f62a410 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/dtm-labs/dtm go 1.15 require ( + bou.ke/monkey v1.0.2 // indirect github.com/dtm-labs/dtmdriver v0.0.1 github.com/dtm-labs/dtmdriver-gozero v0.0.1 github.com/dtm-labs/dtmdriver-polaris v0.0.2 diff --git a/go.sum b/go.sum index d67335c..609d7e6 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +bou.ke/monkey v1.0.2 h1:kWcnsrCNUatbxncxR/ThdYqbytgOIArtYWqcQLQzKLI= +bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= diff --git a/helper/test-cover.sh b/helper/test-cover.sh index ca14aa6..13bd654 100644 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -2,7 +2,7 @@ set -x echo "" > coverage.txt for store in redis mysql boltdb; do for d in $(go list ./... | grep -v vendor); do - TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d + TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 if [ -f profile.out ]; then cat profile.out >> coverage.txt echo > profile.out diff --git a/test/busi/barrier.go b/test/busi/barrier.go index df30145..8ca19de 100644 --- a/test/busi/barrier.go +++ b/test/busi/barrier.go @@ -17,29 +17,29 @@ import ( ) func init() { - setupFuncs["TccBarrierSetup"] = func(app *gin.Engine) { + setupFuncs["BarrierSetup"] = func(app *gin.Engine) { app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { barrier := MustBarrierFromGin(c) return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaAdjustBalance(tx, transInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) + return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) }) })) app.POST(BusiAPI+"/SagaBTransInCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { barrier := MustBarrierFromGin(c) return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaAdjustBalance(tx, transInUID, -reqFrom(c).Amount, "") + return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "") }) })) app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { barrier := MustBarrierFromGin(c) return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaAdjustBalance(tx, transOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult) + return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) app.POST(BusiAPI+"/SagaBTransOutCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { barrier := MustBarrierFromGin(c) return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaAdjustBalance(tx, transOutUID, reqFrom(c).Amount, "") + return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "") }) })) app.POST(BusiAPI+"/SagaBTransOutGorm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { @@ -47,7 +47,7 @@ func init() { barrier := MustBarrierFromGin(c) tx := dbGet().DB.Begin() return dtmcli.MapSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(tx1 *sql.Tx) error { - return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, transOutUID).Error + return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, TransOutUID).Error }) })) @@ -57,17 +57,17 @@ func init() { return req.TransInResult, nil } return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustTrading(tx, transInUID, req.Amount) + return tccAdjustTrading(tx, TransInUID, req.Amount) }) })) app.POST(BusiAPI+"/TccBTransInConfirm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustBalance(tx, transInUID, reqFrom(c).Amount) + return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount) }) })) app.POST(BusiAPI+"/TccBTransInCancel", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustTrading(tx, transInUID, -reqFrom(c).Amount) + return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount) }) })) app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { @@ -76,12 +76,12 @@ func init() { return req.TransOutResult, nil } return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustTrading(tx, transOutUID, -req.Amount) + return tccAdjustTrading(tx, TransOutUID, -req.Amount) }) })) app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustBalance(tx, transOutUID, -reqFrom(c).Amount) + return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount) }) })) app.POST(BusiAPI+"/TccBTransOutCancel", dtmutil.WrapHandler(TccBarrierTransOutCancel)) @@ -91,34 +91,34 @@ func init() { // TccBarrierTransOutCancel will be use in test func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { - return tccAdjustTrading(tx, transOutUID, reqFrom(c).Amount) + return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount) }) } func (s *busiServer) TransInBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { barrier := MustBarrierFromGrpc(ctx) return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaGrpcAdjustBalance(tx, transInUID, in.Amount, in.TransInResult) + return sagaGrpcAdjustBalance(tx, TransInUID, in.Amount, in.TransInResult) }) } func (s *busiServer) TransOutBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { barrier := MustBarrierFromGrpc(ctx) return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaGrpcAdjustBalance(tx, transOutUID, -in.Amount, in.TransOutResult) + return sagaGrpcAdjustBalance(tx, TransOutUID, -in.Amount, in.TransOutResult) }) } func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { barrier := MustBarrierFromGrpc(ctx) return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaGrpcAdjustBalance(tx, transInUID, -in.Amount, "") + return sagaGrpcAdjustBalance(tx, TransInUID, -in.Amount, "") }) } func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { barrier := MustBarrierFromGrpc(ctx) return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error { - return sagaGrpcAdjustBalance(tx, transOutUID, in.Amount, "") + return sagaGrpcAdjustBalance(tx, TransOutUID, in.Amount, "") }) } diff --git a/test/busi/base_grpc.go b/test/busi/base_grpc.go index 6fd3790..3c71a50 100644 --- a/test/busi/base_grpc.go +++ b/test/busi/base_grpc.go @@ -63,9 +63,9 @@ type busiServer struct { UnimplementedBusiServer } -func (s *busiServer) CanSubmit(ctx context.Context, in *BusiReq) (*BusiReply, error) { - res := MainSwitch.CanSubmitResult.Fetch() - return &BusiReply{Message: "a sample"}, dtmgimp.Result2Error(res, nil) +func (s *busiServer) QueryPrepared(ctx context.Context, in *BusiReq) (*BusiReply, error) { + res := MainSwitch.QueryPreparedResult.Fetch() + return &BusiReply{Message: "a sample data"}, dtmgimp.Result2Error(res, nil) } func (s *busiServer) TransIn(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { @@ -102,13 +102,13 @@ func (s *busiServer) TransOutTcc(ctx context.Context, in *BusiReq) (*emptypb.Emp func (s *busiServer) TransInXa(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { return &emptypb.Empty{}, XaGrpcClient.XaLocalTransaction(ctx, in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error { - return sagaGrpcAdjustBalance(db, transInUID, in.Amount, in.TransInResult) + return sagaGrpcAdjustBalance(db, TransInUID, in.Amount, in.TransInResult) }) } func (s *busiServer) TransOutXa(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { return &emptypb.Empty{}, XaGrpcClient.XaLocalTransaction(ctx, in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error { - return sagaGrpcAdjustBalance(db, transOutUID, in.Amount, in.TransOutResult) + return sagaGrpcAdjustBalance(db, TransOutUID, in.Amount, in.TransOutResult) }) } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index fe47dd7..83da433 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -99,19 +99,25 @@ func BaseAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/TransOutRevert", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert") })) - app.GET(BusiAPI+"/CanSubmit", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { - logger.Debugf("%s CanSubmit", c.Query("gid")) - return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil + app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { + logger.Debugf("%s QueryPrepared", c.Query("gid")) + return dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess), nil + })) + app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { + logger.Debugf("%s QueryPreparedB", c.Query("gid")) + bb := MustBarrierFromGin(c) + db := dbGet().ToSQLDB() + return error2Resp(bb.QueryPrepared(db)) })) app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error { - return sagaAdjustBalance(db, transInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) + return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) }) return error2Resp(err) })) app.POST(BusiAPI+"/TransOutXa", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error { - return sagaAdjustBalance(db, transOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) + return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) }) return error2Resp(err) })) @@ -137,7 +143,7 @@ func BaseAddRoute(app *gin.Engine) { if err != nil { return err } - dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, transOutUID) + dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, TransOutUID) return dbr.Error }) return error2Resp(err) diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 471bbe3..5aaaed9 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -32,15 +32,10 @@ func (*UserAccount) TableName() string { return "dtm_busi.user_account" } -func GetUserAccountByUid(uid int) *UserAccount { +func GetBalanceByUid(uid int) int { ua := UserAccount{} - dbr := dbGet().Must().Model(&ua).Where("user_id=?", uid).First(&ua) - dtmimp.E2P(dbr.Error) - return &ua -} - -func IsEqual(ua1, ua2 *UserAccount) bool { - return ua1.UserId == ua2.UserId && ua1.Balance == ua2.Balance && ua1.TradingBalance == ua2.TradingBalance + _ = dbGet().Must().Model(&ua).Where("user_id=?", uid).First(&ua) + return dtmimp.MustAtoi(ua.Balance[:len(ua.Balance)-3]) } // TransReq transaction request payload @@ -118,7 +113,7 @@ type mainSwitchType struct { TransOutConfirmResult AutoEmptyString TransInRevertResult AutoEmptyString TransOutRevertResult AutoEmptyString - CanSubmitResult AutoEmptyString + QueryPreparedResult AutoEmptyString NextResult AutoEmptyString } diff --git a/test/busi/busi.go b/test/busi/busi.go index 13eb7e8..7d6c064 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -13,8 +13,8 @@ import ( status "google.golang.org/grpc/status" ) -const transOutUID = 1 -const transInUID = 2 +const TransOutUID = 1 +const TransInUID = 2 func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error { res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess) @@ -59,7 +59,7 @@ func sagaGrpcAdjustBalance(db dtmcli.DB, uid int, amount int64, result string) e } -func sagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error { +func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error { if strings.Contains(result, dtmcli.ResultFailure) { return dtmcli.ErrFailure } diff --git a/test/busi/busi.pb.go b/test/busi/busi.pb.go index cfa8766..759886f 100644 --- a/test/busi/busi.pb.go +++ b/test/busi/busi.pb.go @@ -148,77 +148,81 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x32, 0xce, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61, - 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, - 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, - 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, - 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33, 0x0a, - 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, - 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, - 0x65, 0x72, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, - 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12, 0x0d, - 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x49, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, - 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, + 0x65, 0x32, 0x8d, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, - 0x0a, 0x08, 0x58, 0x61, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x09, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x58, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, - 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x58, 0x61, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33, + 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, + 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, + 0x76, 0x65, 0x72, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, + 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, + 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12, + 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x49, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, + 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, + 0x3c, 0x0a, 0x08, 0x58, 0x61, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, + 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x58, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, + 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x58, + 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, + 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, + 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x00, 0x12, 0x36, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x54, 0x63, 0x63, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x35, 0x0a, 0x0a, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, - 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, - 0x12, 0x36, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x54, 0x63, 0x63, 0x12, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x12, 0x0d, 0x2e, + 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, + 0x6e, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, + 0x38, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, + 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, - 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, - 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, - 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, - 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, - 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, - 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x62, - 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, - 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, - 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e, + 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, + 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, + 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, + 0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, + 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, + 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, + 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -240,26 +244,26 @@ var file_test_busi_busi_proto_goTypes = []interface{}{ (*emptypb.Empty)(nil), // 2: google.protobuf.Empty } var file_test_busi_busi_proto_depIdxs = []int32{ - 0, // 0: busi.Busi.CanSubmit:input_type -> busi.BusiReq - 0, // 1: busi.Busi.TransIn:input_type -> busi.BusiReq - 0, // 2: busi.Busi.TransOut:input_type -> busi.BusiReq - 0, // 3: busi.Busi.TransInRevert:input_type -> busi.BusiReq - 0, // 4: busi.Busi.TransOutRevert:input_type -> busi.BusiReq - 0, // 5: busi.Busi.TransInConfirm:input_type -> busi.BusiReq - 0, // 6: busi.Busi.TransOutConfirm:input_type -> busi.BusiReq - 2, // 7: busi.Busi.XaNotify:input_type -> google.protobuf.Empty - 0, // 8: busi.Busi.TransInXa:input_type -> busi.BusiReq - 0, // 9: busi.Busi.TransOutXa:input_type -> busi.BusiReq - 0, // 10: busi.Busi.TransInTcc:input_type -> busi.BusiReq - 0, // 11: busi.Busi.TransOutTcc:input_type -> busi.BusiReq - 0, // 12: busi.Busi.TransInTccNested:input_type -> busi.BusiReq - 0, // 13: busi.Busi.TransInBSaga:input_type -> busi.BusiReq - 0, // 14: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq - 0, // 15: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq - 0, // 16: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq - 0, // 17: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq - 0, // 18: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq - 1, // 19: busi.Busi.CanSubmit:output_type -> busi.BusiReply + 0, // 0: busi.Busi.TransIn:input_type -> busi.BusiReq + 0, // 1: busi.Busi.TransOut:input_type -> busi.BusiReq + 0, // 2: busi.Busi.TransInRevert:input_type -> busi.BusiReq + 0, // 3: busi.Busi.TransOutRevert:input_type -> busi.BusiReq + 0, // 4: busi.Busi.TransInConfirm:input_type -> busi.BusiReq + 0, // 5: busi.Busi.TransOutConfirm:input_type -> busi.BusiReq + 2, // 6: busi.Busi.XaNotify:input_type -> google.protobuf.Empty + 0, // 7: busi.Busi.TransInXa:input_type -> busi.BusiReq + 0, // 8: busi.Busi.TransOutXa:input_type -> busi.BusiReq + 0, // 9: busi.Busi.TransInTcc:input_type -> busi.BusiReq + 0, // 10: busi.Busi.TransOutTcc:input_type -> busi.BusiReq + 0, // 11: busi.Busi.TransInTccNested:input_type -> busi.BusiReq + 0, // 12: busi.Busi.TransInBSaga:input_type -> busi.BusiReq + 0, // 13: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq + 0, // 14: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq + 0, // 15: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq + 0, // 16: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq + 0, // 17: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq + 0, // 18: busi.Busi.QueryPrepared:input_type -> busi.BusiReq + 0, // 19: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq 2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty 2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty 2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty @@ -278,8 +282,10 @@ var file_test_busi_busi_proto_depIdxs = []int32{ 2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty 2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty 2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty - 19, // [19:38] is the sub-list for method output_type - 0, // [0:19] is the sub-list for method input_type + 1, // 38: busi.Busi.QueryPrepared:output_type -> busi.BusiReply + 2, // 39: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty + 20, // [20:40] is the sub-list for method output_type + 0, // [0:20] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/test/busi/busi.proto b/test/busi/busi.proto index 9d43abd..c94d8df 100644 --- a/test/busi/busi.proto +++ b/test/busi/busi.proto @@ -17,7 +17,6 @@ message BusiReply { } // The dtm service definition. service Busi { - rpc CanSubmit(BusiReq) returns (BusiReply) {} rpc TransIn(BusiReq) returns (google.protobuf.Empty) {} rpc TransOut(BusiReq) returns (google.protobuf.Empty) {} rpc TransInRevert(BusiReq) returns (google.protobuf.Empty) {} @@ -38,5 +37,7 @@ service Busi { rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {} rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {} rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {} + rpc QueryPrepared(BusiReq) returns (BusiReply) {} + rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {} } diff --git a/test/busi/busi_grpc.pb.go b/test/busi/busi_grpc.pb.go index 095b295..f7db854 100644 --- a/test/busi/busi_grpc.pb.go +++ b/test/busi/busi_grpc.pb.go @@ -19,7 +19,6 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type BusiClient interface { - CanSubmit(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) TransIn(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOut(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransInRevert(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) @@ -38,6 +37,8 @@ type BusiClient interface { TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) + QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -48,15 +49,6 @@ func NewBusiClient(cc grpc.ClientConnInterface) BusiClient { return &busiClient{cc} } -func (c *busiClient) CanSubmit(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) { - out := new(BusiReply) - err := c.cc.Invoke(ctx, "/busi.Busi/CanSubmit", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *busiClient) TransIn(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/busi.Busi/TransIn", in, out, opts...) @@ -219,11 +211,28 @@ func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ... return out, nil } +func (c *busiClient) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) { + out := new(BusiReply) + err := c.cc.Invoke(ctx, "/busi.Busi/QueryPrepared", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/QueryPreparedB", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility type BusiServer interface { - CanSubmit(context.Context, *BusiReq) (*BusiReply, error) TransIn(context.Context, *BusiReq) (*emptypb.Empty, error) TransOut(context.Context, *BusiReq) (*emptypb.Empty, error) TransInRevert(context.Context, *BusiReq) (*emptypb.Empty, error) @@ -242,6 +251,8 @@ type BusiServer interface { TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) + QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) + QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -249,9 +260,6 @@ type BusiServer interface { type UnimplementedBusiServer struct { } -func (UnimplementedBusiServer) CanSubmit(context.Context, *BusiReq) (*BusiReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method CanSubmit not implemented") -} func (UnimplementedBusiServer) TransIn(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransIn not implemented") } @@ -306,6 +314,12 @@ func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*em func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented") } +func (UnimplementedBusiServer) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryPrepared not implemented") +} +func (UnimplementedBusiServer) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedB not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -319,24 +333,6 @@ func RegisterBusiServer(s grpc.ServiceRegistrar, srv BusiServer) { s.RegisterService(&Busi_ServiceDesc, srv) } -func _Busi_CanSubmit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(BusiReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BusiServer).CanSubmit(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/busi.Busi/CanSubmit", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BusiServer).CanSubmit(ctx, req.(*BusiReq)) - } - return interceptor(ctx, in, info, handler) -} - func _Busi_TransIn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BusiReq) if err := dec(in); err != nil { @@ -661,6 +657,42 @@ func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _Busi_QueryPrepared_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).QueryPrepared(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/QueryPrepared", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).QueryPrepared(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_QueryPreparedB_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).QueryPreparedB(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/QueryPreparedB", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).QueryPreparedB(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -668,10 +700,6 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ ServiceName: "busi.Busi", HandlerType: (*BusiServer)(nil), Methods: []grpc.MethodDesc{ - { - MethodName: "CanSubmit", - Handler: _Busi_CanSubmit_Handler, - }, { MethodName: "TransIn", Handler: _Busi_TransIn_Handler, @@ -744,6 +772,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransOutHeaderNo", Handler: _Busi_TransOutHeaderNo_Handler, }, + { + MethodName: "QueryPrepared", + Handler: _Busi_QueryPrepared_Handler, + }, + { + MethodName: "QueryPreparedB", + Handler: _Busi_QueryPreparedB_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "test/busi/busi.proto", diff --git a/test/main_test.go b/test/main_test.go index 2fa6a1c..0f4add2 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -16,6 +16,7 @@ import ( "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmsvr/config" + "github.com/dtm-labs/dtm/dtmsvr/storage/registry" "github.com/dtm-labs/dtm/test/busi" "github.com/go-resty/resty/v2" ) @@ -55,11 +56,16 @@ func TestMain(m *testing.M) { conf.Store.Password = "" conf.Store.Port = 6379 } + registry.WaitStoreUp() + dtmsvr.PopulateDB(false) go dtmsvr.StartSvr() busi.PopulateDB(false) _ = busi.Startup() - exitIf(m.Run()) - + r := m.Run() + exitIf(r) + close(dtmsvr.TransProcessedTestChan) + gid, more := <-dtmsvr.TransProcessedTestChan + logger.FatalfIf(more, "extra gid: %s in test chan", gid) } diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go new file mode 100644 index 0000000..fe586e0 --- /dev/null +++ b/test/msg_barrier_test.go @@ -0,0 +1,105 @@ +package test + +import ( + "database/sql" + "errors" + "reflect" + "testing" + + "bou.ke/monkey" + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestMsgPrepareAndSubmit(t *testing.T) { + before := getBeforeBalances() + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") + }) + assert.Nil(t, err) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) + assertNotSameBalance(t, before) +} + +func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) { + before := getBeforeBalances() + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return errors.New("an error") + }) + assert.Error(t, err) + assertSameBalance(t, before) +} + +func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) { + before := getBeforeBalances() + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). + Add(busi.Busi+"/SagaBTransIn", req) + err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") + }) + assert.Error(t, err) + assertSameBalance(t, before) +} + +func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) { + if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit + return + } + before := getBeforeBalances() + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + var g *monkey.PatchGuard + err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { + logger.Debugf("tx.Commit rollback and return error in test") + _ = tx.Rollback() + return errors.New("test error for patch") + }) + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") + }) + g.Unpatch() + assert.Error(t, err) + cronTransOnceForwardNow(180) + assertSameBalance(t, before) +} + +func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) { + if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit + return + } + before := getBeforeBalances() + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + var guard *monkey.PatchGuard + err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") + guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { + guard.Unpatch() + _ = tx.Commit() + return errors.New("test error for patch") + }) + return err + }) + assert.Error(t, err) + cronTransOnceForwardNow(180) + assertNotSameBalance(t, before) +} diff --git a/test/msg_grpc_test.go b/test/msg_grpc_test.go index e228fbb..4701903 100644 --- a/test/msg_grpc_test.go +++ b/test/msg_grpc_test.go @@ -31,7 +31,7 @@ func TestMsgGrpcTimeoutSuccess(t *testing.T) { msg := genGrpcMsg(dtmimp.GetFuncName()) err := msg.Prepare("") assert.Nil(t, err) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) @@ -48,10 +48,10 @@ func TestMsgGrpcTimeoutFailed(t *testing.T) { msg := genGrpcMsg(dtmimp.GetFuncName()) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) cronTransOnceForwardNow(180) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) @@ -62,6 +62,6 @@ func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc { msg := dtmgrpc.NewMsgGrpc(dtmutil.DefaultGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransOut", req). Add(busi.BusiGrpc+"/busi.Busi/TransIn", req) - msg.QueryPrepared = fmt.Sprintf("%s/busi.Busi/CanSubmit", busi.BusiGrpc) + msg.QueryPrepared = fmt.Sprintf("%s/busi.Busi/QueryPrepared", busi.BusiGrpc) return msg } diff --git a/test/msg_options_test.go b/test/msg_options_test.go index 97cfbe3..1f60810 100644 --- a/test/msg_options_test.go +++ b/test/msg_options_test.go @@ -47,7 +47,7 @@ func TestMsgOptionsTimeoutFailed(t *testing.T) { assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) cronTransOnceForwardNow(60) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) cronTransOnceForwardNow(180) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) } diff --git a/test/msg_test.go b/test/msg_test.go index e9c1475..2330364 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -29,7 +29,7 @@ func TestMsgTimeoutSuccess(t *testing.T) { msg := genMsg(dtmimp.GetFuncName()) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing) @@ -45,10 +45,13 @@ func TestMsgTimeoutFailed(t *testing.T) { msg := genMsg(dtmimp.GetFuncName()) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultOngoing) + busi.MainSwitch.QueryPreparedResult.SetOnce("OTHER_ERROR") cronTransOnceForwardNow(180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - busi.MainSwitch.CanSubmitResult.SetOnce(dtmcli.ResultFailure) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) + cronTransOnceForwardNow(360) + assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) + busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) cronTransOnceForwardNow(180) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) @@ -71,6 +74,6 @@ func genMsg(gid string) *dtmcli.Msg { msg := dtmcli.NewMsg(dtmutil.DefaultHttpServer, gid). Add(busi.Busi+"/TransOut", &req). Add(busi.Busi+"/TransIn", &req) - msg.QueryPrepared = busi.Busi + "/CanSubmit" + msg.QueryPrepared = busi.Busi + "/QueryPrepared" return msg } diff --git a/test/store_test.go b/test/store_test.go index c2413f7..b711c76 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -88,10 +88,6 @@ func TestStoreLockTrans(t *testing.T) { assert.Nil(t, g2) } -func TestStoreWait(t *testing.T) { - registry.WaitStoreUp() -} - func TestUpdateBranches(t *testing.T) { if !conf.Store.IsDB() { _, err := registry.GetStore().UpdateBranches(nil, nil) diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index 87c3aea..376b3ca 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -51,8 +51,7 @@ func TestTccBarrierRollback(t *testing.T) { } func TestTccBarrierDisorder(t *testing.T) { - ua1 := busi.GetUserAccountByUid(1) - ua2 := busi.GetUserAccountByUid(2) + before := getBeforeBalances() cancelFinishedChan := make(chan string, 2) cancelCanReturnChan := make(chan string, 2) gid := dtmimp.GetFuncName() @@ -123,8 +122,7 @@ func TestTccBarrierDisorder(t *testing.T) { assert.Error(t, err, fmt.Errorf("a cancelled tcc")) assert.Equal(t, []string{StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) assert.Equal(t, StatusFailed, getTransStatus(gid)) - assert.True(t, busi.IsEqual(ua1, busi.GetUserAccountByUid(1))) - assert.True(t, busi.IsEqual(ua2, busi.GetUserAccountByUid(2))) + assertSameBalance(t, before) } func TestTccBarrierPanic(t *testing.T) { diff --git a/test/types.go b/test/types.go index 4fcbe34..c583a28 100644 --- a/test/types.go +++ b/test/types.go @@ -7,6 +7,7 @@ package test import ( + "testing" "time" "github.com/dtm-labs/dtm/dtmcli" @@ -16,6 +17,7 @@ import ( "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" ) var conf = &config.Config @@ -80,3 +82,23 @@ const ( // StatusAborting status for global trans status. StatusAborting = dtmcli.StatusAborting ) + +func getBeforeBalances() []int { + b1 := busi.GetBalanceByUid(busi.TransOutUID) + b2 := busi.GetBalanceByUid(busi.TransInUID) + return []int{b1, b2} +} + +func assertSameBalance(t *testing.T, before []int) { + b1 := busi.GetBalanceByUid(busi.TransOutUID) + b2 := busi.GetBalanceByUid(busi.TransInUID) + assert.Equal(t, before[0], b1) + assert.Equal(t, before[1], b2) +} + +func assertNotSameBalance(t *testing.T, before []int) { + b1 := busi.GetBalanceByUid(busi.TransOutUID) + b2 := busi.GetBalanceByUid(busi.TransInUID) + assert.NotEqual(t, before[0], b1) + assert.Equal(t, before[0]+before[1], b1+b2) +}