From 3327befaece13f617a0b0baa2865cb359c29e658 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Mon, 4 Jul 2022 11:33:07 +0800 Subject: [PATCH] add 2 error test case --- dtmgrpc/workflow/rpc.go | 2 - dtmgrpc/workflow/utils.go | 2 + test/busi/base_types.go | 8 +- test/msg_barrier_mongo_test.go | 10 +- test/msg_barrier_redis_test.go | 12 +- test/msg_barrier_test.go | 12 +- test/msg_delay_test.go | 2 +- test/msg_grpc_barrier_redis_test.go | 10 +- test/msg_grpc_barrier_test.go | 6 +- test/msg_jrpc_test.go | 6 +- test/msg_test.go | 2 +- test/saga_barrier_mongo_test.go | 2 +- test/saga_barrier_redis_test.go | 2 +- test/saga_barrier_test.go | 4 +- test/saga_grpc_barrier_test.go | 2 +- test/saga_grpc_test.go | 4 +- test/saga_test.go | 6 +- test/tcc_barrier_test.go | 4 +- test/tcc_cover_test.go | 2 +- test/tcc_grpc_cover_test.go | 2 +- test/tcc_grpc_test.go | 6 +- test/tcc_jrpc_test.go | 2 +- test/tcc_old_test.go | 6 +- test/tcc_test.go | 10 +- test/workflow_base_test.go | 43 ++++ test/workflow_grpc_test.go | 131 ++++++++++ test/workflow_http_test.go | 107 +++++++++ test/workflow_ongoing_test.go | 152 ++++++++++++ test/workflow_test.go | 356 ---------------------------- test/workflow_xa_test.go | 63 +++++ test/xa_cover_test.go | 4 +- test/xa_grpc_test.go | 4 +- test/xa_test.go | 8 +- 33 files changed, 566 insertions(+), 426 deletions(-) create mode 100644 test/workflow_base_test.go create mode 100644 test/workflow_grpc_test.go create mode 100644 test/workflow_http_test.go create mode 100644 test/workflow_ongoing_test.go delete mode 100644 test/workflow_test.go create mode 100644 test/workflow_xa_test.go diff --git a/dtmgrpc/workflow/rpc.go b/dtmgrpc/workflow/rpc.go index 4e07fd6..3badc5e 100644 --- a/dtmgrpc/workflow/rpc.go +++ b/dtmgrpc/workflow/rpc.go @@ -4,7 +4,6 @@ import ( "context" "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" "google.golang.org/protobuf/types/known/emptypb" @@ -49,7 +48,6 @@ func (wf *Workflow) submit(status string) error { } func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error { - logger.Errorf("registerBranch: %s %s %s", branchID, op, status) if wf.Protocol == dtmimp.ProtocolHTTP { return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{ "data": string(res), diff --git a/dtmgrpc/workflow/utils.go b/dtmgrpc/workflow/utils.go index fcadcd6..e5abc38 100644 --- a/dtmgrpc/workflow/utils.go +++ b/dtmgrpc/workflow/utils.go @@ -122,6 +122,8 @@ func stepResultFromHTTP(resp *http.Response, err error) *stepResult { sr.Status = dtmcli.StatusSucceed } else if resp.StatusCode == http.StatusConflict { sr.Status = dtmcli.StatusFailed + } else { + sr.Error = errors.New(string(sr.Data)) } } return sr diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 42dcc9e..6c97bb6 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -72,8 +72,8 @@ func (t *ReqHTTP) String() string { return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult) } -// GenTransReq 1 -func GenTransReq(amount int, outFailed bool, inFailed bool) *ReqHTTP { +// GenReqHTTP 1 +func GenReqHTTP(amount int, outFailed bool, inFailed bool) *ReqHTTP { return &ReqHTTP{ Amount: amount, TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string), @@ -81,8 +81,8 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *ReqHTTP { } } -// GenBusiReq 1 -func GenBusiReq(amount int, outFailed bool, inFailed bool) *BusiReq { +// GenReqGrpc 1 +func GenReqGrpc(amount int, outFailed bool, inFailed bool) *ReqGrpc { return &BusiReq{ Amount: int64(amount), TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string), diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index 614226b..0e5d8c3 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -14,7 +14,7 @@ import ( func TestMsgMongoDoSucceed(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaMongoTransIn", req) err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -32,7 +32,7 @@ func TestMsgMongoDoSucceed(t *testing.T) { func TestMsgMongoDoBusiFailed(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaMongoTransIn", req) err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -45,7 +45,7 @@ func TestMsgMongoDoBusiFailed(t *testing.T) { func TestMsgMongoDoBusiLater(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ "trans_type": "msg", @@ -70,7 +70,7 @@ func TestMsgMongoDoBusiLater(t *testing.T) { func TestMsgMongoDoCommitFailed(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaMongoTransIn", req) err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -87,7 +87,7 @@ func TestMsgMongoDoCommitFailed(t *testing.T) { func TestMsgMongoDoCommitAfterFailed(t *testing.T) { before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaMongoTransIn", req) err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error { diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go index 9d3ff7e..d111b94 100644 --- a/test/msg_barrier_redis_test.go +++ b/test/msg_barrier_redis_test.go @@ -13,7 +13,7 @@ import ( func TestMsgRedisDo(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaRedisTransIn", req) err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -29,7 +29,7 @@ func TestMsgRedisDo(t *testing.T) { func TestMsgRedisDoBusiFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaRedisTransIn", req) err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -42,7 +42,7 @@ func TestMsgRedisDoBusiFailed(t *testing.T) { func TestMsgRedisDoBusiLater(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ "trans_type": "msg", @@ -65,7 +65,7 @@ func TestMsgRedisDoBusiLater(t *testing.T) { func TestMsgRedisDoPrepareFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). Add(busi.Busi+"/SagaRedisTransIn", req) err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -78,7 +78,7 @@ func TestMsgRedisDoPrepareFailed(t *testing.T) { func TestMsgRedisDoCommitFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaRedisTransIn", req) err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { @@ -91,7 +91,7 @@ func TestMsgRedisDoCommitFailed(t *testing.T) { func TestMsgRedisDoCommitAfterFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaRedisTransIn", req) err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index eba5db4..6ed525d 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -17,7 +17,7 @@ import ( func TestMsgDoAndSubmit(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { @@ -33,7 +33,7 @@ func TestMsgDoAndSubmit(t *testing.T) { func TestMsgDoAndSubmitBusiFailed(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { @@ -46,7 +46,7 @@ func TestMsgDoAndSubmitBusiFailed(t *testing.T) { func TestMsgDoAndSubmitBusiLater(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := dtmcli.GetRestyClient().R(). SetQueryParams(map[string]string{ "trans_type": "msg", @@ -69,7 +69,7 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) { func TestMsgDoAndSubmitPrepareFailed(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). Add(busi.Busi+"/SagaBTransIn", req) err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { @@ -85,7 +85,7 @@ func TestMsgDoAndSubmitCommitFailed(t *testing.T) { } before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) var g *monkey.PatchGuard @@ -108,7 +108,7 @@ func TestMsgDoAndSubmitCommitAfterFailed(t *testing.T) { } before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) var guard *monkey.PatchGuard diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go index 9b9d030..5aa866f 100644 --- a/test/msg_delay_test.go +++ b/test/msg_delay_test.go @@ -12,7 +12,7 @@ import ( ) func genMsgDelay(gid string) *dtmcli.Msg { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid). Add(busi.Busi+"/TransOut", &req). Add(busi.Busi+"/TransIn", &req).SetDelay(10) diff --git a/test/msg_grpc_barrier_redis_test.go b/test/msg_grpc_barrier_redis_test.go index c3b2d1f..c9525f3 100644 --- a/test/msg_grpc_barrier_redis_test.go +++ b/test/msg_grpc_barrier_redis_test.go @@ -14,7 +14,7 @@ import ( func TestMsgGrpcRedisDo(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { @@ -30,7 +30,7 @@ func TestMsgGrpcRedisDo(t *testing.T) { func TestMsgGrpcRedisDoBusiFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { @@ -43,7 +43,7 @@ func TestMsgGrpcRedisDoBusiFailed(t *testing.T) { func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer+"not-exists", gid). Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { @@ -56,7 +56,7 @@ func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) { func TestMsgGrpcRedisDoCommitFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { @@ -69,7 +69,7 @@ func TestMsgGrpcRedisDoCommitFailed(t *testing.T) { func TestMsgGrpcRedisDoCommitAfterFailed(t *testing.T) { before := getBeforeBalances("redis") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { diff --git a/test/msg_grpc_barrier_test.go b/test/msg_grpc_barrier_test.go index b850f72..ac75b40 100644 --- a/test/msg_grpc_barrier_test.go +++ b/test/msg_grpc_barrier_test.go @@ -17,7 +17,7 @@ import ( func TestMsgGrpcPrepareAndSubmit(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req) err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { @@ -36,7 +36,7 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) { } before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req) var guard *monkey.PatchGuard @@ -60,7 +60,7 @@ func TestMsgGrpcPrepareAndSubmitCommitFailed(t *testing.T) { } before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.Busi+"/SagaBTransIn", req) var g *monkey.PatchGuard diff --git a/test/msg_jrpc_test.go b/test/msg_jrpc_test.go index 5562783..4c4fe94 100644 --- a/test/msg_jrpc_test.go +++ b/test/msg_jrpc_test.go @@ -49,7 +49,7 @@ func TestMsgJrpcResults(t *testing.T) { func TestMsgJrpcDoAndSubmit(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid). Add(busi.Busi+"/SagaBTransIn", req) msg.Protocol = dtmimp.Jrpc @@ -66,7 +66,7 @@ func TestMsgJrpcDoAndSubmit(t *testing.T) { func TestMsgJrpcDoAndSubmitBusiFailed(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid). Add(busi.Busi+"/SagaBTransIn", req) msg.Protocol = dtmimp.Jrpc @@ -135,7 +135,7 @@ func TestMsgJprcAbnormal2(t *testing.T) { } func genJrpcMsg(gid string) *dtmcli.Msg { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid). Add(busi.Busi+"/TransOut", &req). Add(busi.BusiJrpcURL+"TransIn", &req) diff --git a/test/msg_test.go b/test/msg_test.go index 5f629d5..9ce4197 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -68,7 +68,7 @@ func TestMsgAbnormal(t *testing.T) { } func genMsg(gid string) *dtmcli.Msg { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid). Add(busi.Busi+"/TransOut", &req). Add(busi.Busi+"/TransIn", &req) diff --git a/test/saga_barrier_mongo_test.go b/test/saga_barrier_mongo_test.go index 3937475..ebf468f 100644 --- a/test/saga_barrier_mongo_test.go +++ b/test/saga_barrier_mongo_test.go @@ -38,7 +38,7 @@ func TestSagaBarrierMongoRollback(t *testing.T) { } func genSagaBarrierMongo(gid string, transInFailed bool) *dtmcli.Saga { - req := busi.GenTransReq(30, false, transInFailed) + req := busi.GenReqHTTP(30, false, transInFailed) req.Store = "mongo" return dtmcli.NewSaga(DtmServer, gid). Add(Busi+"/SagaMongoTransOut", Busi+"/SagaMongoTransOutCom", req). diff --git a/test/saga_barrier_redis_test.go b/test/saga_barrier_redis_test.go index c37dd29..5e732c1 100644 --- a/test/saga_barrier_redis_test.go +++ b/test/saga_barrier_redis_test.go @@ -40,7 +40,7 @@ func TestSagaBarrierRedisRollback(t *testing.T) { } func genSagaBarrierRedis(gid string) *dtmcli.Saga { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) req.Store = "redis" return dtmcli.NewSaga(DtmServer, gid). Add(Busi+"/SagaRedisTransIn", Busi+"/SagaRedisTransInCom", req). diff --git a/test/saga_barrier_test.go b/test/saga_barrier_test.go index 45c9057..77a94e8 100644 --- a/test/saga_barrier_test.go +++ b/test/saga_barrier_test.go @@ -34,14 +34,14 @@ func TestSagaBarrierRollback(t *testing.T) { } func genSagaBarrier(gid string, outFailed, inFailed bool) *dtmcli.Saga { - req := busi.GenTransReq(30, outFailed, inFailed) + req := busi.GenReqHTTP(30, outFailed, inFailed) return dtmcli.NewSaga(DtmServer, gid). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCom", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCom", req) } func TestSagaBarrier2Normal(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() saga := dtmcli.NewSaga(DtmServer, gid). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCom", req). diff --git a/test/saga_grpc_barrier_test.go b/test/saga_grpc_barrier_test.go index ebb5e8a..7beecd1 100644 --- a/test/saga_grpc_barrier_test.go +++ b/test/saga_grpc_barrier_test.go @@ -36,7 +36,7 @@ func TestSagaGrpcBarrierRollback(t *testing.T) { func genSagaGrpcBarrier(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid) - req := busi.GenBusiReq(30, outFailed, inFailed) + req := busi.GenReqGrpc(30, outFailed, inFailed) saga.Add(busi.BusiGrpc+"/busi.Busi/TransOutBSaga", busi.BusiGrpc+"/busi.Busi/TransOutRevertBSaga", req) saga.Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", busi.BusiGrpc+"/busi.Busi/TransInRevertBSaga", req) return saga diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index 2128bbb..f7cc57a 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -83,7 +83,7 @@ func TestSagaGrpcNormalWait(t *testing.T) { func TestSagaGrpcEmptyUrl(t *testing.T) { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, dtmimp.GetFuncName()) - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) saga.Add(busi.BusiGrpc+"/busi.Busi/TransOut", busi.BusiGrpc+"/busi.Busi/TransOutRevert", req) saga.Add("", busi.BusiGrpc+"/busi.Busi/TransInRevert", req) saga.Submit() @@ -95,7 +95,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) { //nolint: unparam func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid) - req := busi.GenBusiReq(30, outFailed, inFailed) + req := busi.GenReqGrpc(30, outFailed, inFailed) saga.Add(busi.BusiGrpc+"/busi.Busi/TransOut", busi.BusiGrpc+"/busi.Busi/TransOutRevert", req) saga.Add(busi.BusiGrpc+"/busi.Busi/TransIn", busi.BusiGrpc+"/busi.Busi/TransInRevert", req) return saga diff --git a/test/saga_test.go b/test/saga_test.go index c1ed49a..118ea9b 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -78,7 +78,7 @@ func TestSagaAbnormal(t *testing.T) { func TestSagaEmptyUrl(t *testing.T) { saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmimp.GetFuncName()) - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) saga.Add(busi.Busi+"/TransOut", "", &req) saga.Add("", "", &req) saga.Submit() @@ -89,7 +89,7 @@ func TestSagaEmptyUrl(t *testing.T) { func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) - req := busi.GenTransReq(30, outFailed, inFailed) + req := busi.GenReqHTTP(30, outFailed, inFailed) saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req) saga.Add(busi.Busi+"/TransIn", busi.Busi+"/TransInRevert", &req) return saga @@ -97,7 +97,7 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { func genSaga1(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) - req := busi.GenTransReq(30, outFailed, inFailed) + req := busi.GenReqHTTP(30, outFailed, inFailed) saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req) return saga } diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index 9246e44..59e0503 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -22,7 +22,7 @@ import ( ) func TestTccBarrierNormal(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") @@ -36,7 +36,7 @@ func TestTccBarrierNormal(t *testing.T) { } func TestTccBarrierRollback(t *testing.T) { - req := busi.GenTransReq(30, false, true) + req := busi.GenReqHTTP(30, false, true) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") diff --git a/test/tcc_cover_test.go b/test/tcc_cover_test.go index 00ba711..4f66270 100644 --- a/test/tcc_cover_test.go +++ b/test/tcc_cover_test.go @@ -32,7 +32,7 @@ func TestTccCoverPanic(t *testing.T) { } func TestTccNested(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") diff --git a/test/tcc_grpc_cover_test.go b/test/tcc_grpc_cover_test.go index a667e72..1018dc0 100644 --- a/test/tcc_grpc_cover_test.go +++ b/test/tcc_grpc_cover_test.go @@ -32,7 +32,7 @@ func TestTccGrpcCoverPanic(t *testing.T) { } func TestTccGrpcCoverCallBranch(t *testing.T) { - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) gid := dtmimp.GetFuncName() err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { diff --git a/test/tcc_grpc_test.go b/test/tcc_grpc_test.go index 6e5e745..a48bde7 100644 --- a/test/tcc_grpc_test.go +++ b/test/tcc_grpc_test.go @@ -21,7 +21,7 @@ import ( ) func TestTccGrpcNormal(t *testing.T) { - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) gid := dtmimp.GetFuncName() err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { r := &emptypb.Empty{} @@ -38,7 +38,7 @@ func TestTccGrpcNormal(t *testing.T) { func TestTccGrpcRollback(t *testing.T) { gid := dtmimp.GetFuncName() - req := busi.GenBusiReq(30, false, true) + req := busi.GenReqGrpc(30, false, true) err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { r := &emptypb.Empty{} err := tcc.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutTcc", busi.BusiGrpc+"/busi.Busi/TransOutConfirm", busi.BusiGrpc+"/busi.Busi/TransOutRevert", r) @@ -56,7 +56,7 @@ func TestTccGrpcRollback(t *testing.T) { } func TestTccGrpcNested(t *testing.T) { - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) gid := dtmimp.GetFuncName() err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { r := &emptypb.Empty{} diff --git a/test/tcc_jrpc_test.go b/test/tcc_jrpc_test.go index 31bdafe..ea29d84 100644 --- a/test/tcc_jrpc_test.go +++ b/test/tcc_jrpc_test.go @@ -12,7 +12,7 @@ import ( ) func TestTccJrpcNormal(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultJrpcServer, gid, func(tcc *dtmcli.Tcc) { tcc.Protocol = dtmimp.Jrpc diff --git a/test/tcc_old_test.go b/test/tcc_old_test.go index 5aca2e1..4e38562 100644 --- a/test/tcc_old_test.go +++ b/test/tcc_old_test.go @@ -12,7 +12,7 @@ import ( ) func TestTccOldNormal(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld") @@ -27,7 +27,7 @@ func TestTccOldNormal(t *testing.T) { func TestTccOldRollback(t *testing.T) { gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, true) + req := busi.GenReqHTTP(30, false, true) err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, rerr := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld") assert.Nil(t, rerr) @@ -43,7 +43,7 @@ func TestTccOldRollback(t *testing.T) { } func TestTccOldTimeout(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() timeoutChan := make(chan int, 1) diff --git a/test/tcc_test.go b/test/tcc_test.go index 8297afa..a1964ec 100644 --- a/test/tcc_test.go +++ b/test/tcc_test.go @@ -18,7 +18,7 @@ import ( ) func TestTccNormal(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") @@ -33,7 +33,7 @@ func TestTccNormal(t *testing.T) { func TestTccRollback(t *testing.T) { gid := dtmimp.GetFuncName() - req := busi.GenTransReq(30, false, true) + req := busi.GenReqHTTP(30, false, true) err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, rerr := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, rerr) @@ -50,7 +50,7 @@ func TestTccRollback(t *testing.T) { } func TestTccTimeout(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() timeoutChan := make(chan int, 1) @@ -73,7 +73,7 @@ func TestTccTimeout(t *testing.T) { } func TestTccCompatible(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") @@ -88,7 +88,7 @@ func TestTccCompatible(t *testing.T) { } func TestTccHeaders(t *testing.T) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) gid := dtmimp.GetFuncName() err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHTTPServer, gid, func(t *dtmcli.Tcc) { t.BranchHeaders = map[string]string{ diff --git a/test/workflow_base_test.go b/test/workflow_base_test.go new file mode 100644 index 0000000..885dba9 --- /dev/null +++ b/test/workflow_base_test.go @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "testing" + "time" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr" + "github.com/dtm-labs/dtm/dtmsvr/storage" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowBranchConflict(t *testing.T) { + gid := dtmimp.GetFuncName() + store := dtmsvr.GetStore() + now := time.Now() + g := &storage.TransGlobalStore{ + Gid: gid, + Status: dtmcli.StatusPrepared, + NextCronTime: &now, + } + err := store.MaySaveNewTrans(g, []storage.TransBranchStore{ + { + BranchID: "00", + Op: dtmimp.OpAction, + }, + }) + assert.Nil(t, err) + err = dtmimp.CatchP(func() { + store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{ + {BranchID: "00", Op: dtmimp.OpAction}, + }, -1) + }) + assert.Error(t, err) + store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true) +} diff --git a/test/workflow_grpc_test.go b/test/workflow_grpc_test.go new file mode 100644 index 0000000..2c7f1b2 --- /dev/null +++ b/test/workflow_grpc_test.go @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "database/sql" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowGrpcSimple(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + _, err := busi.BusiCli.TransOutBSaga(wf.NewBranchCtx(), &req) + if err != nil { + return err + } + _, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req) + return err + }) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err, dtmcli.ErrFailure) + assert.Equal(t, StatusFailed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +func TestWorkflowGrpcNormal(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) + return err + }) + _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) + if err != nil { + return err + } + wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) + return err + }) + _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) + return err + }) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err, dtmcli.ErrFailure) + assert.Equal(t, StatusFailed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +func TestWorkflowMixed(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := &busi.BusiReq{Amount: 30} + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + + wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) + return err + }) + _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) + if err != nil { + return err + } + + _, err = wf.NewBranch().OnBranchCommit(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransInConfirm(wf.Context, &req) + return err + }).OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + req2 := &busi.ReqHTTP{Amount: 30} + _, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert") + return err + }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { + err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "") + return nil, err + }) + if err != nil { + return err + } + _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess) + }) + return err + }) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Nil(t, err) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +func TestWorkflowGrpcError(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + req := &busi.BusiReq{Amount: 30} + gid := dtmimp.GetFuncName() + busi.MainSwitch.TransOutResult.SetOnce("ERROR") + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + _, err := busi.BusiCli.TransOut(wf.NewBranchCtx(), &req) + if err != nil { + return err + } + _, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req) + return err + }) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err) + go waitTransProcessed(gid) + cronTransOnceForwardCron(t, gid, 1000) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} diff --git a/test/workflow_http_test.go b/test/workflow_http_test.go new file mode 100644 index 0000000..2aa82a8 --- /dev/null +++ b/test/workflow_http_test.go @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "database/sql" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowNormal(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := busi.GenReqHTTP(30, false, false) + gid := dtmimp.GetFuncName() + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.ReqHTTP + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") + if err != nil { + return err + } + _, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn") + if err != nil { + return err + } + return nil + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} + +func TestWorkflowRollback(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + + req := &busi.ReqHTTP{Amount: 30, TransInResult: dtmimp.ResultFailure} + gid := dtmimp.GetFuncName() + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.ReqHTTP + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + _, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom") + return err + }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { + return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "") + }) + }) + if err != nil { + return err + } + _, err = wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "") + }) + }).NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn") + if err != nil { + return err + } + return nil + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Error(t, err, dtmcli.ErrFailure) + assert.Equal(t, StatusFailed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +func TestWorkflowError(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := busi.GenReqHTTP(30, false, false) + gid := dtmimp.GetFuncName() + busi.MainSwitch.TransOutResult.SetOnce("ERROR") + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.ReqHTTP + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") + if err != nil { + return err + } + _, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn") + if err != nil { + return err + } + return nil + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Error(t, err) + go waitTransProcessed(gid) + cronTransOnceForwardCron(t, gid, 1000) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} diff --git a/test/workflow_ongoing_test.go b/test/workflow_ongoing_test.go new file mode 100644 index 0000000..8a6fcec --- /dev/null +++ b/test/workflow_ongoing_test.go @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "database/sql" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +var ongoingStep = 0 + +func fetchOngoingStep(dest int) bool { + c := ongoingStep + logger.Debugf("ongoing step is: %d", c) + if c == dest { + ongoingStep++ + return true + } + return false +} + +func TestWorkflowSimpleResume(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := busi.GenReqHTTP(30, false, false) + gid := dtmimp.GetFuncName() + ongoingStep = 0 + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + if fetchOngoingStep(0) { + return dtmcli.ErrOngoing + } + var req busi.ReqHTTP + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") + return err + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Error(t, err) + go waitTransProcessed(gid) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} + +func TestWorkflowGrpcRollbackResume(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + ongoingStep = 0 + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + if fetchOngoingStep(0) { + return dtmcli.ErrOngoing + } + wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + if fetchOngoingStep(4) { + return dtmcli.ErrOngoing + } + _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) + return err + }) + _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) + if fetchOngoingStep(1) { + return dtmcli.ErrOngoing + } + if err != nil { + return err + } + wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { + if fetchOngoingStep(3) { + return dtmcli.ErrOngoing + } + _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) + return err + }) + _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) + if fetchOngoingStep(2) { + return dtmcli.ErrOngoing + } + return err + }) + req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err, dtmcli.ErrOngoing) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan + go waitTransProcessed(gid) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusFailed, getTransStatus(gid)) +} + +func TestWorkflowXaResume(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + ongoingStep = 0 + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + if fetchOngoingStep(0) { + return nil, dtmcli.ErrOngoing + } + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + if fetchOngoingStep(1) { + return nil, dtmcli.ErrOngoing + } + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + if fetchOngoingStep(2) { + return dtmcli.ErrOngoing + } + + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Equal(t, dtmcli.ErrOngoing, err) + + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan + go waitTransProcessed(gid) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} diff --git a/test/workflow_test.go b/test/workflow_test.go deleted file mode 100644 index 1ef3b1b..0000000 --- a/test/workflow_test.go +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Copyright (c) 2021 yedf. All rights reserved. - * Use of this source code is governed by a BSD-style - * license that can be found in the LICENSE file. - */ - -package test - -import ( - "database/sql" - "testing" - "time" - - "github.com/dtm-labs/dtm/dtmcli" - "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" - "github.com/dtm-labs/dtm/dtmgrpc/workflow" - "github.com/dtm-labs/dtm/dtmsvr" - "github.com/dtm-labs/dtm/dtmsvr/storage" - "github.com/dtm-labs/dtm/test/busi" - "github.com/stretchr/testify/assert" -) - -func TestWorkflowNormal(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) - req := busi.GenTransReq(30, false, false) - gid := dtmimp.GetFuncName() - - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - var req busi.ReqHTTP - dtmimp.MustUnmarshal(data, &req) - _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") - if err != nil { - return err - } - _, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn") - if err != nil { - return err - } - return nil - }) - - err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) - assert.Nil(t, err) - waitTransProcessed(gid) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) -} - -func TestWorkflowSimpleResume(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) - req := busi.GenTransReq(30, false, false) - gid := dtmimp.GetFuncName() - ongoingStep = 0 - - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - if fetchOngoingStep(0) { - return dtmcli.ErrOngoing - } - var req busi.ReqHTTP - dtmimp.MustUnmarshal(data, &req) - _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") - return err - }) - - err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) - assert.Error(t, err) - go waitTransProcessed(gid) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) -} - -func TestWorkflowRollback(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) - - req := &busi.ReqHTTP{Amount: 30, TransInResult: dtmimp.ResultFailure} - gid := dtmimp.GetFuncName() - - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - var req busi.ReqHTTP - dtmimp.MustUnmarshal(data, &req) - _, err := wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - _, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom") - return err - }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { - return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { - return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "") - }) - }) - if err != nil { - return err - } - _, err = wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { - return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "") - }) - }).NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn") - if err != nil { - return err - } - return nil - }) - - err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) - assert.Error(t, err, dtmcli.ErrFailure) - assert.Equal(t, StatusFailed, getTransStatus(gid)) - waitTransProcessed(gid) -} - -func TestWorkflowGrpcNormal(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) - req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} - gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - var req busi.BusiReq - dtmgimp.MustProtoUnmarshal(data, &req) - wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) - return err - }) - _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) - if err != nil { - return err - } - wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) - return err - }) - _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) - return err - }) - err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) - assert.Error(t, err, dtmcli.ErrFailure) - assert.Equal(t, StatusFailed, getTransStatus(gid)) - waitTransProcessed(gid) -} - -var ongoingStep = 0 - -func fetchOngoingStep(dest int) bool { - c := ongoingStep - logger.Debugf("ongoing step is: %d", c) - if c == dest { - ongoingStep++ - return true - } - return false -} - -func TestWorkflowGrpcRollbackResume(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) - gid := dtmimp.GetFuncName() - ongoingStep = 0 - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - var req busi.BusiReq - dtmgimp.MustProtoUnmarshal(data, &req) - if fetchOngoingStep(0) { - return dtmcli.ErrOngoing - } - wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - if fetchOngoingStep(4) { - return dtmcli.ErrOngoing - } - _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) - return err - }) - _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) - if fetchOngoingStep(1) { - return dtmcli.ErrOngoing - } - if err != nil { - return err - } - wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - if fetchOngoingStep(3) { - return dtmcli.ErrOngoing - } - _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) - return err - }) - _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) - if fetchOngoingStep(2) { - return dtmcli.ErrOngoing - } - return err - }) - req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} - err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) - assert.Error(t, err, dtmcli.ErrOngoing) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan - go waitTransProcessed(gid) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusFailed, getTransStatus(gid)) -} - -func TestWorkflowXaAction(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) - gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) - }) - if err != nil { - return err - } - _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) - }) - return err - }) - err := workflow.Execute(gid, gid, nil) - assert.Nil(t, err) - waitTransProcessed(gid) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) -} - -func TestWorkflowXaRollback(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) - gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) - }) - if err != nil { - return err - } - _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) - logger.FatalIfError(e) - return nil, dtmcli.ErrFailure - }) - return err - }) - err := workflow.Execute(gid, gid, nil) - assert.Equal(t, dtmcli.ErrFailure, err) - waitTransProcessed(gid) - assert.Equal(t, StatusFailed, getTransStatus(gid)) -} - -func TestWorkflowXaResume(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) - ongoingStep = 0 - gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - if fetchOngoingStep(0) { - return nil, dtmcli.ErrOngoing - } - return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) - }) - if err != nil { - return err - } - _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - if fetchOngoingStep(1) { - return nil, dtmcli.ErrOngoing - } - return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) - }) - if err != nil { - return err - } - if fetchOngoingStep(2) { - return dtmcli.ErrOngoing - } - - return err - }) - err := workflow.Execute(gid, gid, nil) - assert.Equal(t, dtmcli.ErrOngoing, err) - - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusPrepared, getTransStatus(gid)) - // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan - go waitTransProcessed(gid) - cronTransOnceForwardNow(t, gid, 1000) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) -} - -func TestWorkflowBranchConflict(t *testing.T) { - gid := dtmimp.GetFuncName() - store := dtmsvr.GetStore() - now := time.Now() - g := &storage.TransGlobalStore{ - Gid: gid, - Status: dtmcli.StatusPrepared, - NextCronTime: &now, - } - err := store.MaySaveNewTrans(g, []storage.TransBranchStore{ - { - BranchID: "00", - Op: dtmimp.OpAction, - }, - }) - assert.Nil(t, err) - err = dtmimp.CatchP(func() { - store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{ - {BranchID: "00", Op: dtmimp.OpAction}, - }, -1) - }) - assert.Error(t, err) - store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true) -} - -func TestWorkflowMixed(t *testing.T) { - workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) - req := &busi.BusiReq{Amount: 30} - gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { - var req busi.BusiReq - dtmgimp.MustProtoUnmarshal(data, &req) - - wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) - return err - }) - _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) - if err != nil { - return err - } - - _, err = wf.NewBranch().OnBranchCommit(func(bb *dtmcli.BranchBarrier) error { - _, err := busi.BusiCli.TransInConfirm(wf.Context, &req) - return err - }).OnBranchRollback(func(bb *dtmcli.BranchBarrier) error { - req2 := &busi.ReqHTTP{Amount: 30} - _, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert") - return err - }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { - err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "") - return nil, err - }) - if err != nil { - return err - } - _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess) - }) - return err - }) - err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) - assert.Nil(t, err) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) - waitTransProcessed(gid) -} diff --git a/test/workflow_xa_test.go b/test/workflow_xa_test.go new file mode 100644 index 0000000..b488f7c --- /dev/null +++ b/test/workflow_xa_test.go @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "database/sql" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowXaAction(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + }) + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} + +func TestWorkflowXaRollback(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + logger.FatalIfError(e) + return nil, dtmcli.ErrFailure + }) + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Equal(t, dtmcli.ErrFailure, err) + waitTransProcessed(gid) + assert.Equal(t, StatusFailed, getTransStatus(gid)) +} diff --git a/test/xa_cover_test.go b/test/xa_cover_test.go index 8c602ae..371b8be 100644 --- a/test/xa_cover_test.go +++ b/test/xa_cover_test.go @@ -14,7 +14,7 @@ func TestXaCoverDBError(t *testing.T) { oldDriver := busi.BusiConf.Driver gid := dtmimp.GetFuncName() err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") assert.Nil(t, err) busi.BusiConf.Driver = "no-driver" @@ -44,7 +44,7 @@ func TestXaCoverGidError(t *testing.T) { } gid := dtmimp.GetFuncName() + "-' '" err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") assert.Error(t, err) return nil, err diff --git a/test/xa_grpc_test.go b/test/xa_grpc_test.go index 678dac6..c6f334c 100644 --- a/test/xa_grpc_test.go +++ b/test/xa_grpc_test.go @@ -21,7 +21,7 @@ import ( func TestXaGrpcNormal(t *testing.T) { gid := dtmimp.GetFuncName() err := dtmgrpc.XaGlobalTransaction(DtmGrpcServer, gid, func(xa *dtmgrpc.XaGrpc) error { - req := busi.GenBusiReq(30, false, false) + req := busi.GenReqGrpc(30, false, false) r := &emptypb.Empty{} err := xa.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutXa", r) if err != nil { @@ -38,7 +38,7 @@ func TestXaGrpcNormal(t *testing.T) { func TestXaGrpcRollback(t *testing.T) { gid := dtmimp.GetFuncName() err := dtmgrpc.XaGlobalTransaction(DtmGrpcServer, gid, func(xa *dtmgrpc.XaGrpc) error { - req := busi.GenBusiReq(30, false, true) + req := busi.GenReqGrpc(30, false, true) r := &emptypb.Empty{} err := xa.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutXa", r) if err != nil { diff --git a/test/xa_test.go b/test/xa_test.go index a15c339..49016a7 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -21,7 +21,7 @@ import ( func TestXaNormal(t *testing.T) { gid := dtmimp.GetFuncName() err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) resp, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") if err != nil { return resp, err @@ -37,7 +37,7 @@ func TestXaNormal(t *testing.T) { func TestXaDuplicate(t *testing.T) { gid := dtmimp.GetFuncName() err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") assert.Nil(t, err) sdb, err := dtmimp.StandaloneDB(busi.BusiConf) @@ -59,7 +59,7 @@ func TestXaDuplicate(t *testing.T) { func TestXaRollback(t *testing.T) { gid := dtmimp.GetFuncName() err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := busi.GenTransReq(30, false, true) + req := busi.GenReqHTTP(30, false, true) resp, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") if err != nil { return resp, err @@ -107,7 +107,7 @@ func TestXaNotTimeout(t *testing.T) { timeoutChan <- 0 }() <-timeoutChan - req := busi.GenTransReq(30, false, false) + req := busi.GenReqHTTP(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") assert.Nil(t, err) busi.MainSwitch.NextResult.SetOnce(dtmcli.ResultOngoing) // make commit temp error