Browse Source

support headers

pull/145/head
yedf2 4 years ago
parent
commit
09ae1cdaa2
  1. 8
      bench/svr/http.go
  2. 11
      conf.sample.yml
  3. 2
      dtmcli/barrier.go
  4. 23
      dtmcli/dtmimp/trans_base.go
  5. 8
      dtmcli/dtmimp/vars.go
  6. 6
      dtmcli/tcc.go
  7. 23
      dtmcli/types.go
  8. 1
      dtmcli/types_test.go
  9. 12
      dtmcli/xa.go
  10. 7
      dtmgrpc/dtmgimp/grpc_clients.go
  11. 45
      dtmgrpc/dtmgimp/utils.go
  12. 197
      dtmgrpc/dtmgpb/dtmgimp.pb.go
  13. 2
      dtmgrpc/dtmgpb/dtmgimp.proto
  14. 23
      dtmgrpc/tcc.go
  15. 17
      dtmgrpc/type.go
  16. 5
      dtmgrpc/type_test.go
  17. 10
      dtmgrpc/xa.go
  18. 6
      dtmsvr/api_grpc.go
  19. 24
      dtmsvr/config/config.go
  20. 43
      dtmsvr/config/config_test.go
  21. 22
      dtmsvr/config/config_utils.go
  22. 12
      dtmsvr/storage/trans.go
  23. 31
      dtmsvr/trans_class.go
  24. 7
      dtmsvr/trans_process.go
  25. 6
      dtmsvr/trans_status.go
  26. 1
      go.mod
  27. 6
      go.sum
  28. 2
      helper/sync-dtmcli.sh
  29. 4
      sqls/dtmsvr.storage.mysql.sql
  30. 4
      sqls/dtmsvr.storage.postgres.sql
  31. 17
      test/busi/base_grpc.go
  32. 14
      test/busi/base_http.go
  33. 56
      test/busi/busi.pb.go
  34. 2
      test/busi/busi.proto
  35. 72
      test/busi/busi_grpc.pb.go
  36. 28
      test/busi/utils.go
  37. 3
      test/dtmsvr_test.go
  38. 4
      test/main_test.go
  39. 3
      test/msg_grpc_test.go
  40. 9
      test/msg_options_test.go
  41. 5
      test/msg_test.go
  42. 6
      test/saga_concurrent_test.go
  43. 11
      test/saga_cover_test.go
  44. 74
      test/saga_grpc_test.go
  45. 85
      test/saga_options_test.go
  46. 8
      test/saga_test.go
  47. 1
      test/tcc_cover_test.go
  48. 3
      test/tcc_grpc_cover_test.go
  49. 22
      test/tcc_grpc_test.go
  50. 19
      test/tcc_test.go
  51. 18
      test/types.go
  52. 3
      test/xa_cover_test.go
  53. 8
      test/xa_grpc_test.go
  54. 3
      test/xa_test.go

8
bench/svr/http.go

@ -33,14 +33,14 @@ const total = 200000
var benchPort = dtmimp.If(os.Getenv("BENCH_PORT") == "", "8083", os.Getenv("BENCH_PORT")).(string)
var benchBusi = fmt.Sprintf("http://localhost:%s%s", benchPort, benchAPI)
func sdbGet() *sql.DB {
func pdbGet() *sql.DB {
db, err := dtmimp.PooledDB(busi.BusiConf)
logger.FatalIfError(err)
return db
}
func txGet() *sql.Tx {
db := sdbGet()
db := pdbGet()
tx, err := db.Begin()
logger.FatalIfError(err)
return tx
@ -49,7 +49,7 @@ func txGet() *sql.Tx {
func reloadData() {
time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2)
began := time.Now()
db := sdbGet()
db := pdbGet()
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"}
for _, t := range tables {
_, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t))
@ -70,7 +70,7 @@ var mode string = ""
var sqls int = 1
func PrepareBenchDB() {
db := sdbGet()
db := pdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
logger.FatalIfError(err)
_, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log (

11
conf.sample.yml

@ -1,3 +1,10 @@
#####################################################################
### dtm can be run without any config.
### all config in this file is optional. the default value is as specified in each line
### all configs can be specified from env. for example:
### Store.MaxOpenConns can also specified from env: STORE_MAX_OPEN_CONNS
#####################################################################
# Store: # specify which engine to store trans status
# Driver: 'boltdb' # default store engine
@ -19,10 +26,12 @@
# Password: 'mysecretpassword'
# Port: '5432'
### following connection config is for only Driver postgres/mysql
### following config is for only Driver postgres/mysql
# MaxOpenConns: 500
# MaxIdleConns: 500
# ConnMaxLifeTime 5 # default value is 5 (minutes)
# TransGlobalTable: 'dtm.trans_global'
# TransBranchOp: 'dtm.trans_branch_op'
### flollowing config is only for some Driver
# DataExpire: 604800 # Trans data will expire in 7 days. only for redis/boltdb.

2
dtmcli/barrier.go

@ -54,7 +54,7 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri
if op == "" {
return 0, nil
}
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate(dtmimp.BarrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}

23
dtmcli/dtmimp/trans_base.go

@ -40,9 +40,11 @@ func (g *BranchIDGen) CurrentSubBranchID() string {
// TransOptions transaction options
type TransOptions struct {
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"`
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"`
}
// TransBase base for all trans
@ -62,18 +64,14 @@ type TransBase struct {
QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
}
// SetOptions set options
func (tb *TransBase) SetOptions(options *TransOptions) {
tb.TransOptions = *options
}
// NewTransBase new a TransBase
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
return &TransBase{
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
}
}
@ -118,6 +116,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri
"trans_type": t.TransType,
"op": op,
}).
SetHeaders(t.BranchHeaders).
Post(url)
return resp, CheckResponse(resp, err)
}

8
dtmcli/dtmimp/vars.go

@ -31,13 +31,19 @@ var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}
// RestyClient the resty object
var RestyClient = resty.New()
// PassthroughHeaders will be passed to every sub-trans call
var PassthroughHeaders = []string{}
// BarrierTableName the table name of barrier table
var BarrierTableName = "dtm_barrier.barrier"
func init() {
// RestyClient.SetTimeout(3 * time.Second)
// RestyClient.SetRetryCount(2)
// RestyClient.SetRetryWaitTime(1 * time.Second)
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam)
logger.Debugf("requesting: %s %s %s", r.Method, r.URL, MustMarshalString(r.Body))
return nil
})
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {

6
dtmcli/tcc.go

@ -27,7 +27,13 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
// gid global transaction ID
// tccFunc tcc事务函数,里面会定义全局事务的分支
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
return TccGlobalTransaction2(dtm, gid, func(t *Tcc) {}, tccFunc)
}
// TccGlobalTransaction2 new version of TccGlobalTransaction, add custom param
func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc TccGlobalFunc) (rerr error) {
tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")}
custom(tcc)
rerr = dtmimp.TransCallDtm(&tcc.TransBase, tcc, "prepare")
if rerr != nil {
return rerr

23
dtmcli/types.go

@ -10,6 +10,7 @@ import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/go-resty/resty/v2"
)
// MustGenGid generate a new gid
@ -49,3 +50,25 @@ func SetXaSqlTimeoutMs(ms int) {
func GetXaSqlTimeoutMs() int {
return dtmimp.XaSqlTimeoutMs
}
func SetBarrierTableName(tablename string) {
dtmimp.BarrierTableName = tablename
}
// OnBeforeRequest add before request middleware
func OnBeforeRequest(middleware func(c *resty.Client, r *resty.Request) error) {
dtmimp.RestyClient.OnBeforeRequest(middleware)
}
// OnAfterResponse add after request middleware
func OnAfterResponse(middleware func(c *resty.Client, resp *resty.Response) error) {
dtmimp.RestyClient.OnAfterResponse(middleware)
}
// SetPassthroughHeaders experimental.
// apply to http header and grpc metadata
// dtm server will save these headers in trans creating request.
// and then passthrough them to sub-trans
func SetPassthroughHeaders(headers []string) {
dtmimp.PassthroughHeaders = headers
}

1
dtmcli/types_test.go

@ -28,4 +28,5 @@ func TestTypes(t *testing.T) {
func TestXaSqlTimeout(t *testing.T) {
old := GetXaSqlTimeoutMs()
SetXaSqlTimeoutMs(old)
SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func
}

12
dtmcli/xa.go

@ -83,11 +83,17 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) error
// XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
xa := Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")}
return xc.XaGlobalTransaction2(gid, func(x *Xa) {}, xaFunc)
}
// XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc XaGlobalFunc) (rerr error) {
xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")}
custom(xa)
return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error {
return dtmimp.TransCallDtm(&xa.TransBase, &xa, action)
return dtmimp.TransCallDtm(&xa.TransBase, xa, action)
}, func() error {
_, rerr := xaFunc(&xa)
_, rerr := xaFunc(xa)
return rerr
})
}

