Browse Source

Merge pull request #195 from dtm-labs/alpha

flash sales bench ok
pull/197/head v1.11.0
yedf2 4 years ago
committed by GitHub
parent
commit
56aea32f35
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .gitignore
  2. 2
      .golangci.yml
  3. 8
      bench/main.go
  4. 18
      bench/svr/http.go
  5. 14
      bench/test-flash-sales.sh
  6. 42
      dtmcli/barrier.go
  7. 73
      dtmcli/barrier_redis.go
  8. 6
      dtmcli/dtmimp/trans_base.go
  9. 36
      dtmcli/msg.go
  10. 2
      dtmcli/tcc.go
  11. 2
      dtmcli/xa.go
  12. 14
      dtmgrpc/dtmgimp/types.go
  13. 36
      dtmgrpc/msg.go
  14. 10
      dtmgrpc/tcc.go
  15. 15
      dtmgrpc/type.go
  16. 10
      dtmgrpc/xa.go
  17. 15
      dtmsvr/trans_status.go
  18. 2
      helper/sync-dtmcli.sh
  19. 47
      test/busi/barrier.go
  20. 6
      test/busi/base_grpc.go
  21. 21
      test/busi/base_http.go
  22. 13
      test/busi/base_types.go
  23. 4
      test/busi/busi.go
  24. 97
      test/busi/busi.pb.go
  25. 7
      test/busi/busi.proto
  26. 180
      test/busi/busi_grpc.pb.go
  27. 6
      test/busi/quick_start.go
  28. 1
      test/busi/startup.go
  29. 13
      test/busi/utils.go
  30. 8
      test/dtmsvr_test.go
  31. 3
      test/main_test.go
  32. 82
      test/msg_barrier_redis_test.go
  33. 13
      test/msg_barrier_test.go
  34. 83
      test/msg_grpc_barrier_redis_test.go
  35. 30
      test/msg_grpc_barrier_test.go
  36. 17
      test/msg_grpc_test.go
  37. 28
      test/msg_options_test.go
  38. 17
      test/msg_test.go
  39. 12
      test/saga_concurrent_test.go
  40. 22
      test/saga_grpc_test.go
  41. 24
      test/saga_options_test.go
  42. 8
      test/saga_test.go
  43. 2
      test/tcc_barrier_test.go
  44. 3
      test/tcc_grpc_cover_test.go
  45. 3
      test/tcc_grpc_test.go
  46. 5
      test/tcc_old_test.go
  47. 5
      test/tcc_test.go
  48. 28
      test/types.go
  49. 5
      test/xa_cover_test.go
  50. 9
      test/xa_test.go

2
.gitignore

@ -8,7 +8,7 @@ dist
.vscode
default.etcd
*/**/*.bolt
bench/bench
# Output file of unit test coverage
coverage.*
profile.*

2
.golangci.yml

@ -1,7 +1,7 @@
run:
deadline: 5m
skip-dirs:
- test
# - test
# - bench
linter-settings:

8
bench/main.go

@ -34,13 +34,13 @@ func main() {
logger.Infof("starting bench server")
config.MustLoadConfig("")
logger.InitLog(conf.LogLevel)
if busi.BusiConf.Driver != "" {
dtmcli.SetCurrentDBType(busi.BusiConf.Driver)
svr.PrepareBenchDB()
}
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
if os.Args[1] == "db" {
if busi.BusiConf.Driver == "mysql" {
dtmcli.SetCurrentDBType(busi.BusiConf.Driver)
svr.PrepareBenchDB()
}
busi.PopulateDB(false)
} else if os.Args[1] == "redis" || os.Args[1] == "boltdb" {

18
bench/svr/http.go

@ -7,6 +7,7 @@
package svr
import (
"context"
"database/sql"
"fmt"
"os"
@ -132,6 +133,8 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unp
return nil
}
var stockKey = "{a}--stock-1"
func benchAddRoute(app *gin.Engine) {
app.POST(benchAPI+"/TransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c)
@ -186,4 +189,19 @@ func benchAddRoute(app *gin.Engine) {
saga.WaitResult = true
return saga.Submit()
}))
app.Any(benchAPI+"/benchFlashSalesReset", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
_, err := busi.RedisGet().FlushAll(context.Background()).Result()
logger.FatalIfError(err)
_, err = busi.RedisGet().Set(context.Background(), stockKey, "0", 86400*time.Second).Result()
logger.FatalIfError(err)
return nil
}))
app.Any(benchAPI+"/benchFlashSales", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
gid := "{a}-" + shortuuid.New()
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add("", nil)
return msg.DoAndSubmit("", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), stockKey, -1, 86400)
})
}))
}

14
bench/test-flash-sales.sh

@ -0,0 +1,14 @@
# !/bin/bash
set -x
export LOG_LEVEL=fatal
export STORE_DRIVER=redis
export STORE_HOST=localhost
export STORE_PORT=6379
export BUSI_REDIS=localhost:6379
./bench redis &
echo 'sleeping 3s for dtm bench to run up.' && sleep 3
curl "http://127.0.0.1:8083/api/busi_bench/benchFlashSalesReset"
ab -n 300000 -c 20 "http://127.0.0.1:8083/api/busi_bench/benchFlashSales"
pkill bench

42
dtmcli/barrier.go

@ -13,7 +13,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)
// BarrierBusiFunc type for busi func
@ -115,44 +114,3 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
}
return err
}
// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}

73
dtmcli/barrier_redis.go

