Browse Source

add 2 error test case

pull/328/head
yedf2 4 years ago
parent
commit
3327befaec
  1. 2
      dtmgrpc/workflow/rpc.go
  2. 2
      dtmgrpc/workflow/utils.go
  3. 8
      test/busi/base_types.go
  4. 10
      test/msg_barrier_mongo_test.go
  5. 12
      test/msg_barrier_redis_test.go
  6. 12
      test/msg_barrier_test.go
  7. 2
      test/msg_delay_test.go
  8. 10
      test/msg_grpc_barrier_redis_test.go
  9. 6
      test/msg_grpc_barrier_test.go
  10. 6
      test/msg_jrpc_test.go
  11. 2
      test/msg_test.go
  12. 2
      test/saga_barrier_mongo_test.go
  13. 2
      test/saga_barrier_redis_test.go
  14. 4
      test/saga_barrier_test.go
  15. 2
      test/saga_grpc_barrier_test.go
  16. 4
      test/saga_grpc_test.go
  17. 6
      test/saga_test.go
  18. 4
      test/tcc_barrier_test.go
  19. 2
      test/tcc_cover_test.go
  20. 2
      test/tcc_grpc_cover_test.go
  21. 6
      test/tcc_grpc_test.go
  22. 2
      test/tcc_jrpc_test.go
  23. 6
      test/tcc_old_test.go
  24. 10
      test/tcc_test.go
  25. 43
      test/workflow_base_test.go
  26. 131
      test/workflow_grpc_test.go
  27. 107
      test/workflow_http_test.go
  28. 152
      test/workflow_ongoing_test.go
  29. 356
      test/workflow_test.go
  30. 63
      test/workflow_xa_test.go
  31. 4
      test/xa_cover_test.go
  32. 4
      test/xa_grpc_test.go
  33. 8
      test/xa_test.go

2
dtmgrpc/workflow/rpc.go