7
dtmgrpc/dtmgimp/grpc_clients.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc "google.golang.org/grpc"
)
@ -35,6 +36,8 @@ func (cb rawCodec) Name() string { return "dtm_raw" }
var normalClients, rawClients sync.Map
var ClientInterceptors = []grpc.UnaryClientInterceptor{}
// MustGetDtmClient 1
func MustGetDtmClient(grpcServer string) dtmgpb.DtmClient {
return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, false))
@ -59,7 +62,9 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err
opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{}))
}
logger.Debugf("grpc client connecting %s", grpcServer)
conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts)
interceptors := append(ClientInterceptors, GrpcClientLog)
inOpt := grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(interceptors...))
conn, rerr := grpc.Dial(grpcServer, inOpt, grpc.WithInsecure(), opts)
if rerr == nil {
clients.Store(grpcServer, conn)
v = conn

45
dtmgrpc/dtmgimp/utils.go

@ -31,9 +31,11 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
Gid: s.Gid,
TransType: s.TransType,
TransOptions: &dtmgpb.DtmTransOptions{
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
PassthroughHeaders: s.PassthroughHeaders,
BranchHeaders: s.BranchHeaders,
},
QueryPrepared: s.QueryPrepared,
CustomedData: s.CustomData,
@ -42,20 +44,29 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
}, &reply)
}
const mdpre string = "dtm-"
const dtmpre string = "dtm-"
// TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
md := metadata.Pairs(
mdpre+"gid", gid,
mdpre+"trans_type", transType,
mdpre+"branch_id", branchID,
mdpre+"op", op,
mdpre+"dtm", dtm,
dtmpre+"gid", gid,
dtmpre+"trans_type", transType,
dtmpre+"branch_id", branchID,
dtmpre+"op", op,
dtmpre+"dtm", dtm,
)
return metadata.NewOutgoingContext(context.Background(), md)
}
// Map2Kvs map to metadata kv
func Map2Kvs(m map[string]string) []string {
kvs := []string{}
for k, v := range m {
kvs = append(kvs, k, v)
}
return kvs
}
// LogDtmCtx logout dtm info in context metadata
func LogDtmCtx(ctx context.Context) {
tb := TransBaseFromGrpc(ctx)
@ -64,8 +75,12 @@ func LogDtmCtx(ctx context.Context) {
}
}
func dtmGet(md metadata.MD, key string) string {
return mdGet(md, dtmpre+key)
}
func mdGet(md metadata.MD, key string) string {
v := md.Get(mdpre + key)
v := md.Get(key)
if len(v) == 0 {
return ""
}
@ -75,7 +90,13 @@ func mdGet(md metadata.MD, key string) string {
// TransBaseFromGrpc get trans base info from a context metadata
func TransBaseFromGrpc(ctx context.Context) *dtmimp.TransBase {
md, _ := metadata.FromIncomingContext(ctx)
tb := dtmimp.NewTransBase(mdGet(md, "gid"), mdGet(md, "trans_type"), mdGet(md, "dtm"), mdGet(md, "branch_id"))
tb.Op = mdGet(md, "op")
tb := dtmimp.NewTransBase(dtmGet(md, "gid"), dtmGet(md, "trans_type"), dtmGet(md, "dtm"), dtmGet(md, "branch_id"))
tb.Op = dtmGet(md, "op")
return tb
}
// GetMetaFromContext get header from context
func GetMetaFromContext(ctx context.Context, name string) string {
md, _ := metadata.FromIncomingContext(ctx)
return mdGet(md, name)
}

197
dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -26,9 +26,11 @@ type DtmTransOptions struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"`
TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"`
RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"`
WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"`
TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"`
RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"`
PassthroughHeaders []string `protobuf:"bytes,4,rep,name=PassthroughHeaders,proto3" json:"PassthroughHeaders,omitempty"`
BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *DtmTransOptions) Reset() {
@ -84,6 +86,20 @@ func (x *DtmTransOptions) GetRetryInterval() int64 {
return 0
}
func (x *DtmTransOptions) GetPassthroughHeaders() []string {
if x != nil {
return x.PassthroughHeaders
}
return nil
}
func (x *DtmTransOptions) GetBranchHeaders() map[string]string {
if x != nil {
return x.BranchHeaders
}
return nil
}
// DtmRequest request sent to dtm server
type DtmRequest struct {
state protoimpl.MessageState
@ -321,69 +337,82 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7d, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52,
0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, 0x69,
0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f,
0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d,
0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x24, 0x0a,
0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03,
0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72,
0x76, 0x61, 0x6c, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79,
0x70, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x22, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61,
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64,
0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f,
0x61, 0x64, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50,
0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51,
0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x53, 0x74, 0x65,
0x70, 0x73, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c,
0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x47, 0x69, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63,
0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc2, 0x02, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74,
0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61,
0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65,
0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52,
0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x24,
0x0a, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18,
0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65,
0x72, 0x76, 0x61, 0x6c, 0x12, 0x2e, 0x0a, 0x12, 0x50, 0x61, 0x73, 0x73, 0x74, 0x68, 0x72, 0x6f,
0x75, 0x67, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09,
0x52, 0x12, 0x50, 0x61, 0x73, 0x73, 0x74, 0x68, 0x72, 0x6f, 0x75, 0x67, 0x68, 0x48, 0x65, 0x61,
0x64, 0x65, 0x72, 0x73, 0x12, 0x51, 0x0a, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65,
0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x64, 0x74,
0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64,
0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x40, 0x0a, 0x12, 0x42, 0x72, 0x61, 0x6e, 0x63,
0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a,
0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72,
0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54,
0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e,
0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e,
0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61,
0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a,
0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a,
0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d,
0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75,
0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d,
0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75,
0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69,
0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52,
0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d,
0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72,
0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47,
0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74,
0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10,
0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64,
0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a,
0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70,
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61,
0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44,
0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f,
0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1,
0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64,
0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00,
0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d,
0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65,
0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a,
0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70,
0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d,
0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08,
0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52,
0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63,
0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -398,33 +427,35 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP() []byte {
return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescData
}
var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{
(*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions
(*DtmRequest)(nil), // 1: dtmgimp.DtmRequest
(*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply
(*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest
nil, // 4: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 5: google.protobuf.Empty
nil, // 4: dtmgimp.DtmTransOptions.BranchHeadersEntry
nil, // 5: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
}
var file_dtmgrpc_dtmgpb_dtmgimp_proto_depIdxs = []int32{
0, // 0: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions
4, // 1: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
5, // 2: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
1, // 3: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest
1, // 4: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest
1, // 5: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest
3, // 6: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest
2, // 7: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
5, // 8: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
5, // 9: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
5, // 10: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
5, // 11: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
7, // [7:12] is the sub-list for method output_type
2, // [2:7] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
4, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry
0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions
5, // 2: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
6, // 3: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
1, // 4: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest
1, // 5: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest
1, // 6: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest
3, // 7: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest
2, // 8: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
6, // 9: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
6, // 10: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
6, // 11: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
6, // 12: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
8, // [8:13] is the sub-list for method output_type
3, // [3:8] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
func init() { file_dtmgrpc_dtmgpb_dtmgimp_proto_init() }
@ -488,7 +519,7 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc,
NumEnums: 0,
NumMessages: 5,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},

2
dtmgrpc/dtmgpb/dtmgimp.proto

@ -18,6 +18,8 @@ message DtmTransOptions {
bool WaitResult = 1;
int64 TimeoutToFail = 2;
int64 RetryInterval = 3;
repeated string PassthroughHeaders = 4;
map<string, string> BranchHeaders = 5;
}
// DtmRequest request sent to dtm server

23
dtmgrpc/tcc.go

@ -14,6 +14,7 @@ import (
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
@ -30,13 +31,14 @@ type TccGlobalFunc func(tcc *TccGrpc) error
// gid 全局事务id
// tccFunc tcc事务函数,里面会定义全局事务的分支
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
return TccGlobalTransaction2(dtm, gid, func(tg *TccGrpc) {}, tccFunc)
}
// TccGlobalTransaction2 new version of TccGlobalTransaction
func TccGlobalTransaction2(dtm string, gid string, custom func(*TccGrpc), tccFunc TccGlobalFunc) (rerr error) {
tcc := &TccGrpc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")}
dc := dtmgimp.MustGetDtmClient(tcc.Dtm)
dr := &dtmgpb.DtmRequest{
Gid: tcc.Gid,
TransType: tcc.TransType,
}
_, rerr = dc.Prepare(context.Background(), dr)
custom(tcc)
rerr = dtmgimp.DtmGrpcCall(&tcc.TransBase, "Prepare")
if rerr != nil {
return rerr
}
@ -44,10 +46,10 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
defer func() {
x := recover()
if x == nil && rerr == nil {
_, rerr = dc.Submit(context.Background(), dr)
rerr = dtmgimp.DtmGrpcCall(&tcc.TransBase, "Submit")
return
}
_, err := dc.Abort(context.Background(), dr)
err := dtmgimp.DtmGrpcCall(&tcc.TransBase, "Abort")
if rerr == nil {
rerr = err
}
@ -87,6 +89,7 @@ func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL st
if err != nil {
return err
}
return dtmgimp.MustGetGrpcConn(server, false).Invoke(
dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm), method, busiMsg, reply)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm)
ctx = metadata.AppendToOutgoingContext(ctx, dtmgimp.Map2Kvs(t.BranchHeaders)...)
return dtmgimp.MustGetGrpcConn(server, false).Invoke(ctx, method, busiMsg, reply)
}

17
dtmgrpc/type.go

@ -9,10 +9,10 @@ package dtmgrpc
import (
context "context"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtmdriver"
grpc "google.golang.org/grpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
@ -24,16 +24,11 @@ func MustGenGid(grpcServer string) string {
return r.Gid
}
// SetCurrentDBType set the current db type
func SetCurrentDBType(dbType string) {
dtmcli.SetCurrentDBType(dbType)
}
// GetCurrentDBType set the current db type
func GetCurrentDBType() string {
return dtmcli.GetCurrentDBType()
}
// UseDriver use the specified driver to handle grpc urls
func UseDriver(driverName string) error {
return dtmdriver.Use(driverName)
}
func AddUnaryInterceptor(interceptor grpc.UnaryClientInterceptor) {
dtmgimp.ClientInterceptors = append(dtmgimp.ClientInterceptors, interceptor)
}

5
dtmgrpc/type_test.go

@ -10,7 +10,6 @@ import (
"context"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/stretchr/testify/assert"
)
@ -21,10 +20,6 @@ func TestType(t *testing.T) {
_, err = TccFromGrpc(context.Background())
assert.Error(t, err)
old := GetCurrentDBType()
SetCurrentDBType(dtmcli.DBTypeMysql)
SetCurrentDBType(old)
err = UseDriver("default")
assert.Nil(t, err)
}

10
dtmgrpc/xa.go

@ -92,7 +92,13 @@ func (xc *XaGrpcClient) XaLocalTransaction(ctx context.Context, msg proto.Messag
// XaGlobalTransaction start a xa global transaction
func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) error {
xa := XaGrpc{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.Server, "")}
return xc.XaGlobalTransaction2(gid, func(xg *XaGrpc) {}, xaFunc)
}
// XaGlobalTransaction2 new version of XaGlobalTransaction. support custom
func (xc *XaGrpcClient) XaGlobalTransaction2(gid string, custom func(*XaGrpc), xaFunc XaGrpcGlobalFunc) error {
xa := &XaGrpc{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.Server, "")}
custom(xa)
dc := dtmgimp.MustGetDtmClient(xa.Dtm)
req := &dtmgpb.DtmRequest{
Gid: gid,
@ -107,7 +113,7 @@ func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc)
_, err := f(context.Background(), req)
return err
}, func() error {
return xaFunc(&xa)
return xaFunc(xa)
})
}

6
dtmsvr/api_grpc.go

@ -25,17 +25,17 @@ func (s *dtmServer) NewGid(ctx context.Context, in *emptypb.Empty) (*pb.DtmGidRe
}
func (s *dtmServer) Submit(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcSubmit(TransFromDtmRequest(in))
r, err := svcSubmit(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
}
func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcPrepare(TransFromDtmRequest(in))
r, err := svcPrepare(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
}
func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcAbort(TransFromDtmRequest(in))
r, err := svcAbort(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
}

24
dtmsvr/config/config.go

@ -24,16 +24,18 @@ type MicroService struct {
}
type Store struct {
Driver string `yaml:"Driver" default:"boltdb"`
Host string `yaml:"Host"`
Port int64 `yaml:"Port"`
User string `yaml:"User"`
Password string `yaml:"Password"`
MaxOpenConns int64 `yaml:"MaxOpenConns" default:"500"`
MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"`
ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"`
DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb.
RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster
Driver string `yaml:"Driver" default:"boltdb"`
Host string `yaml:"Host"`
Port int64 `yaml:"Port"`
User string `yaml:"User"`
Password string `yaml:"Password"`
MaxOpenConns int64 `yaml:"MaxOpenConns" default:"500"`
MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"`
ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"`
DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb.
RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster
TransGlobalTable string `yaml:"TransGlobalTable" default:"dtm.trans_global"`
TransBranchOpTable string `yaml:"BranchTransOpTable" default:"dtm.trans_branch_op"`
}
func (s *Store) IsDB() bool {
@ -77,7 +79,7 @@ func MustLoadConfig(confFile string) {
scont, err := json.MarshalIndent(&Config, "", " ")
logger.FatalIfError(err)
logger.Infof("config file: %s loaded config is: \n%s", confFile, scont)
err = checkConfig()
err = checkConfig(&Config)
logger.FatalfIf(err != nil, `config error: '%v'.
please visit http://d.dtm.pub to see the config document.`, err)
}

43
dtmsvr/config/config_test.go

@ -17,38 +17,47 @@ func TestLoadFromEnv(t *testing.T) {
assert.Equal(t, "d1", ms.Driver)
}
func TestCheckConfig(t *testing.T) {
func TestLoadConfig(t *testing.T) {
MustLoadConfig("../../conf.sample.yml")
config := &Config
config.RetryInterval = 1
retryIntervalErr := checkConfig()
}
func TestCheckConfig(t *testing.T) {
conf := Config
conf.RetryInterval = 1
retryIntervalErr := checkConfig(&conf)
retryIntervalExpect := errors.New("RetryInterval should not be less than 10")
assert.Equal(t, retryIntervalErr, retryIntervalExpect)
config.RetryInterval = 10
config.TimeoutToFail = 5
timeoutToFailErr := checkConfig()
conf.RetryInterval = 10
conf.TimeoutToFail = 5
timeoutToFailErr := checkConfig(&conf)
timeoutToFailExpect := errors.New("TimeoutToFail should not be less than RetryInterval")
assert.Equal(t, timeoutToFailErr, timeoutToFailExpect)
config.TimeoutToFail = 20
driverErr := checkConfig()
conf.TimeoutToFail = 20
driverErr := checkConfig(&conf)
assert.Equal(t, driverErr, nil)
config.Store = Store{Driver: Mysql}
hostErr := checkConfig()
conf.Store = Store{Driver: Mysql}
hostErr := checkConfig(&conf)
hostExpect := errors.New("Db host not valid ")
assert.Equal(t, hostErr, hostExpect)
config.Store = Store{Driver: Mysql, Host: "127.0.0.1"}
portErr := checkConfig()
conf.Store = Store{Driver: Mysql, Host: "127.0.0.1"}
portErr := checkConfig(&conf)
portExpect := errors.New("Db port not valid ")
assert.Equal(t, portErr, portExpect)
config.Store = Store{Driver: Mysql, Host: "127.0.0.1", Port: 8686}
userErr := checkConfig()
conf.Store = Store{Driver: Mysql, Host: "127.0.0.1", Port: 8686}
userErr := checkConfig(&conf)
userExpect := errors.New("Db user not valid ")
assert.Equal(t, userErr, userExpect)
conf.Store = Store{Driver: Redis, Host: "", Port: 8686}
assert.Equal(t, errors.New("Redis host not valid"), checkConfig(&conf))
conf.Store = Store{Driver: Redis, Host: "127.0.0.1", Port: 0}
assert.Equal(t, errors.New("Redis port not valid"), checkConfig(&conf))
}
func TestConfig(t *testing.T) {
@ -61,7 +70,7 @@ func TestConfig(t *testing.T) {
func testConfigStringField(fd *string, val string, t *testing.T) {
old := *fd
*fd = val
str := checkConfig()
str := checkConfig(&Config)
assert.NotEqual(t, "", str)
*fd = old
}
@ -69,7 +78,7 @@ func testConfigStringField(fd *string, val string, t *testing.T) {
func testConfigIntField(fd *int64, val int64, t *testing.T) {
old := *fd
*fd = val
str := checkConfig()
str := checkConfig(&Config)
assert.NotEqual(t, "", str)
*fd = old
}

22
dtmsvr/config/config_utils.go

@ -55,32 +55,32 @@ func toUnderscoreUpper(key string) string {
return strings.ToUpper(s2)
}
func checkConfig() error {
if Config.RetryInterval < 10 {
func checkConfig(conf *configType) error {
if conf.RetryInterval < 10 {
return errors.New("RetryInterval should not be less than 10")
}
if Config.TimeoutToFail < Config.RetryInterval {
if conf.TimeoutToFail < conf.RetryInterval {
return errors.New("TimeoutToFail should not be less than RetryInterval")
}
switch Config.Store.Driver {
switch conf.Store.Driver {
case BoltDb:
return nil
case Mysql:
if Config.Store.Host == "" {
if conf.Store.Host == "" {
return errors.New("Db host not valid ")
}
if Config.Store.Port == 0 {
if conf.Store.Port == 0 {
return errors.New("Db port not valid ")
}
if Config.Store.User == "" {
if conf.Store.User == "" {
return errors.New("Db user not valid ")
}
case Redis:
if Config.Store.Host == "" {
return errors.New("Redis host not valid ")
if conf.Store.Host == "" {
return errors.New("Redis host not valid")
}
if Config.Store.Port == 0 {
return errors.New("Redis port not valid ")
if conf.Store.Port == 0 {
return errors.New("Redis port not valid")
}
}
return nil

12
dtmsvr/storage/trans.go

@ -5,9 +5,14 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmutil"
)
type TransGlobalExt struct {
Headers map[string]string `json:"headers,omitempty" gorm:"-"`
}
type TransGlobalStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`
@ -18,7 +23,6 @@ type TransGlobalStore struct {
Status string `json:"status,omitempty"`
QueryPrepared string `json:"query_prepared,omitempty"`
Protocol string `json:"protocol,omitempty"`
CommitTime *time.Time `json:"commit_time,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Options string `json:"options,omitempty"`
@ -26,12 +30,14 @@ type TransGlobalStore struct {
NextCronInterval int64 `json:"next_cron_interval,omitempty"`
NextCronTime *time.Time `json:"next_cron_time,omitempty"`
Owner string `json:"owner,omitempty"`
Ext TransGlobalExt `json:"-" gorm:"-"`
ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options
dtmcli.TransOptions
}
// TableName TableName
func (g *TransGlobalStore) TableName() string {
return "dtm.trans_global"
return config.Config.Store.TransGlobalTable
}
func (g *TransGlobalStore) String() string {
@ -53,7 +59,7 @@ type TransBranchStore struct {
// TableName TableName
func (b *TransBranchStore) TableName() string {
return "dtm.trans_branch_op"
return config.Config.Store.TransBranchOpTable
}
func (b *TransBranchStore) String() string {

31
dtmsvr/trans_class.go

@ -7,11 +7,13 @@
package dtmsvr
import (
"context"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/gin-gonic/gin"
@ -69,11 +71,21 @@ func TransFromContext(c *gin.Context) *TransGlobal {
}
}
m.Protocol = "http"
m.Ext.Headers = map[string]string{}
if len(m.PassthroughHeaders) > 0 {
for _, h := range m.PassthroughHeaders {
v := c.GetHeader(h)
if v != "" {
m.Ext.Headers[h] = v
}
}
}
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(c *dtmgpb.DtmRequest) *TransGlobal {
func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal {
o := &dtmgpb.DtmTransOptions{}
if c.TransOptions != nil {
o = c.TransOptions
@ -85,13 +97,24 @@ func TransFromDtmRequest(c *dtmgpb.DtmRequest) *TransGlobal {
Protocol: "grpc",
BinPayloads: c.BinPayloads,
TransOptions: dtmcli.TransOptions{
WaitResult: o.WaitResult,
TimeoutToFail: o.TimeoutToFail,
RetryInterval: o.RetryInterval,
WaitResult: o.WaitResult,
TimeoutToFail: o.TimeoutToFail,
RetryInterval: o.RetryInterval,
PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders,
},
}}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
if len(o.PassthroughHeaders) > 0 {
r.Ext.Headers = map[string]string{}
for _, h := range o.PassthroughHeaders {
v := dtmgimp.GetMetaFromContext(ctx, h)
if v != "" {
r.Ext.Headers[h] = v
}
}
}
return &r
}

7
dtmsvr/trans_process.go

@ -26,6 +26,9 @@ func (t *TransGlobal) process(branches []TransBranch) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if t.ExtData != "" {
dtmimp.MustUnmarshalString(t.ExtData, &t.Ext)
}
if !t.WaitResult {
go t.processInner(branches)
@ -63,6 +66,10 @@ func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) {
func (t *TransGlobal) saveNew() ([]TransBranch, error) {
t.NextCronInterval = t.getNextCronInterval(cronReset)
t.NextCronTime = dtmutil.GetNextTime(t.NextCronInterval)
t.ExtData = dtmimp.MustMarshalString(t.Ext)
if t.ExtData == "{}" {
t.ExtData = ""
}
t.Options = dtmimp.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""

6
dtmsvr/trans_status.go

@ -17,6 +17,7 @@ import (
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@ -83,6 +84,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
}
conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
kvs := dtmgimp.Map2Kvs(t.Ext.Headers)
kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...)
ctx = metadata.AppendToOutgoingContext(ctx, kvs...)
err = conn.Invoke(ctx, method, branchPayload, &[]byte{})
if err == nil {
return dtmcli.ResultSuccess, nil
@ -106,6 +110,8 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
"op": op,
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url)
if err != nil {
return "", err

1
go.mod

@ -8,7 +8,6 @@ require (
github.com/dtm-labs/dtmdriver-polaris v0.0.2
github.com/dtm-labs/dtmdriver-protocol1 v0.0.1
github.com/gin-gonic/gin v1.6.3
github.com/go-redis/redis/v8 v8.11.4
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.6.0

6
go.sum

@ -509,6 +509,12 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/ychensha/dtmdriver-polaris v0.0.1/go.mod h1:0BdQvxXlGOlF6YVlsDoVvu8jyxdTlJZ9Kyh5t9lRA94=
github.com/yedf/dtm v1.7.2 h1:qGjio5O+Zm5CmMfga9Eb0rsUu9nkNHxGuJwc7PEkMfc=
github.com/yedf/dtm v1.7.2/go.mod h1:R1Q55spqLh7yHMVJhGI8RpS1iG7OvRX99pWZRlsAtME=
github.com/yedf/dtmdriver v0.0.0-20211203060147-29426c663b6e/go.mod h1:aeo6ZWiVI0x8P8O18r6uB1cG2uw9BCQyYZaH15MlRDI=
github.com/yedf/dtmdriver-gozero v0.0.0-20211204083751-a14485949435/go.mod h1:RYtA6oZny6LzlIRb1tPGt5bHfgqws/JaU6ogFly8ByQ=
github.com/yedf/dtmdriver-protocol1 v0.0.0-20211205112411-d7a7052dc90e/go.mod h1:kB3NPnDKSGioVjgdfj6qgbqYJinOml45GnlHqR46Ycc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

2
helper/sync-dtmcli.sh

@ -30,8 +30,6 @@ go build || exit 1
git add .
git commit -m"update from dtm to version $ver"
git push
git tag $ver
git push --tags
cd ../dtmgrpc

4
sqls/dtmsvr.storage.mysql.sql

@ -12,14 +12,14 @@ CREATE TABLE if not EXISTS dtm.trans_global (
`protocol` varchar(45) not null comment '通信协议 http | grpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`commit_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(256) DEFAULT '',
`options` varchar(1024) DEFAULT '',
`custom_data` varchar(256) DEFAULT '',
`next_cron_interval` int(11) default null comment '下次定时处理的间隔',
`next_cron_time` datetime default null comment '下次定时处理的时间',
`owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者',
`ext_data` TEXT comment 'global扩展字段的数据',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),

4
sqls/dtmsvr.storage.postgres.sql

@ -13,14 +13,14 @@ CREATE TABLE if not EXISTS dtm.trans_global (
protocol varchar(45) not null,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
commit_time timestamp(0) with time zone DEFAULT NULL,
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
options varchar(256) DEFAULT '',
options varchar(1024) DEFAULT '',
custom_data varchar(256) DEFAULT '',
next_cron_interval int default null,
next_cron_time timestamp(0) with time zone default null,
owner varchar(128) not null default '',
ext_data text,
PRIMARY KEY (id),
CONSTRAINT gid UNIQUE (gid)
);

17
test/busi/base_grpc.go

@ -9,6 +9,7 @@ package busi
import (
"context"
"database/sql"
"errors"
"fmt"
"net"
@ -123,3 +124,19 @@ func (s *busiServer) TransInTccNested(ctx context.Context, in *BusiReq) (*emptyp
func (s *busiServer) XaNotify(ctx context.Context, in *emptypb.Empty) (*emptypb.Empty, error) {
return XaGrpcClient.HandleCallback(ctx)
}
func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
meta := dtmgimp.GetMetaFromContext(ctx, "test_header")
if meta == "" {
return &emptypb.Empty{}, errors.New("no header found in HeaderYes")
}
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), in.TransOutResult, dtmimp.GetFuncName())
}
func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
meta := dtmgimp.GetMetaFromContext(ctx, "test_header")
if meta != "" {
return &emptypb.Empty{}, errors.New("header found in HeaderNo")
}
return &emptypb.Empty{}, nil
}

14
test/busi/base_http.go

@ -154,4 +154,18 @@ func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/TccBSleepCancel", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return sleepCancelHandler(c)
}))
app.POST(BusiAPI+"/TransOutHeaderYes", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
h := c.GetHeader("test_header")
if h == "" {
return nil, errors.New("no test_header found in TransOutHeaderYes")
}
return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut")
}))
app.POST(BusiAPI+"/TransOutHeaderNo", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
h := c.GetHeader("test_header")
if h != "" {
return nil, errors.New("test_header found in TransOutHeaderNo")
}
return dtmcli.MapSuccess, nil
}))
}

56
test/busi/busi.pb.go

@ -148,7 +148,7 @@ var file_test_busi_busi_proto_rawDesc = []byte{
0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x32, 0xd3, 0x07, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61,
0x65, 0x32, 0xce, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61,
0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42,
0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61,
@ -209,8 +209,16 @@ var file_test_busi_busi_proto_rawDesc = []byte{
0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d,
0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73,
0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x62,
0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75,
0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -249,25 +257,29 @@ var file_test_busi_busi_proto_depIdxs = []int32{
0, // 14: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq
0, // 15: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq
0, // 16: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq
1, // 17: busi.Busi.CanSubmit:output_type -> busi.BusiReply
2, // 18: busi.Busi.TransIn:output_type -> google.protobuf.Empty
2, // 19: busi.Busi.TransOut:output_type -> google.protobuf.Empty
2, // 20: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty
2, // 21: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty
2, // 22: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty
2, // 23: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
2, // 24: busi.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 25: busi.Busi.TransInXa:output_type -> google.protobuf.Empty
2, // 26: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty
2, // 27: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty
2, // 28: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty
2, // 29: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty
2, // 30: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty
2, // 31: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty
2, // 32: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty
2, // 33: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
17, // [17:34] is the sub-list for method output_type
0, // [0:17] is the sub-list for method input_type
0, // 17: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq
0, // 18: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq
1, // 19: busi.Busi.CanSubmit:output_type -> busi.BusiReply
2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty
2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty
2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty
2, // 23: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty
2, // 24: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty
2, // 25: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
2, // 26: busi.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 27: busi.Busi.TransInXa:output_type -> google.protobuf.Empty
2, // 28: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty
2, // 29: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty
2, // 30: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty
2, // 31: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty
2, // 32: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty
2, // 33: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty
2, // 34: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty
2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty
2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty
19, // [19:38] is the sub-list for method output_type
0, // [0:19] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

2
test/busi/busi.proto

@ -36,5 +36,7 @@ service Busi {
rpc TransOutBSaga(BusiReq) returns (google.protobuf.Empty) {}
rpc TransInRevertBSaga(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {}
rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {}
}

72
test/busi/busi_grpc.pb.go

@ -36,6 +36,8 @@ type BusiClient interface {
TransOutBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransInRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type busiClient struct {
@ -199,6 +201,24 @@ func (c *busiClient) TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts
return out, nil
}
func (c *busiClient) TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransOutHeaderYes", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/busi.Busi/TransOutHeaderNo", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BusiServer is the server API for Busi service.
// All implementations must embed UnimplementedBusiServer
// for forward compatibility
@ -220,6 +240,8 @@ type BusiServer interface {
TransOutBSaga(context.Context, *BusiReq) (*emptypb.Empty, error)
TransInRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error)
TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error)
mustEmbedUnimplementedBusiServer()
}
@ -278,6 +300,12 @@ func (UnimplementedBusiServer) TransInRevertBSaga(context.Context, *BusiReq) (*e
func (UnimplementedBusiServer) TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertBSaga not implemented")
}
func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderYes not implemented")
}
func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented")
}
func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {}
// UnsafeBusiServer may be embedded to opt out of forward compatibility for this service.
@ -597,6 +625,42 @@ func _Busi_TransOutRevertBSaga_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutHeaderYes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutHeaderYes(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransOutHeaderYes",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutHeaderYes(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BusiReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutHeaderNo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/busi.Busi/TransOutHeaderNo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutHeaderNo(ctx, req.(*BusiReq))
}
return interceptor(ctx, in, info, handler)
}
// Busi_ServiceDesc is the grpc.ServiceDesc for Busi service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -672,6 +736,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
MethodName: "TransOutRevertBSaga",
Handler: _Busi_TransOutRevertBSaga_Handler,
},
{
MethodName: "TransOutHeaderYes",
Handler: _Busi_TransOutHeaderYes_Handler,
},
{
MethodName: "TransOutHeaderNo",
Handler: _Busi_TransOutHeaderNo_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "test/busi/busi.proto",

28
test/busi/utils.go

@ -4,27 +4,32 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func dbGet() *dtmutil.DB {
return dtmutil.DbGet(BusiConf)
}
func sdbGet() *sql.DB {
func pdbGet() *sql.DB {
db, err := dtmimp.PooledDB(BusiConf)
logger.FatalIfError(err)
return db
}
func txGet() *sql.Tx {
db := sdbGet()
db := pdbGet()
tx, err := db.Begin()
logger.FatalIfError(err)
return tx
@ -59,3 +64,22 @@ func MustBarrierFromGrpc(ctx context.Context) *dtmcli.BranchBarrier {
logger.FatalIfError(err)
return ti
}
// SetGrpcHeaderForHeadersYes interceptor to set head for HeadersYes
func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if r, ok := req.(*dtmgpb.DtmRequest); ok && strings.HasSuffix(r.Gid, "HeadersYes") {
logger.Debugf("writing test_header:test to ctx")
md := metadata.New(map[string]string{"test_header": "test"})
ctx = metadata.NewOutgoingContext(ctx, md)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
// SetHttpHeaderForHeadersYes interceptor to set head for HeadersYes
func SetHttpHeaderForHeadersYes(c *resty.Client, r *resty.Request) error {
if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") {
logger.Debugf("set test_header for url: %s", r.URL)
r.SetHeader("test_header", "yes")
}
return nil
}

3
test/dtmsvr_test.go

@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
@ -45,7 +44,7 @@ func TestUpdateBranchAsync(t *testing.T) {
}
conf.UpdateBranchSync = 0
saga := genSaga1(dtmimp.GetFuncName(), false, false)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
saga.WaitResult = true
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)

4
test/main_test.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/test/busi"
@ -33,6 +34,9 @@ func TestMain(m *testing.M) {
dtmsvr.CronForwardDuration = 180 * time.Second
conf.UpdateBranchSync = 1
dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes)
dtmcli.OnBeforeRequest(busi.SetHttpHeaderForHeadersYes)
tenv := os.Getenv("TEST_STORE")
if tenv == "boltdb" {
conf.Store.Driver = "boltdb"

3
test/msg_grpc_test.go

@ -38,7 +38,8 @@ func TestMsgGrpcTimeoutSuccess(t *testing.T) {
cronTransOnceForwardNow(180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
}

9
test/msg_options_test.go

@ -18,7 +18,8 @@ import (
func TestMsgOptionsTimeout(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.Prepare("")
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
@ -28,7 +29,8 @@ func TestMsgOptionsTimeoutCustom(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.TimeoutToFail = 120
msg.Prepare("")
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
@ -40,7 +42,8 @@ func TestMsgOptionsTimeoutFailed(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.TimeoutToFail = 120
msg.Prepare("")
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))
cronTransOnceForwardNow(60)
assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid))

5
test/msg_test.go

@ -35,7 +35,8 @@ func TestMsgTimeoutSuccess(t *testing.T) {
busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, msg.Gid, g)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
@ -60,7 +61,7 @@ func TestMsgAbnormal(t *testing.T) {
assert.Nil(t, err)
err = msg.Submit()
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
err = msg.Prepare("")
assert.Error(t, err)
}

6
test/saga_concurrent_test.go

@ -34,7 +34,8 @@ func TestSagaConRollbackNormal(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(sagaCon.Gid)
assert.Equal(t, StatusAborting, getTransStatus(sagaCon.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, sagaCon.Gid, g)
assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid))
// TODO should fix this
// assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusSucceed, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
@ -58,7 +59,8 @@ func TestSagaConCommittedOngoing(t *testing.T) {
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(sagaCon.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, sagaCon.Gid, g)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(sagaCon.Gid))
}

11
test/saga_cover_test.go

@ -0,0 +1,11 @@
package test
import (
"testing"
"github.com/dtm-labs/dtm/dtmcli"
)
func TestSagaCover(t *testing.T) {
dtmcli.SetPassthroughHeaders([]string{})
}

74
test/saga_grpc_test.go

@ -31,7 +31,8 @@ func TestSagaGrpcRollback(t *testing.T) {
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusAborting, getTransStatus(saga.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
}
@ -62,14 +63,15 @@ func TestSagaGrpcCommittedOngoing(t *testing.T) {
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
func TestSagaGrpcNormalWait(t *testing.T) {
saga := genSagaGrpc(dtmimp.GetFuncName(), false, false)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
saga.WaitResult = true
saga.Submit()
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
@ -94,3 +96,69 @@ func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga.Add(busi.BusiGrpc+"/busi.Busi/TransIn", busi.BusiGrpc+"/busi.Busi/TransInRevert", req)
return saga
}
func TestSagaGrpcPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)
sagaYes.WaitResult = true
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
func TestSagaGrpcPassthroughHeadersNo(t *testing.T) {
gidNo := dtmimp.GetFuncName()
sagaNo := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidNo)
sagaNo.WaitResult = true
sagaNo.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderNo", "", nil)
err := sagaNo.Submit()
assert.Nil(t, err)
waitTransProcessed(gidNo)
}
func TestSagaGrpcHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes).
Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}
sagaYes.WaitResult = true
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaGrpcCronHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}
sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}

85
test/saga_options_test.go

@ -11,6 +11,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
@ -22,7 +23,8 @@ func TestSagaOptionsRetryOngoing(t *testing.T) {
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
@ -34,10 +36,12 @@ func TestSagaOptionsRetryError(t *testing.T) {
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
cronTransOnce()
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
cronTransOnceForwardCron(360)
g := cronTransOnce()
assert.Equal(t, "", g)
g = cronTransOnceForwardCron(360)
assert.Equal(t, saga.Gid, g)
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
}
@ -55,7 +59,7 @@ func TestSagaOptionsTimeout(t *testing.T) {
func TestSagaOptionsNormalWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
saga.WaitResult = true
err := saga.Submit()
assert.Nil(t, err)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
@ -66,23 +70,90 @@ func TestSagaOptionsNormalWait(t *testing.T) {
func TestSagaOptionsCommittedOngoingWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
saga.WaitResult = true
err := saga.Submit()
assert.Error(t, err)
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
waitTransProcessed(saga.Gid)
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
func TestSagaOptionsRollbackWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
saga.WaitResult = true
err := saga.Submit()
assert.Error(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
}
func TestSagaPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes.WaitResult = true
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
func TestSagaPassthroughHeadersNo(t *testing.T) {
gidNo := dtmimp.GetFuncName()
sagaNo := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidNo)
sagaNo.WaitResult = true
sagaNo.Add(busi.Busi+"/TransOutHeaderNo", "", nil)
err := sagaNo.Submit()
assert.Nil(t, err)
waitTransProcessed(gidNo)
}
func TestSagaHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}
sagaYes.WaitResult = true
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaHeadersYes1(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
g := cronTransOnce()
assert.Equal(t, gidYes, g)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}

8
test/saga_test.go

@ -31,7 +31,8 @@ func TestSagaOngoingSucceed(t *testing.T) {
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
@ -43,15 +44,18 @@ func TestSagaFailed(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, StatusAborting, getTransStatus(saga.Gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, saga.Gid, g)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
}
func TestSagaAbnormal(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
err = saga.Submit() // submit twice, ignored
assert.Nil(t, err)
waitTransProcessed(saga.Gid)

1
test/tcc_cover_test.go

@ -27,4 +27,5 @@ func TestTccCoverPanic(t *testing.T) {
assert.FailNow(t, "not executed")
})
assert.Contains(t, err.Error(), "user panic")
waitTransProcessed(gid)
}

3
test/tcc_grpc_cover_test.go

@ -28,6 +28,7 @@ func TestTccGrpcCoverPanic(t *testing.T) {
assert.FailNow(t, "not executed")
})
assert.Contains(t, err.Error(), "user panic")
waitTransProcessed(gid)
}
func TestTccGrpcCoverCallBranch(t *testing.T) {
@ -46,4 +47,6 @@ func TestTccGrpcCoverCallBranch(t *testing.T) {
return err
})
assert.Error(t, err)
g := cronTransOnceForwardNow(300)
assert.Equal(t, gid, g)
}

22
test/tcc_grpc_test.go

@ -49,7 +49,8 @@ func TestTccGrpcRollback(t *testing.T) {
assert.Error(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusAborting, getTransStatus(gid))
cronTransOnce()
g2 := cronTransOnce()
assert.Equal(t, gid, g2)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}
@ -76,3 +77,22 @@ func TestTccGrpcType(t *testing.T) {
err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil })
assert.Error(t, err)
}
func TestTccGrpcHeaders(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmgrpc.TccGlobalTransaction2(dtmutil.DefaultGrpcServer, gid, func(tg *dtmgrpc.TccGrpc) {
tg.BranchHeaders = map[string]string{
"test_header": "test",
}
tg.WaitResult = true
}, func(tcc *dtmgrpc.TccGrpc) error {
data := &busi.BusiReq{Amount: 30}
r := &emptypb.Empty{}
return tcc.CallBranch(data, busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", "", r)
})
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
}

19
test/tcc_test.go

@ -43,7 +43,8 @@ func TestTccRollback(t *testing.T) {
assert.Error(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusAborting, getTransStatus(gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, gid, g)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid))
}
@ -84,3 +85,19 @@ func TestTccCompatible(t *testing.T) {
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
}
func TestTccHeaders(t *testing.T) {
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHttpServer, gid, func(t *dtmcli.Tcc) {
t.BranchHeaders = map[string]string{
"test_header": "test",
}
}, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
return tcc.CallBranch(req, Busi+"/TransOutHeaderYes", "", "")
})
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
}

18
test/types.go

@ -29,21 +29,19 @@ func waitTransProcessed(gid string) {
logger.Debugf("waiting for gid %s", gid)
select {
case id := <-dtmsvr.TransProcessedTestChan:
for id != gid {
logger.Warnf("------- expecting: %s but %s found", gid, id)
id = <-dtmsvr.TransProcessedTestChan
}
logger.FatalfIf(id != gid, "------- expecting: %s but %s found", gid, id)
logger.Debugf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 3)):
logger.FatalfIf(true, "Wait Trans timeout")
}
}
func cronTransOnce() {
func cronTransOnce() string {
gid := dtmsvr.CronTransOnce()
if dtmsvr.TransProcessedTestChan != nil && gid != "" {
waitTransProcessed(gid)
}
return gid
}
var e2p = dtmimp.E2P
@ -54,18 +52,20 @@ type TransGlobal = dtmsvr.TransGlobal
// TransBranch alias
type TransBranch = dtmsvr.TransBranch
func cronTransOnceForwardNow(seconds int) {
func cronTransOnceForwardNow(seconds int) string {
old := dtmsvr.NowForwardDuration
dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second
cronTransOnce()
gid := cronTransOnce()
dtmsvr.NowForwardDuration = old
return gid
}
func cronTransOnceForwardCron(seconds int) {
func cronTransOnceForwardCron(seconds int) string {
old := dtmsvr.CronForwardDuration
dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
cronTransOnce()
gid := cronTransOnce()
dtmsvr.CronForwardDuration = old
return gid
}
const (

3
test/xa_cover_test.go

@ -43,7 +43,7 @@ func TestXaCoverDTMError(t *testing.T) {
}
func TestXaCoverGidError(t *testing.T) {
gid := "errgid-' '"
gid := dtmimp.GetFuncName() + "-' '"
err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)
_, err := xa.CallBranch(req, busi.Busi+"/TransOutXa")
@ -51,4 +51,5 @@ func TestXaCoverGidError(t *testing.T) {
return nil, err
})
assert.Error(t, err)
waitTransProcessed(gid)
}

8
test/xa_grpc_test.go

@ -56,6 +56,7 @@ func TestXaGrpcRollback(t *testing.T) {
}
func TestXaGrpcType(t *testing.T) {
gid := dtmimp.GetFuncName()
_, err := dtmgrpc.XaGrpcFromRequest(context.Background())
assert.Error(t, err)
@ -63,15 +64,18 @@ func TestXaGrpcType(t *testing.T) {
assert.Error(t, err)
err = dtmimp.CatchP(func() {
busi.XaGrpcClient.XaGlobalTransaction("id1", func(xa *dtmgrpc.XaGrpc) error { panic(fmt.Errorf("hello")) })
busi.XaGrpcClient.XaGlobalTransaction(gid, func(xa *dtmgrpc.XaGrpc) error { panic(fmt.Errorf("hello")) })
})
assert.Error(t, err)
waitTransProcessed(gid)
}
func TestXaGrpcLocalError(t *testing.T) {
gid := dtmimp.GetFuncName()
xc := busi.XaGrpcClient
err := xc.XaGlobalTransaction(dtmimp.GetFuncName(), func(xa *dtmgrpc.XaGrpc) error {
err := xc.XaGlobalTransaction(gid, func(xa *dtmgrpc.XaGrpc) error {
return fmt.Errorf("an error")
})
assert.Error(t, err, fmt.Errorf("an error"))
waitTransProcessed(gid)
}

3
test/xa_test.go

@ -118,7 +118,8 @@ func TestXaNotTimeout(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSubmitted, getTransStatus(gid))
cronTransOnce()
g := cronTransOnce()
assert.Equal(t, gid, g)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
}

Loading…
Cancel
Save