@ -0,0 +1,73 @@
package dtmcli
import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)
// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bb.BarrierID = bb.BarrierID + 1
bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}
// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01")
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then
redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1])
v = 'rollback'
end
if v == 'rollback' then
return 'FAILURE'
end
`, []string{bkey1}, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}

6
dtmcli/dtmimp/trans_base.go

@ -106,8 +106,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
return TransCallDtm(tb, m, operation)
}
// TransRequestBranch TransBAse request branch result
func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
// TransRequestBranch TransBase request branch result
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
resp, err := RestyClient.R().
SetBody(body).
SetQueryParams(map[string]string{
@ -118,7 +118,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri
"op": op,
}).
SetHeaders(t.BranchHeaders).
Post(url)
Execute(method, url)
if err == nil {
err = RespAsErrorCompatible(resp)
}

36
dtmcli/msg.go

@ -8,6 +8,7 @@ package dtmcli
import (
"database/sql"
"errors"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
)
@ -40,22 +41,35 @@ func (s *Msg) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
// DoAndSubmitDB short method for Do on db type. please see Do
func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
return s.DoAndSubmit(queryPrepared, func(bb *BranchBarrier) error {
return bb.CallWithDB(db, busiCall)
})
}
// DoAndSubmit one method for the entire prepare->busi->submit
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}
if err == nil {
defer func() {
if err != nil && bb.QueryPrepared(db) == ErrFailure {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
}
}()
err = bb.CallWithDB(db, busiCall)
}
if err == nil {
err = s.Submit()
errb := busiCall(bb)
if errb != nil && !errors.Is(errb, ErrFailure) {
// if busicall return an error other than failure, we will query the result
_, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
}
if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
} else if err == nil {
err = s.Submit()
}
if errb != nil {
return errb
}
}
return err
}

2
dtmcli/tcc.go

@ -75,5 +75,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
if err != nil {
return nil, err
}
return dtmimp.TransRequestBranch(&t.TransBase, body, branchID, BranchTry, tryURL)
return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, BranchTry, tryURL)
}

2
dtmcli/xa.go

@ -101,5 +101,5 @@ func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc Xa
// CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewSubBranchID()
return dtmimp.TransRequestBranch(&x.TransBase, body, branchID, BranchAction, url)
return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, BranchAction, url)
}

14
dtmgrpc/dtmgimp/types.go

@ -13,7 +13,10 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
// GrpcServerLog 打印grpc服务端的日志
@ -46,3 +49,14 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
}
return err
}
// InvokeBranch invoke a url for trans
func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string, reply interface{}, branchID string, op string) error {
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)
if err != nil {
return err
}
ctx := TransInfo2Ctx(t.Gid, t.TransType, branchID, op, t.Dtm)
ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(t.BranchHeaders)...)
return MustGetGrpcConn(server, isRaw).Invoke(ctx, method, msg, reply)
}

36
dtmgrpc/msg.go

@ -8,6 +8,7 @@ package dtmgrpc
import (
"database/sql"
"errors"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -43,20 +44,35 @@ func (s *MsgGrpc) Submit() error {
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}
// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
// DoAndSubmitDB short method for Do on db type. please see Do
func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
return s.DoAndSubmit(queryPrepared, func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(db, busiCall)
})
}
// DoAndSubmit one method for the entire prepare->busi->submit
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = bb.CallWithDB(db, func(tx *sql.Tx) error {
err := busiCall(tx)
if err == nil {
err = s.Prepare(queryPrepared)
}
return err
})
err = s.Prepare(queryPrepared)
}
if err == nil {
err = s.Submit()
errb := busiCall(bb)
if errb != nil && !errors.Is(err, dtmcli.ErrFailure) {
err = dtmgimp.InvokeBranch(&s.TransBase, true, nil, queryPrepared, &[]byte{}, bb.BranchID, bb.Op)
err = GrpcError2DtmError(err)
}
if errors.Is(err, dtmcli.ErrFailure) || errors.Is(errb, dtmcli.ErrFailure) {
_ = dtmgimp.DtmGrpcCall(&s.TransBase, "Abort")
} else if err == nil {
err = s.Submit()
}
if errb != nil {
return errb
}
}
return err
}

10
dtmgrpc/tcc.go

@ -13,8 +13,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
@ -87,11 +85,5 @@ func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL st
if err != nil {
return err
}
server, method, err := dtmdriver.GetDriver().ParseServerMethod(tryURL)
if err != nil {
return err
}
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm)
ctx = metadata.AppendToOutgoingContext(ctx, dtmgimp.Map2Kvs(t.BranchHeaders)...)
return dtmgimp.MustGetGrpcConn(server, false).Invoke(ctx, method, busiMsg, reply)
return dtmgimp.InvokeBranch(&t.TransBase, false, busiMsg, tryURL, reply, branchID, "try")
}

15
dtmgrpc/type.go

@ -30,6 +30,21 @@ func DtmError2GrpcError(res interface{}) error {
return e
}
// GrpcError2DtmError translate grpc error to dtm error
func GrpcError2DtmError(err error) error {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Aborted {
// version lower then v1.10, will specify Ongoing in code Aborted
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ErrOngoing
}
return dtmcli.ErrFailure
} else if ok && st.Code() == codes.FailedPrecondition {
return dtmcli.ErrOngoing
}
return err
}
// MustGenGid must gen a gid from grpcServer
func MustGenGid(grpcServer string) string {
dc := dtmgimp.MustGetDtmClient(grpcServer)

10
dtmgrpc/xa.go

@ -15,7 +15,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtmdriver"
grpc "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
emptypb "google.golang.org/protobuf/types/known/emptypb"
@ -119,12 +118,5 @@ func (xc *XaGrpcClient) XaGlobalTransaction2(gid string, custom func(*XaGrpc), x
// CallBranch call a xa branch
func (x *XaGrpc) CallBranch(msg proto.Message, url string, reply interface{}) error {
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)
if err != nil {
return err
}
err = dtmgimp.MustGetGrpcConn(server, false).Invoke(
dtmgimp.TransInfo2Ctx(x.Gid, x.TransType, x.NewSubBranchID(), "action", x.Dtm), method, msg, reply)
return err
return dtmgimp.InvokeBranch(&x.TransBase, false, msg, url, reply, x.NewSubBranchID(), "action")
}

15
dtmsvr/trans_status.go

@ -15,11 +15,10 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
func (t *TransGlobal) touchCronTime(ctype cronType) {
@ -92,17 +91,7 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
if err == nil {
return nil
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.Aborted {
// version lower then v1.10, will specify Ongoing in code Aborted
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ErrOngoing
}
return dtmcli.ErrFailure
} else if ok && st.Code() == codes.FailedPrecondition {
return dtmcli.ErrOngoing
}
return err
return dtmgrpc.GrpcError2DtmError(err)
}
dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url))
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).

2
helper/sync-dtmcli.sh

@ -13,7 +13,7 @@ fi
cd ../dtmcli
cp -rf ../dtm/dtmcli/* ./
rm -f *_test.go
rm -f *_test.go logger/*.log
sed -i '' -e 's/dtm-labs\/dtm\//dtm-labs\//g' *.go */**.go
go mod tidy
go build || exit 1

47
test/busi/barrier.go

@ -11,6 +11,8 @@ import (
"database/sql"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
emptypb "google.golang.org/protobuf/types/known/emptypb"
@ -71,24 +73,24 @@ func init() {
})
}))
app.POST(BusiAPI+"/SagaRedisTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))
app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
req := reqFrom(c)
if req.TransOutResult != "" {
return dtmcli.String2DtmError(req.TransOutResult)
}
if req.Store == "redis" {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), req.Amount, 7*86400)
if req.Store == config.Redis {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
}
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
@ -96,7 +98,7 @@ func init() {
})
}))
app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
if reqFrom(c).Store == "redis" {
if reqFrom(c).Store == config.Redis {
return nil
}
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
@ -111,7 +113,7 @@ func init() {
func TccBarrierTransOutCancel(c *gin.Context) interface{} {
req := reqFrom(c)
if req.Store == "redis" {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -req.Amount, 7*86400)
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -req.Amount, 7*86400)
}
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount)
@ -146,7 +148,34 @@ func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emp
})
}
func (s *busiServer) TransInRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), int(in.Amount), 86400)
}
func (s *busiServer) TransOutRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), int(-in.Amount), 86400)
}
func (s *busiServer) TransInRevertRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -int(in.Amount), 86400)
}
func (s *busiServer) TransOutRevertRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), int(in.Amount), 86400)
}
func (s *busiServer) QueryPreparedB(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.QueryPrepared(dbGet().ToSQLDB())
err := barrier.QueryPrepared(dbGet().ToSQLDB())
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
}
func (s *busiServer) QueryPreparedRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
err := barrier.RedisQueryPrepared(RedisGet(), 86400)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
}

6
test/busi/base_grpc.go

@ -27,13 +27,13 @@ import (
)
// BusiGrpc busi service grpc address
var BusiGrpc string = fmt.Sprintf("localhost:%d", BusiGrpcPort)
var BusiGrpc = fmt.Sprintf("localhost:%d", BusiGrpcPort)
// DtmClient grpc client for dtm
var DtmClient dtmgpb.DtmClient = nil
var DtmClient dtmgpb.DtmClient
// XaGrpcClient XA client connection
var XaGrpcClient *dtmgrpc.XaGrpcClient = nil
var XaGrpcClient *dtmgrpc.XaGrpcClient
func init() {
setupFuncs["XaGrpcSetup"] = func(app *gin.Engine) {

21
test/busi/base_http.go

@ -35,14 +35,17 @@ type setupFunc func(*gin.Engine)
var setupFuncs = map[string]setupFunc{}
// Busi busi service url prefix
var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI)
var Busi = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI)
var XaClient *dtmcli.XaClient = nil
// XaClient 1
var XaClient *dtmcli.XaClient
// SleepCancelHandler 1
type SleepCancelHandler func(c *gin.Context) interface{}
var sleepCancelHandler SleepCancelHandler = nil
var sleepCancelHandler SleepCancelHandler
// SetSleepCancelHandler 1
func SetSleepCancelHandler(handler SleepCancelHandler) {
sleepCancelHandler = handler
}
@ -74,8 +77,9 @@ func BaseAppStartup() *gin.Engine {
v(app)
}
logger.Debugf("Starting busi at: %d", BusiPort)
go app.Run(fmt.Sprintf(":%d", BusiPort))
go func() {
_ = app.Run(fmt.Sprintf(":%d", BusiPort))
}()
return app
}
@ -128,6 +132,11 @@ func BaseAddRoute(app *gin.Engine) {
db := dbGet().ToSQLDB()
return bb.QueryPrepared(db)
}))
app.GET(BusiAPI+"/RedisQueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
logger.Debugf("%s RedisQueryPrepared", c.Query("gid"))
bb := MustBarrierFromGin(c)
return bb.RedisQueryPrepared(RedisGet(), 86400)
}))
app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error {
return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
@ -154,7 +163,7 @@ func BaseAddRoute(app *gin.Engine) {
if reqFrom(c).TransOutResult == dtmcli.ResultFailure {
return dtmcli.ErrFailure
}
var dia gorm.Dialector = nil
var dia gorm.Dialector
if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql {
dia = mysql.New(mysql.Config{Conn: db})
} else if dtmcli.GetCurrentDBType() == dtmcli.DBTypePostgres {

13
test/busi/base_types.go

@ -15,6 +15,7 @@ import (
"github.com/gin-gonic/gin"
)
// BusiConf 1
var BusiConf = dtmcli.DBConf{
Driver: "mysql",
Host: "localhost",
@ -22,20 +23,23 @@ var BusiConf = dtmcli.DBConf{
User: "root",
}
// UserAccount 1
type UserAccount struct {
UserId int
UserID int
Balance string
TradingBalance string
}
// TableName 1
func (*UserAccount) TableName() string {
return "dtm_busi.user_account"
}
func GetBalanceByUid(uid int, store string) int {
// GetBalanceByUID 1
func GetBalanceByUID(uid int, store string) int {
if store == "redis" {
rd := RedisGet()
accA, err := rd.Get(rd.Context(), getRedisAccountKey(uid)).Result()
accA, err := rd.Get(rd.Context(), GetRedisAccountKey(uid)).Result()
dtmimp.E2P(err)
return dtmimp.MustAtoi(accA)
}
@ -127,6 +131,7 @@ type mainSwitchType struct {
// MainSwitch controls busi success or fail
var MainSwitch mainSwitchType
func getRedisAccountKey(uid int) string {
// GetRedisAccountKey return redis key for uid
func GetRedisAccountKey(uid int) string {
return fmt.Sprintf("{a}-redis-account-key-%d", uid)
}

4
test/busi/busi.go

@ -13,7 +13,10 @@ import (
status "google.golang.org/grpc/status"
)
// TransOutUID 1
const TransOutUID = 1
// TransInUID 2
const TransInUID = 2
func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error {
@ -58,6 +61,7 @@ func sagaGrpcAdjustBalance(db dtmcli.DB, uid int, amount int64, result string) e
return err
}
// SagaAdjustBalance 1
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
if strings.Contains(result, dtmcli.ResultFailure) {
return dtmcli.ErrFailure

97
test/busi/busi.pb.go

@ -148,7 +148,7 @@ var file_test_busi_busi_proto_rawDesc = []byte{
0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x32, 0x8d, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72,
0x65, 0x32, 0xbe, 0x0b, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72,
0x61, 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73,
0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33,
@ -214,15 +214,34 @@ var file_test_busi_busi_proto_rawDesc = []byte{
0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70,
0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69,
0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50,
0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e,
0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65,
0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52,
0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e,
0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49,
0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62,
0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75,
0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62,
0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72,
0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73,
0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72,
0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73,
0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70,
0x61, 0x72, 0x65, 0x64, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -262,30 +281,40 @@ var file_test_busi_busi_proto_depIdxs = []int32{
0, // 15: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq
0, // 16: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq
0, // 17: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq
0, // 18: busi.Busi.QueryPrepared:input_type -> busi.BusiReq
0, // 19: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq
2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty
2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty
2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty
2, // 23: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty
2, // 24: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty
2, // 25: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
2, // 26: busi.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 27: busi.Busi.TransInXa:output_type -> google.protobuf.Empty
2, // 28: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty
2, // 29: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty
2, // 30: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty
2, // 31: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty
2, // 32: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty
2, // 33: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty
2, // 34: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty
2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty
2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty
1, // 38: busi.Busi.QueryPrepared:output_type -> busi.BusiReply
2, // 39: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty
20, // [20:40] is the sub-list for method output_type
0, // [0:20] is the sub-list for method input_type
0, // 18: busi.Busi.TransInRedis:input_type -> busi.BusiReq
0, // 19: busi.Busi.TransOutRedis:input_type -> busi.BusiReq
0, // 20: busi.Busi.TransInRevertRedis:input_type -> busi.BusiReq
0, // 21: busi.Busi.TransOutRevertRedis:input_type -> busi.BusiReq
0, // 22: busi.Busi.QueryPrepared:input_type -> busi.BusiReq
0, // 23: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq
0, // 24: busi.Busi.QueryPreparedRedis:input_type -> busi.BusiReq
2, // 25: busi.Busi.TransIn:output_type -> google.protobuf.Empty
2, // 26: busi.Busi.TransOut:output_type -> google.protobuf.Empty
2, // 27: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty
2, // 28: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty
2, // 29: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty
2, // 30: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
2, // 31: busi.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 32: busi.Busi.TransInXa:output_type -> google.protobuf.Empty
2, // 33: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty
2, // 34: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty
2, // 35: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty
2, // 36: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty
2, // 37: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty
2, // 38: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty
2, // 39: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty
2, // 40: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
2, // 41: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty
2, // 42: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty
2, // 43: busi.Busi.TransInRedis:output_type -> google.protobuf.Empty
2, // 44: busi.Busi.TransOutRedis:output_type -> google.protobuf.Empty
2, // 45: busi.Busi.TransInRevertRedis:output_type -> google.protobuf.Empty
2, // 46: busi.Busi.TransOutRevertRedis:output_type -> google.protobuf.Empty
1, // 47: busi.Busi.QueryPrepared:output_type -> busi.BusiReply
2, // 48: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty
2, // 49: busi.Busi.QueryPreparedRedis:output_type -> google.protobuf.Empty
25, // [25:50] is the sub-list for method output_type
0, // [0:25] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

7
test/busi/busi.proto

@ -37,7 +37,14 @@ service Busi {
rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {}
rpc TransInRedis(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutRedis(BusiReq) returns (google.protobuf.Empty) {}
rpc TransInRevertRedis(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutRevertRedis(BusiReq) returns (google.protobuf.Empty) {}
rpc QueryPrepared(BusiReq) returns (BusiReply) {}
rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {}
rpc QueryPreparedRedis(BusiReq) returns (google.protobuf.Empty) {}
}

180
test/busi/busi_grpc.pb.go

@ -37,8 +37,13 @@ type BusiClient interface {
TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransInRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransInRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error)
QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type busiClient struct {
@ -211,6 +216,42 @@ func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...
return out, nil
}
func (c *busiClient) TransInRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransInRedis", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransOutRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransOutRedis", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransInRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransInRevertRedis", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransOutRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransOutRevertRedis", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) {
out := new(BusiReply)
err := c.cc.Invoke(ctx, "/busi.Busi/QueryPrepared", in, out, opts...)
@ -229,6 +270,15 @@ func (c *busiClient) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...gr
return out, nil
}
func (c *busiClient) QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/QueryPreparedRedis", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BusiServer is the server API for Busi service.
// All implementations must embed UnimplementedBusiServer
// for forward compatibility
@ -251,8 +301,13 @@ type BusiServer interface {
TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error)
TransInRedis(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutRedis(context.Context, *BusiReq) (*emptypb.Empty, error)
TransInRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error)
QueryPrepared(context.Context, *BusiReq) (*BusiReply, error)
QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error)
QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error)
mustEmbedUnimplementedBusiServer()
}
@ -314,12 +369,27 @@ func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*em
func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented")
}
func (UnimplementedBusiServer) TransInRedis(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransInRedis not implemented")
}
func (UnimplementedBusiServer) TransOutRedis(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutRedis not implemented")
}
func (UnimplementedBusiServer) TransInRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransInRevertRedis not implemented")
}
func (UnimplementedBusiServer) TransOutRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertRedis not implemented")
}
func (UnimplementedBusiServer) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryPrepared not implemented")
}
func (UnimplementedBusiServer) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedB not implemented")
}
func (UnimplementedBusiServer) QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedRedis not implemented")
}
func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {}
// UnsafeBusiServer may be embedded to opt out of forward compatibility for this service.
@ -657,6 +727,78 @@ func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _Busi_TransInRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransInRedis(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransInRedis",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransInRedis(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutRedis(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransOutRedis",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutRedis(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransInRevertRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransInRevertRedis(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransInRevertRedis",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransInRevertRedis(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutRevertRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutRevertRedis(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransOutRevertRedis",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutRevertRedis(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_QueryPrepared_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
@ -693,6 +835,24 @@ func _Busi_QueryPreparedB_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
func _Busi_QueryPreparedRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).QueryPreparedRedis(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/QueryPreparedRedis",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).QueryPreparedRedis(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
// Busi_ServiceDesc is the grpc.ServiceDesc for Busi service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -772,6 +932,22 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
MethodName: "TransOutHeaderNo",
Handler: _Busi_TransOutHeaderNo_Handler,
},
{
MethodName: "TransInRedis",
Handler: _Busi_TransInRedis_Handler,
},
{
MethodName: "TransOutRedis",
Handler: _Busi_TransOutRedis_Handler,
},
{
MethodName: "TransInRevertRedis",
Handler: _Busi_TransInRevertRedis_Handler,
},
{
MethodName: "TransOutRevertRedis",
Handler: _Busi_TransOutRevertRedis_Handler,
},
{
MethodName: "QueryPrepared",
Handler: _Busi_QueryPrepared_Handler,
@ -780,6 +956,10 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
MethodName: "QueryPreparedB",
Handler: _Busi_QueryPreparedB_Handler,
},
{
MethodName: "QueryPreparedRedis",
Handler: _Busi_QueryPreparedRedis_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "test/busi/busi.proto",

6
test/busi/quick_start.go

@ -24,14 +24,18 @@ const qsBusiPort = 8082
var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)
// QsStartSvr quick start: start server
func QsStartSvr() {
app := dtmutil.GetGinApp()
qsAddRoute(app)
logger.Infof("quick start examples listening at %d", qsBusiPort)
go app.Run(fmt.Sprintf(":%d", qsBusiPort))
go func() {
_ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
}()
time.Sleep(100 * time.Millisecond)
}
// QsFireRequest quick start: fire request
func QsFireRequest() string {
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址

1
test/busi/startup.go

@ -24,4 +24,5 @@ func PopulateDB(skipDrop bool) {
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
_, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear
dtmimp.E2P(err)
SetRedisBothAccount(10000, 10000)
}

13
test/busi/utils.go

@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"os"
"strings"
sync "sync"
"time"
@ -79,8 +80,8 @@ func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply i
return invoker(ctx, method, req, reply, cc, opts...)
}
// SetHttpHeaderForHeadersYes interceptor to set head for HeadersYes
func SetHttpHeaderForHeadersYes(c *resty.Client, r *resty.Request) error {
// SetHTTPHeaderForHeadersYes interceptor to set head for HeadersYes
func SetHTTPHeaderForHeadersYes(c *resty.Client, r *resty.Request) error {
if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") {
logger.Debugf("set test_header for url: %s", r.URL)
r.SetHeader("test_header", "yes")
@ -121,11 +122,12 @@ var (
once sync.Once
)
// RedisGet 1
func RedisGet() *redis.Client {
once.Do(func() {
logger.Debugf("connecting to client redis")
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Addr: dtmimp.OrString(os.Getenv("BUSI_REDIS"), "localhost:6379"),
Username: "root",
Password: "",
})
@ -133,10 +135,11 @@ func RedisGet() *redis.Client {
return rdb
}
// SetRedisBothAccount 1
func SetRedisBothAccount(accountA int, accountB int) {
rd := RedisGet()
_, err := rd.Set(rd.Context(), getRedisAccountKey(TransOutUID), accountA, 0).Result()
_, err := rd.Set(rd.Context(), GetRedisAccountKey(TransOutUID), accountA, 0).Result()
dtmimp.E2P(err)
_, err = rd.Set(rd.Context(), getRedisAccountKey(TransInUID), accountB, 0).Result()
_, err = rd.Set(rd.Context(), GetRedisAccountKey(TransInUID), accountB, 0).Result()
dtmimp.E2P(err)
}

8
test/dtmsvr_test.go

@ -12,6 +12,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@ -34,13 +35,8 @@ func getBranchesStatus(gid string) []string {
return status
}
func assertSucceed(t *testing.T, gid string) {
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestUpdateBranchAsync(t *testing.T) {
if conf.Store.Driver != "mysql" {
if conf.Store.Driver != config.Mysql {
return
}
conf.UpdateBranchSync = 0

3
test/main_test.go

@ -37,7 +37,7 @@ func TestMain(m *testing.M) {
conf.UpdateBranchSync = 1
dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes)
dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHttpHeaderForHeadersYes)
dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes)
dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil })
tenv := os.Getenv("TEST_STORE")
@ -68,4 +68,5 @@ func TestMain(m *testing.M) {
close(dtmsvr.TransProcessedTestChan)
gid, more := <-dtmsvr.TransProcessedTestChan
logger.FatalfIf(more, "extra gid: %s in test chan", gid)
os.Exit(0)
}

82
test/msg_barrier_redis_test.go

@ -0,0 +1,82 @@
package test
import (
"errors"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgRedisDo(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before, "redis")
}
func TestMsgRedisDoBusiFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return errors.New("an error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoCommitFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return errors.New("after commit error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgRedisDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaRedisTransIn", req)
err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
dtmimp.E2P(err)
return errors.New("an error")
})
assert.Error(t, err)
waitTransProcessed(gid)
assertNotSameBalance(t, before, "redis")
}

13
test/msg_barrier_test.go

@ -20,7 +20,7 @@ func TestMsgPrepareAndSubmit(t *testing.T) {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
assert.Nil(t, err)
@ -36,7 +36,7 @@ func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return errors.New("an error")
})
assert.Error(t, err)
@ -49,7 +49,7 @@ func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(DtmServer+"not-exists", gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
assert.Error(t, err)
@ -66,7 +66,7 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) {
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var g *monkey.PatchGuard
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
logger.Debugf("tx.Commit rollback and return error in test")
_ = tx.Rollback()
@ -76,7 +76,6 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) {
})
g.Unpatch()
assert.Error(t, err)
cronTransOnceForwardNow(180)
assertSameBalance(t, before, "mysql")
}
@ -90,7 +89,7 @@ func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) {
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var guard *monkey.PatchGuard
err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
guard.Unpatch()
@ -100,6 +99,6 @@ func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) {
return err
})
assert.Error(t, err)
cronTransOnceForwardNow(180)
waitTransProcessed(gid)
assertNotSameBalance(t, before, "mysql")
}

83
test/msg_grpc_barrier_redis_test.go

@ -0,0 +1,83 @@
package test
import (
"errors"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgGrpcRedisDo(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before, "redis")
}
func TestMsgGrpcRedisDoBusiFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
return errors.New("an error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer+"not-exists", gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgGrpcRedisDoCommitFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
return errors.New("after commit error")
})
assert.Error(t, err)
assertSameBalance(t, before, "redis")
}
func TestMsgGrpcRedisDoCommitAfterFailed(t *testing.T) {
before := getBeforeBalances("redis")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req)
err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error {
err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
dtmimp.E2P(err)
return errors.New("an error")
})
assert.Error(t, err)
waitTransProcessed(gid)
assertNotSameBalance(t, before, "redis")
}

30
test/msg_grpc_barrier_test.go

@ -8,6 +8,7 @@ import (
"bou.ke/monkey"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@ -19,7 +20,7 @@ func TestMsgGrpcPrepareAndSubmit(t *testing.T) {
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
})
assert.Nil(t, err)
@ -39,7 +40,7 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) {
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
var guard *monkey.PatchGuard
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
guard.Unpatch()
@ -49,6 +50,29 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) {
return err
})
assert.Error(t, err)
cronTransOnceForwardNow(180)
waitTransProcessed(gid)
assertNotSameBalance(t, before, "mysql")
}
func TestMsgGrpcPrepareAndSubmitCommitFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
var g *monkey.PatchGuard
err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
logger.Debugf("tx.Commit rollback and return error in test")
_ = tx.Rollback()
return errors.New("test error for patch")
})
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
})
g.Unpatch()
assert.Error(t, err)
assertSameBalance(t, before, "mysql")
}

17
test/msg_grpc_test.go

@ -28,31 +28,32 @@ func TestMsgGrpcNormal(t *testing.T) {
}
func TestMsgGrpcTimeoutSuccess(t *testing.T) {
msg := genGrpcMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genGrpcMsg(gid)
err := msg.Prepare("")
assert.Nil(t, err)
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
}
func TestMsgGrpcTimeoutFailed(t *testing.T) {
msg := genGrpcMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genGrpcMsg(gid)
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
}

28
test/msg_options_test.go

@ -16,38 +16,38 @@ import (
)
func TestMsgOptionsTimeout(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genMsg(gid)
msg.Prepare("")
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
cronTransOnceForwardNow(t, gid, 60)
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgOptionsTimeoutCustom(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genMsg(gid)
msg.TimeoutToFail = 120
msg.Prepare("")
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
cronTransOnceForwardNow(t, gid, 60)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgOptionsTimeoutFailed(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genMsg(gid)
msg.TimeoutToFail = 120
msg.Prepare("")
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
cronTransOnceForwardNow(t, gid, 60)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
}

17
test/msg_test.go

@ -26,30 +26,31 @@ func TestMsgNormal(t *testing.T) {
}
func TestMsgTimeoutSuccess(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genMsg(gid)
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgTimeoutFailed(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
gid := dtmimp.GetFuncName()
msg := genMsg(gid)
msg.Prepare("")
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(360)
cronTransOnceForwardNow(t, gid, 360)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure)
cronTransOnceForwardNow(180)
cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
}

12
test/saga_concurrent_test.go

@ -28,14 +28,14 @@ func TestSagaConNormal(t *testing.T) {
}
func TestSagaConRollbackNormal(t *testing.T) {
sagaCon := genSagaCon(dtmimp.GetFuncName(), true, false)
gid := dtmimp.GetFuncName()
sagaCon := genSagaCon(gid, true, false)
busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing)
err := sagaCon.Submit()
assert.Nil(t, err)
waitTransProcessed(sagaCon.Gid)
assert.Equal(t, StatusAborting, getTransStatus(sagaCon.Gid))
g := cronTransOnce()
assert.Equal(t, sagaCon.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid))
// TODO should fix this
// assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusSucceed, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
@ -61,15 +61,15 @@ func TestSagaConRollbackOrder2(t *testing.T) {
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(sagaCon.Gid))
}
func TestSagaConCommittedOngoing(t *testing.T) {
sagaCon := genSagaCon(dtmimp.GetFuncName(), false, false)
gid := dtmimp.GetFuncName()
sagaCon := genSagaCon(gid, false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
sagaCon.Submit()
waitTransProcessed(sagaCon.Gid)
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(sagaCon.Gid))
g := cronTransOnce()
assert.Equal(t, sagaCon.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(sagaCon.Gid))
}

22
test/saga_grpc_test.go

@ -26,13 +26,13 @@ func TestSagaGrpcNormal(t *testing.T) {
}
func TestSagaGrpcRollback(t *testing.T) {
saga := genSagaGrpc(dtmimp.GetFuncName(), false, true)
gid := dtmimp.GetFuncName()
saga := genSagaGrpc(gid, false, true)
busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusAborting, getTransStatus(saga.Gid))
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
}
@ -57,20 +57,21 @@ func TestSagaGrpcCurrentOrder(t *testing.T) {
}
func TestSagaGrpcCommittedOngoing(t *testing.T) {
saga := genSagaGrpc(dtmimp.GetFuncName(), false, false)
gid := dtmimp.GetFuncName()
saga := genSagaGrpc(gid, false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
func TestSagaGrpcNormalWait(t *testing.T) {
saga := genSagaGrpc(dtmimp.GetFuncName(), false, false)
gid := dtmimp.GetFuncName()
saga := genSagaGrpc(gid, false, false)
saga.WaitResult = true
saga.Submit()
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
@ -89,6 +90,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) {
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
//nolint: unparam
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
req := busi.GenBusiReq(30, outFailed, inFailed)
@ -118,8 +120,7 @@ func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
@ -158,7 +159,6 @@ func TestSagaGrpcCronHeaders(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}

24
test/saga_options_test.go

@ -17,19 +17,20 @@ import (
)
func TestSagaOptionsRetryOngoing(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga1(dtmimp.GetFuncName(), false, false)
saga.RetryInterval = 150 // CronForwardDuration is larger than RetryInterval
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
func TestSagaOptionsRetryError(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga1(dtmimp.GetFuncName(), false, false)
saga.RetryInterval = 150 // CronForwardDuration is less than 2*RetryInterval
busi.MainSwitch.TransOutResult.SetOnce("ERROR")
@ -38,22 +39,21 @@ func TestSagaOptionsRetryError(t *testing.T) {
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
g := cronTransOnce()
assert.Equal(t, "", g)
g = cronTransOnceForwardCron(360)
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, "")
cronTransOnceForwardCron(t, gid, 360)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
func TestSagaOptionsTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.TimeoutToFail = 1800
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
cronTransOnceForwardNow(3600)
cronTransOnceForwardNow(t, gid, 3600)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
}
@ -68,6 +68,7 @@ func TestSagaOptionsNormalWait(t *testing.T) {
}
func TestSagaOptionsCommittedOngoingWait(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.WaitResult = true
@ -76,8 +77,7 @@ func TestSagaOptionsCommittedOngoingWait(t *testing.T) {
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
waitTransProcessed(saga.Gid)
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
@ -113,8 +113,7 @@ func TestSagaCronPassthroughHeadersYes(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
@ -153,7 +152,6 @@ func TestSagaHeadersYes1(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}

8
test/saga_test.go

@ -34,27 +34,27 @@ func TestSagaRollback(t *testing.T) {
}
func TestSagaOngoingSucceed(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
func TestSagaFailed(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSaga(dtmimp.GetFuncName(), false, true)
busi.MainSwitch.TransOutRevertResult.SetOnce("ERROR")
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusAborting, getTransStatus(saga.Gid))
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
}

2
test/tcc_barrier_test.go

@ -97,7 +97,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
logger.Debugf("cron to timeout and then call cancelled twice")
cron := func() {
cronTransOnceForwardNow(300)
cronTransOnceForwardNow(t, gid, 300)
logger.Debugf("cronFinished write")
cronFinished <- "1"
logger.Debugf("cronFinished after write")

3
test/tcc_grpc_cover_test.go

@ -47,6 +47,5 @@ func TestTccGrpcCoverCallBranch(t *testing.T) {
return err
})
assert.Error(t, err)
g := cronTransOnceForwardNow(300)
assert.Equal(t, gid, g)
cronTransOnceForwardNow(t, gid, 300)
}

3
test/tcc_grpc_test.go

@ -49,8 +49,7 @@ func TestTccGrpcRollback(t *testing.T) {
assert.Error(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusAborting, getTransStatus(gid))
g2 := cronTransOnce()
assert.Equal(t, gid, g2)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}

5
test/tcc_old_test.go

@ -37,8 +37,7 @@ func TestTccOldRollback(t *testing.T) {
assert.Error(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusAborting, getTransStatus(gid))
g := cronTransOnce()
assert.Equal(t, gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}
@ -52,7 +51,7 @@ func TestTccOldTimeout(t *testing.T) {
_, err := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld")
assert.Nil(t, err)
go func() {
cronTransOnceForwardNow(300)
cronTransOnceForwardNow(t, gid, 300)
timeoutChan <- 0
}()
<-timeoutChan

5
test/tcc_test.go

@ -43,8 +43,7 @@ func TestTccRollback(t *testing.T) {
assert.Error(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusAborting, getTransStatus(gid))
g := cronTransOnce()
assert.Equal(t, gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}
@ -58,7 +57,7 @@ func TestTccTimeout(t *testing.T) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, err)
go func() {
cronTransOnceForwardNow(300)
cronTransOnceForwardNow(t, gid, 300)
timeoutChan <- 0
}()
<-timeoutChan

28
test/types.go

@ -38,12 +38,12 @@ func waitTransProcessed(gid string) {
}
}
func cronTransOnce() string {
gid := dtmsvr.CronTransOnce()
func cronTransOnce(t *testing.T, gid string) {
gid2 := dtmsvr.CronTransOnce()
assert.Equal(t, gid, gid2)
if dtmsvr.TransProcessedTestChan != nil && gid != "" {
waitTransProcessed(gid)
}
return gid
}
var e2p = dtmimp.E2P
@ -54,20 +54,18 @@ type TransGlobal = dtmsvr.TransGlobal
// TransBranch alias
type TransBranch = dtmsvr.TransBranch
func cronTransOnceForwardNow(seconds int) string {
func cronTransOnceForwardNow(t *testing.T, gid string, seconds int) {
old := dtmsvr.NowForwardDuration
dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second
gid := cronTransOnce()
cronTransOnce(t, gid)
dtmsvr.NowForwardDuration = old
return gid
}
func cronTransOnceForwardCron(seconds int) string {
func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) {
old := dtmsvr.CronForwardDuration
dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
gid := cronTransOnce()
cronTransOnce(t, gid)
dtmsvr.CronForwardDuration = old
return gid
}
const (
@ -84,21 +82,21 @@ const (
)
func getBeforeBalances(store string) []int {
b1 := busi.GetBalanceByUid(busi.TransOutUID, store)
b2 := busi.GetBalanceByUid(busi.TransInUID, store)
b1 := busi.GetBalanceByUID(busi.TransOutUID, store)
b2 := busi.GetBalanceByUID(busi.TransInUID, store)
return []int{b1, b2}
}
func assertSameBalance(t *testing.T, before []int, store string) {
b1 := busi.GetBalanceByUid(busi.TransOutUID, store)
b2 := busi.GetBalanceByUid(busi.TransInUID, store)
b1 := busi.GetBalanceByUID(busi.TransOutUID, store)
b2 := busi.GetBalanceByUID(busi.TransInUID, store)
assert.Equal(t, before[0], b1)
assert.Equal(t, before[1], b2)
}
func assertNotSameBalance(t *testing.T, before []int, store string) {
b1 := busi.GetBalanceByUid(busi.TransOutUID, store)
b2 := busi.GetBalanceByUid(busi.TransInUID, store)
b1 := busi.GetBalanceByUID(busi.TransOutUID, store)
b2 := busi.GetBalanceByUID(busi.TransInUID, store)
assert.NotEqual(t, before[0], b1)
assert.Equal(t, before[0]+before[1], b1+b2)
}

5
test/xa_cover_test.go

@ -20,15 +20,14 @@ func TestXaCoverDBError(t *testing.T) {
getXc().Conf.Driver = "no-driver"
_, err = xa.CallBranch(req, busi.Busi+"/TransInXa")
assert.Error(t, err)
getXc().Conf.Driver = oldDriver // make abort succeed
return nil, err
})
assert.Error(t, err)
getXc().Conf.Driver = "no-driver" // make xa rollback failed
waitTransProcessed(gid)
getXc().Conf.Driver = oldDriver
cronTransOnceForwardNow(500) // rollback succeeded here
cronTransOnceForwardNow(t, gid, 500) // rollback succeeded here
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}
func TestXaCoverDTMError(t *testing.T) {

9
test/xa_test.go

@ -89,7 +89,7 @@ func TestXaTimeout(t *testing.T) {
timeoutChan := make(chan int, 1)
err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
go func() {
cronTransOnceForwardNow(300)
cronTransOnceForwardNow(t, gid, 300)
timeoutChan <- 0
}()
<-timeoutChan
@ -105,10 +105,10 @@ func TestXaNotTimeout(t *testing.T) {
timeoutChan := make(chan int, 1)
err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
go func() {
cronTransOnceForwardNow(0) // not timeout,
cronTransOnceForwardNow(t, gid, 0) // not timeout,
timeoutChan <- 0
}()
_ = <-timeoutChan
<-timeoutChan
req := busi.GenTransReq(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
assert.Nil(t, err)
@ -118,8 +118,7 @@ func TestXaNotTimeout(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSubmitted, getTransStatus(gid))
g := cronTransOnce()
assert.Equal(t, gid, g)
cronTransOnce(t, gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
}

Loading…
Cancel
Save