@ -4,7 +4,6 @@ import (
"context"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"google.golang.org/protobuf/types/known/emptypb"
@ -49,7 +48,6 @@ func (wf *Workflow) submit(status string) error {
}
func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error {
logger.Errorf("registerBranch: %s %s %s", branchID, op, status)
if wf.Protocol == dtmimp.ProtocolHTTP {
return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{
"data": string(res),

2
dtmgrpc/workflow/utils.go

@ -122,6 +122,8 @@ func stepResultFromHTTP(resp *http.Response, err error) *stepResult {
sr.Status = dtmcli.StatusSucceed
} else if resp.StatusCode == http.StatusConflict {
sr.Status = dtmcli.StatusFailed
} else {
sr.Error = errors.New(string(sr.Data))
}
}
return sr

8
test/busi/base_types.go

@ -72,8 +72,8 @@ func (t *ReqHTTP) String() string {
return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult)
}
// GenTransReq 1
func GenTransReq(amount int, outFailed bool, inFailed bool) *ReqHTTP {
// GenReqHTTP 1
func GenReqHTTP(amount int, outFailed bool, inFailed bool) *ReqHTTP {
return &ReqHTTP{
Amount: amount,
TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string),
@ -81,8 +81,8 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *ReqHTTP {
}
}
// GenBusiReq 1
func GenBusiReq(amount int, outFailed bool, inFailed bool) *BusiReq {
// GenReqGrpc 1
func GenReqGrpc(amount int, outFailed bool, inFailed bool) *ReqGrpc {
return &BusiReq{
Amount: int64(amount),
TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string),

10
test/msg_barrier_mongo_test.go

@ -14,7 +14,7 @@ import (
func TestMsgMongoDoSucceed(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaMongoTransIn", req)
err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -32,7 +32,7 @@ func TestMsgMongoDoSucceed(t *testing.T) {
func TestMsgMongoDoBusiFailed(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaMongoTransIn", req)
err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -45,7 +45,7 @@ func TestMsgMongoDoBusiFailed(t *testing.T) {
func TestMsgMongoDoBusiLater(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
@ -70,7 +70,7 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
func TestMsgMongoDoCommitFailed(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaMongoTransIn", req)
err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -87,7 +87,7 @@ func TestMsgMongoDoCommitFailed(t *testing.T) {
func TestMsgMongoDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaMongoTransIn", req)
err := msg.DoAndSubmit(Busi+"/MongoQueryPrepared", func(bb *dtmcli.BranchBarrier) error {

12
test/msg_barrier_redis_test.go

@ -13,7 +13,7 @@ import (
func TestMsgRedisDo(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -29,7 +29,7 @@ func TestMsgRedisDo(t *testing.T) {
func TestMsgRedisDoBusiFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -42,7 +42,7 @@ func TestMsgRedisDoBusiFailed(t *testing.T) {
func TestMsgRedisDoBusiLater(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
@ -65,7 +65,7 @@ func TestMsgRedisDoBusiLater(t *testing.T) {
func TestMsgRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -78,7 +78,7 @@ func TestMsgRedisDoPrepareFailed(t *testing.T) {
func TestMsgRedisDoCommitFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
@ -91,7 +91,7 @@ func TestMsgRedisDoCommitFailed(t *testing.T) {
func TestMsgRedisDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {

12
test/msg_barrier_test.go

@ -17,7 +17,7 @@ import (
func TestMsgDoAndSubmit(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
@ -33,7 +33,7 @@ func TestMsgDoAndSubmit(t *testing.T) {
func TestMsgDoAndSubmitBusiFailed(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
@ -46,7 +46,7 @@ func TestMsgDoAndSubmitBusiFailed(t *testing.T) {
func TestMsgDoAndSubmitBusiLater(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := dtmcli.GetRestyClient().R().
SetQueryParams(map[string]string{
"trans_type": "msg",
@ -69,7 +69,7 @@ func TestMsgDoAndSubmitBusiLater(t *testing.T) {
func TestMsgDoAndSubmitPrepareFailed(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
@ -85,7 +85,7 @@ func TestMsgDoAndSubmitCommitFailed(t *testing.T) {
}
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var g *monkey.PatchGuard
@ -108,7 +108,7 @@ func TestMsgDoAndSubmitCommitAfterFailed(t *testing.T) {
}
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var guard *monkey.PatchGuard

2
test/msg_delay_test.go

@ -12,7 +12,7 @@ import (
)
func genMsgDelay(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req).SetDelay(10)

10
test/msg_grpc_barrier_redis_test.go

@ -14,7 +14,7 @@ import (
func TestMsgGrpcRedisDo(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
@ -30,7 +30,7 @@ func TestMsgGrpcRedisDo(t *testing.T) {
func TestMsgGrpcRedisDoBusiFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
@ -43,7 +43,7 @@ func TestMsgGrpcRedisDoBusiFailed(t *testing.T) {
func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer+"not-exists", gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
@ -56,7 +56,7 @@ func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) {
func TestMsgGrpcRedisDoCommitFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
@ -69,7 +69,7 @@ func TestMsgGrpcRedisDoCommitFailed(t *testing.T) {
func TestMsgGrpcRedisDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {

6
test/msg_grpc_barrier_test.go

@ -17,7 +17,7 @@ import (
func TestMsgGrpcPrepareAndSubmit(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
@ -36,7 +36,7 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) {
}
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
var guard *monkey.PatchGuard
@ -60,7 +60,7 @@ func TestMsgGrpcPrepareAndSubmitCommitFailed(t *testing.T) {
}
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var g *monkey.PatchGuard

6
test/msg_jrpc_test.go

@ -49,7 +49,7 @@ func TestMsgJrpcResults(t *testing.T) {
func TestMsgJrpcDoAndSubmit(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
msg.Protocol = dtmimp.Jrpc
@ -66,7 +66,7 @@ func TestMsgJrpcDoAndSubmit(t *testing.T) {
func TestMsgJrpcDoAndSubmitBusiFailed(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
msg.Protocol = dtmimp.Jrpc
@ -135,7 +135,7 @@ func TestMsgJprcAbnormal2(t *testing.T) {
}
func genJrpcMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.BusiJrpcURL+"TransIn", &req)

2
test/msg_test.go

@ -68,7 +68,7 @@ func TestMsgAbnormal(t *testing.T) {
}
func genMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)

2
test/saga_barrier_mongo_test.go

@ -38,7 +38,7 @@ func TestSagaBarrierMongoRollback(t *testing.T) {
}
func genSagaBarrierMongo(gid string, transInFailed bool) *dtmcli.Saga {
req := busi.GenTransReq(30, false, transInFailed)
req := busi.GenReqHTTP(30, false, transInFailed)
req.Store = "mongo"
return dtmcli.NewSaga(DtmServer, gid).
Add(Busi+"/SagaMongoTransOut", Busi+"/SagaMongoTransOutCom", req).

2
test/saga_barrier_redis_test.go

@ -40,7 +40,7 @@ func TestSagaBarrierRedisRollback(t *testing.T) {
}
func genSagaBarrierRedis(gid string) *dtmcli.Saga {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
req.Store = "redis"
return dtmcli.NewSaga(DtmServer, gid).
Add(Busi+"/SagaRedisTransIn", Busi+"/SagaRedisTransInCom", req).

4
test/saga_barrier_test.go

@ -34,14 +34,14 @@ func TestSagaBarrierRollback(t *testing.T) {
}
func genSagaBarrier(gid string, outFailed, inFailed bool) *dtmcli.Saga {
req := busi.GenTransReq(30, outFailed, inFailed)
req := busi.GenReqHTTP(30, outFailed, inFailed)
return dtmcli.NewSaga(DtmServer, gid).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCom", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCom", req)
}
func TestSagaBarrier2Normal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
saga := dtmcli.NewSaga(DtmServer, gid).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCom", req).

2
test/saga_grpc_barrier_test.go

@ -36,7 +36,7 @@ func TestSagaGrpcBarrierRollback(t *testing.T) {
func genSagaGrpcBarrier(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
req := busi.GenBusiReq(30, outFailed, inFailed)
req := busi.GenReqGrpc(30, outFailed, inFailed)
saga.Add(busi.BusiGrpc+"/busi.Busi/TransOutBSaga", busi.BusiGrpc+"/busi.Busi/TransOutRevertBSaga", req)
saga.Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", busi.BusiGrpc+"/busi.Busi/TransInRevertBSaga", req)
return saga

4
test/saga_grpc_test.go

@ -83,7 +83,7 @@ func TestSagaGrpcNormalWait(t *testing.T) {
func TestSagaGrpcEmptyUrl(t *testing.T) {
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, dtmimp.GetFuncName())
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
saga.Add(busi.BusiGrpc+"/busi.Busi/TransOut", busi.BusiGrpc+"/busi.Busi/TransOutRevert", req)
saga.Add("", busi.BusiGrpc+"/busi.Busi/TransInRevert", req)
saga.Submit()
@ -95,7 +95,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) {
//nolint: unparam
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
req := busi.GenBusiReq(30, outFailed, inFailed)
req := busi.GenReqGrpc(30, outFailed, inFailed)
saga.Add(busi.BusiGrpc+"/busi.Busi/TransOut", busi.BusiGrpc+"/busi.Busi/TransOutRevert", req)
saga.Add(busi.BusiGrpc+"/busi.Busi/TransIn", busi.BusiGrpc+"/busi.Busi/TransInRevert", req)
return saga

6
test/saga_test.go

@ -78,7 +78,7 @@ func TestSagaAbnormal(t *testing.T) {
func TestSagaEmptyUrl(t *testing.T) {
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmimp.GetFuncName())
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
saga.Add(busi.Busi+"/TransOut", "", &req)
saga.Add("", "", &req)
saga.Submit()
@ -89,7 +89,7 @@ func TestSagaEmptyUrl(t *testing.T) {
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenTransReq(30, outFailed, inFailed)
req := busi.GenReqHTTP(30, outFailed, inFailed)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
saga.Add(busi.Busi+"/TransIn", busi.Busi+"/TransInRevert", &req)
return saga
@ -97,7 +97,7 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
func genSaga1(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenTransReq(30, outFailed, inFailed)
req := busi.GenReqHTTP(30, outFailed, inFailed)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
return saga
}

4
test/tcc_barrier_test.go

@ -22,7 +22,7 @@ import (
)
func TestTccBarrierNormal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
@ -36,7 +36,7 @@ func TestTccBarrierNormal(t *testing.T) {
}
func TestTccBarrierRollback(t *testing.T) {
req := busi.GenTransReq(30, false, true)
req := busi.GenReqHTTP(30, false, true)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")

2
test/tcc_cover_test.go

@ -32,7 +32,7 @@ func TestTccCoverPanic(t *testing.T) {
}
func TestTccNested(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")

2
test/tcc_grpc_cover_test.go

@ -32,7 +32,7 @@ func TestTccGrpcCoverPanic(t *testing.T) {
}
func TestTccGrpcCoverCallBranch(t *testing.T) {
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {

6
test/tcc_grpc_test.go

@ -21,7 +21,7 @@ import (
)
func TestTccGrpcNormal(t *testing.T) {
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
r := &emptypb.Empty{}
@ -38,7 +38,7 @@ func TestTccGrpcNormal(t *testing.T) {
func TestTccGrpcRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, true)
req := busi.GenReqGrpc(30, false, true)
err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
r := &emptypb.Empty{}
err := tcc.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutTcc", busi.BusiGrpc+"/busi.Busi/TransOutConfirm", busi.BusiGrpc+"/busi.Busi/TransOutRevert", r)
@ -56,7 +56,7 @@ func TestTccGrpcRollback(t *testing.T) {
}
func TestTccGrpcNested(t *testing.T) {
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmgrpc.TccGlobalTransaction(dtmutil.DefaultGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
r := &emptypb.Empty{}

2
test/tcc_jrpc_test.go

@ -12,7 +12,7 @@ import (
)
func TestTccJrpcNormal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultJrpcServer, gid, func(tcc *dtmcli.Tcc) {
tcc.Protocol = dtmimp.Jrpc

6
test/tcc_old_test.go

@ -12,7 +12,7 @@ import (
)
func TestTccOldNormal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld")
@ -27,7 +27,7 @@ func TestTccOldNormal(t *testing.T) {
func TestTccOldRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, true)
req := busi.GenReqHTTP(30, false, true)
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, rerr := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld")
assert.Nil(t, rerr)
@ -43,7 +43,7 @@ func TestTccOldRollback(t *testing.T) {
}
func TestTccOldTimeout(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
timeoutChan := make(chan int, 1)

10
test/tcc_test.go

@ -18,7 +18,7 @@ import (
)
func TestTccNormal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
@ -33,7 +33,7 @@ func TestTccNormal(t *testing.T) {
func TestTccRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, true)
req := busi.GenReqHTTP(30, false, true)
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, rerr := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, rerr)
@ -50,7 +50,7 @@ func TestTccRollback(t *testing.T) {
}
func TestTccTimeout(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
timeoutChan := make(chan int, 1)
@ -73,7 +73,7 @@ func TestTccTimeout(t *testing.T) {
}
func TestTccCompatible(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
@ -88,7 +88,7 @@ func TestTccCompatible(t *testing.T) {
}
func TestTccHeaders(t *testing.T) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHTTPServer, gid, func(t *dtmcli.Tcc) {
t.BranchHeaders = map[string]string{

43
test/workflow_base_test.go

@ -0,0 +1,43 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"testing"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/stretchr/testify/assert"
)
func TestWorkflowBranchConflict(t *testing.T) {
gid := dtmimp.GetFuncName()
store := dtmsvr.GetStore()
now := time.Now()
g := &storage.TransGlobalStore{
Gid: gid,
Status: dtmcli.StatusPrepared,
NextCronTime: &now,
}
err := store.MaySaveNewTrans(g, []storage.TransBranchStore{
{
BranchID: "00",
Op: dtmimp.OpAction,
},
})
assert.Nil(t, err)
err = dtmimp.CatchP(func() {
store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{
{BranchID: "00", Op: dtmimp.OpAction},
}, -1)
})
assert.Error(t, err)
store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true)
}

131
test/workflow_grpc_test.go

@ -0,0 +1,131 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowGrpcSimple(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
_, err := busi.BusiCli.TransOutBSaga(wf.NewBranchCtx(), &req)
if err != nil {
return err
}
_, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req)
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowGrpcNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowMixed(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := &busi.BusiReq{Amount: 30}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
_, err = wf.NewBranch().OnBranchCommit(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInConfirm(wf.Context, &req)
return err
}).OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
req2 := &busi.ReqHTTP{Amount: 30}
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert")
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "")
return nil, err
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowGrpcError(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30}
gid := dtmimp.GetFuncName()
busi.MainSwitch.TransOutResult.SetOnce("ERROR")
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
_, err := busi.BusiCli.TransOut(wf.NewBranchCtx(), &req)
if err != nil {
return err
}
_, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req)
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err)
go waitTransProcessed(gid)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}

107
test/workflow_http_test.go

@ -0,0 +1,107 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil {
return err
}
_, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := &busi.ReqHTTP{Amount: 30, TransInResult: dtmimp.ResultFailure}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom")
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "")
})
})
if err != nil {
return err
}
_, err = wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "")
})
}).NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowError(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
busi.MainSwitch.TransOutResult.SetOnce("ERROR")
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil {
return err
}
_, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
go waitTransProcessed(gid)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}

152
test/workflow_ongoing_test.go

@ -0,0 +1,152 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
var ongoingStep = 0
func fetchOngoingStep(dest int) bool {
c := ongoingStep
logger.Debugf("ongoing step is: %d", c)
if c == dest {
ongoingStep++
return true
}
return false
}
func TestWorkflowSimpleResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
ongoingStep = 0
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
if fetchOngoingStep(0) {
return dtmcli.ErrOngoing
}
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
return err
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowGrpcRollbackResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
ongoingStep = 0
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
if fetchOngoingStep(0) {
return dtmcli.ErrOngoing
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(4) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if fetchOngoingStep(1) {
return dtmcli.ErrOngoing
}
if err != nil {
return err
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(3) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrOngoing)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
func TestWorkflowXaResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
ongoingStep = 0
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(0) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(1) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrOngoing, err)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}

356
test/workflow_test.go

@ -1,356 +0,0 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil {
return err
}
_, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowSimpleResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
ongoingStep = 0
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
if fetchOngoingStep(0) {
return dtmcli.ErrOngoing
}
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
return err
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := &busi.ReqHTTP{Amount: 30, TransInResult: dtmimp.ResultFailure}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom")
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "")
})
})
if err != nil {
return err
}
_, err = wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "")
})
}).NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowGrpcNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
var ongoingStep = 0
func fetchOngoingStep(dest int) bool {
c := ongoingStep
logger.Debugf("ongoing step is: %d", c)
if c == dest {
ongoingStep++
return true
}
return false
}
func TestWorkflowGrpcRollbackResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
ongoingStep = 0
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
if fetchOngoingStep(0) {
return dtmcli.ErrOngoing
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(4) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if fetchOngoingStep(1) {
return dtmcli.ErrOngoing
}
if err != nil {
return err
}
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(3) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrOngoing)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
func TestWorkflowXaAction(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowXaRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
logger.FatalIfError(e)
return nil, dtmcli.ErrFailure
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrFailure, err)
waitTransProcessed(gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
func TestWorkflowXaResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
ongoingStep = 0
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(0) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(1) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrOngoing, err)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowBranchConflict(t *testing.T) {
gid := dtmimp.GetFuncName()
store := dtmsvr.GetStore()
now := time.Now()
g := &storage.TransGlobalStore{
Gid: gid,
Status: dtmcli.StatusPrepared,
NextCronTime: &now,
}
err := store.MaySaveNewTrans(g, []storage.TransBranchStore{
{
BranchID: "00",
Op: dtmimp.OpAction,
},
})
assert.Nil(t, err)
err = dtmimp.CatchP(func() {
store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{
{BranchID: "00", Op: dtmimp.OpAction},
}, -1)
})
assert.Error(t, err)
store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true)
}
func TestWorkflowMixed(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := &busi.BusiReq{Amount: 30}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
wf.NewBranch().OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
_, err = wf.NewBranch().OnBranchCommit(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInConfirm(wf.Context, &req)
return err
}).OnBranchRollback(func(bb *dtmcli.BranchBarrier) error {
req2 := &busi.ReqHTTP{Amount: 30}
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert")
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "")
return nil, err
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
waitTransProcessed(gid)
}

63
test/workflow_xa_test.go

@ -0,0 +1,63 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowXaAction(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowXaRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
logger.FatalIfError(e)
return nil, dtmcli.ErrFailure
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrFailure, err)
waitTransProcessed(gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}

4
test/xa_cover_test.go

@ -14,7 +14,7 @@ func TestXaCoverDBError(t *testing.T) {
oldDriver := busi.BusiConf.Driver
gid := dtmimp.GetFuncName()
err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
assert.Nil(t, err)
busi.BusiConf.Driver = "no-driver"
@ -44,7 +44,7 @@ func TestXaCoverGidError(t *testing.T) {
}
gid := dtmimp.GetFuncName() + "-' '"
err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
assert.Error(t, err)
return nil, err

4
test/xa_grpc_test.go

@ -21,7 +21,7 @@ import (
func TestXaGrpcNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmgrpc.XaGlobalTransaction(DtmGrpcServer, gid, func(xa *dtmgrpc.XaGrpc) error {
req := busi.GenBusiReq(30, false, false)
req := busi.GenReqGrpc(30, false, false)
r := &emptypb.Empty{}
err := xa.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutXa", r)
if err != nil {
@ -38,7 +38,7 @@ func TestXaGrpcNormal(t *testing.T) {
func TestXaGrpcRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmgrpc.XaGlobalTransaction(DtmGrpcServer, gid, func(xa *dtmgrpc.XaGrpc) error {
req := busi.GenBusiReq(30, false, true)
req := busi.GenReqGrpc(30, false, true)
r := &emptypb.Empty{}
err := xa.CallBranch(req, busi.BusiGrpc+"/busi.Busi/TransOutXa", r)
if err != nil {

8
test/xa_test.go

@ -21,7 +21,7 @@ import (
func TestXaNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
resp, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
if err != nil {
return resp, err
@ -37,7 +37,7 @@ func TestXaNormal(t *testing.T) {
func TestXaDuplicate(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
assert.Nil(t, err)
sdb, err := dtmimp.StandaloneDB(busi.BusiConf)
@ -59,7 +59,7 @@ func TestXaDuplicate(t *testing.T) {
func TestXaRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, true)
req := busi.GenReqHTTP(30, false, true)
resp, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
if err != nil {
return resp, err
@ -107,7 +107,7 @@ func TestXaNotTimeout(t *testing.T) {
timeoutChan <- 0
}()
<-timeoutChan
req := busi.GenTransReq(30, false, false)
req := busi.GenReqHTTP(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
assert.Nil(t, err)
busi.MainSwitch.NextResult.SetOnce(dtmcli.ResultOngoing) // make commit temp error

Loading…
Cancel
Save