diff --git a/bench/svr/http.go b/bench/svr/http.go index d1c8978..92fd44d 100644 --- a/bench/svr/http.go +++ b/bench/svr/http.go @@ -33,14 +33,14 @@ const total = 200000 var benchPort = dtmimp.If(os.Getenv("BENCH_PORT") == "", "8083", os.Getenv("BENCH_PORT")).(string) var benchBusi = fmt.Sprintf("http://localhost:%s%s", benchPort, benchAPI) -func sdbGet() *sql.DB { +func pdbGet() *sql.DB { db, err := dtmimp.PooledDB(busi.BusiConf) logger.FatalIfError(err) return db } func txGet() *sql.Tx { - db := sdbGet() + db := pdbGet() tx, err := db.Begin() logger.FatalIfError(err) return tx @@ -49,7 +49,7 @@ func txGet() *sql.Tx { func reloadData() { time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2) began := time.Now() - db := sdbGet() + db := pdbGet() tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"} for _, t := range tables { _, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t)) @@ -70,7 +70,7 @@ var mode string = "" var sqls int = 1 func PrepareBenchDB() { - db := sdbGet() + db := pdbGet() _, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log") logger.FatalIfError(err) _, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log ( diff --git a/conf.sample.yml b/conf.sample.yml index 8dee2ea..807d582 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -1,3 +1,10 @@ +##################################################################### +### dtm can be run without any config. +### all config in this file is optional. the default value is as specified in each line +### all configs can be specified from env. for example: +### Store.MaxOpenConns can also specified from env: STORE_MAX_OPEN_CONNS +##################################################################### + # Store: # specify which engine to store trans status # Driver: 'boltdb' # default store engine @@ -19,10 +26,12 @@ # Password: 'mysecretpassword' # Port: '5432' -### following connection config is for only Driver postgres/mysql +### following config is for only Driver postgres/mysql # MaxOpenConns: 500 # MaxIdleConns: 500 # ConnMaxLifeTime 5 # default value is 5 (minutes) +# TransGlobalTable: 'dtm.trans_global' +# TransBranchOp: 'dtm.trans_branch_op' ### flollowing config is only for some Driver # DataExpire: 604800 # Trans data will expire in 7 days. only for redis/boltdb. diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index ff14336..62f59b2 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -54,7 +54,7 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri if op == "" { return 0, nil } - sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") + sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate(dtmimp.BarrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) } diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index b050064..9c89a9f 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -40,9 +40,11 @@ func (g *BranchIDGen) CurrentSubBranchID() string { // TransOptions transaction options type TransOptions struct { - WaitResult bool `json:"wait_result,omitempty" gorm:"-"` - TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc - RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc + WaitResult bool `json:"wait_result,omitempty" gorm:"-"` + TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc + RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc + PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` + BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` } // TransBase base for all trans @@ -62,18 +64,14 @@ type TransBase struct { QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG } -// SetOptions set options -func (tb *TransBase) SetOptions(options *TransOptions) { - tb.TransOptions = *options -} - // NewTransBase new a TransBase func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase { return &TransBase{ - Gid: gid, - TransType: transType, - BranchIDGen: BranchIDGen{BranchID: branchID}, - Dtm: dtm, + Gid: gid, + TransType: transType, + BranchIDGen: BranchIDGen{BranchID: branchID}, + Dtm: dtm, + TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders}, } } @@ -118,6 +116,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri "trans_type": t.TransType, "op": op, }). + SetHeaders(t.BranchHeaders). Post(url) return resp, CheckResponse(resp, err) } diff --git a/dtmcli/dtmimp/vars.go b/dtmcli/dtmimp/vars.go index eea02ad..fc51d70 100644 --- a/dtmcli/dtmimp/vars.go +++ b/dtmcli/dtmimp/vars.go @@ -31,13 +31,19 @@ var MapFailure = map[string]interface{}{"dtm_result": ResultFailure} // RestyClient the resty object var RestyClient = resty.New() +// PassthroughHeaders will be passed to every sub-trans call +var PassthroughHeaders = []string{} + +// BarrierTableName the table name of barrier table +var BarrierTableName = "dtm_barrier.barrier" + func init() { // RestyClient.SetTimeout(3 * time.Second) // RestyClient.SetRetryCount(2) // RestyClient.SetRetryWaitTime(1 * time.Second) RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { r.URL = MayReplaceLocalhost(r.URL) - logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) + logger.Debugf("requesting: %s %s %s", r.Method, r.URL, MustMarshalString(r.Body)) return nil }) RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 73d1172..d61a7fe 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -27,7 +27,13 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // gid global transaction ID // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { + return TccGlobalTransaction2(dtm, gid, func(t *Tcc) {}, tccFunc) +} + +// TccGlobalTransaction2 new version of TccGlobalTransaction, add custom param +func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc TccGlobalFunc) (rerr error) { tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")} + custom(tcc) rerr = dtmimp.TransCallDtm(&tcc.TransBase, tcc, "prepare") if rerr != nil { return rerr diff --git a/dtmcli/types.go b/dtmcli/types.go index e0e54e2..482651a 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/go-resty/resty/v2" ) // MustGenGid generate a new gid @@ -49,3 +50,25 @@ func SetXaSqlTimeoutMs(ms int) { func GetXaSqlTimeoutMs() int { return dtmimp.XaSqlTimeoutMs } + +func SetBarrierTableName(tablename string) { + dtmimp.BarrierTableName = tablename +} + +// OnBeforeRequest add before request middleware +func OnBeforeRequest(middleware func(c *resty.Client, r *resty.Request) error) { + dtmimp.RestyClient.OnBeforeRequest(middleware) +} + +// OnAfterResponse add after request middleware +func OnAfterResponse(middleware func(c *resty.Client, resp *resty.Response) error) { + dtmimp.RestyClient.OnAfterResponse(middleware) +} + +// SetPassthroughHeaders experimental. +// apply to http header and grpc metadata +// dtm server will save these headers in trans creating request. +// and then passthrough them to sub-trans +func SetPassthroughHeaders(headers []string) { + dtmimp.PassthroughHeaders = headers +} diff --git a/dtmcli/types_test.go b/dtmcli/types_test.go index b37ba9a..269c54b 100644 --- a/dtmcli/types_test.go +++ b/dtmcli/types_test.go @@ -28,4 +28,5 @@ func TestTypes(t *testing.T) { func TestXaSqlTimeout(t *testing.T) { old := GetXaSqlTimeoutMs() SetXaSqlTimeoutMs(old) + SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index d28bed7..c74b4d1 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -83,11 +83,17 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) error // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { - xa := Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")} + return xc.XaGlobalTransaction2(gid, func(x *Xa) {}, xaFunc) +} + +// XaGlobalTransaction start a xa global transaction +func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc XaGlobalFunc) (rerr error) { + xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")} + custom(xa) return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error { - return dtmimp.TransCallDtm(&xa.TransBase, &xa, action) + return dtmimp.TransCallDtm(&xa.TransBase, xa, action) }, func() error { - _, rerr := xaFunc(&xa) + _, rerr := xaFunc(xa) return rerr }) } diff --git a/dtmgrpc/dtmgimp/grpc_clients.go b/dtmgrpc/dtmgimp/grpc_clients.go index d2c63a0..dc21e81 100644 --- a/dtmgrpc/dtmgimp/grpc_clients.go +++ b/dtmgrpc/dtmgimp/grpc_clients.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc "google.golang.org/grpc" ) @@ -35,6 +36,8 @@ func (cb rawCodec) Name() string { return "dtm_raw" } var normalClients, rawClients sync.Map +var ClientInterceptors = []grpc.UnaryClientInterceptor{} + // MustGetDtmClient 1 func MustGetDtmClient(grpcServer string) dtmgpb.DtmClient { return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, false)) @@ -59,7 +62,9 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{})) } logger.Debugf("grpc client connecting %s", grpcServer) - conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts) + interceptors := append(ClientInterceptors, GrpcClientLog) + inOpt := grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(interceptors...)) + conn, rerr := grpc.Dial(grpcServer, inOpt, grpc.WithInsecure(), opts) if rerr == nil { clients.Store(grpcServer, conn) v = conn diff --git a/dtmgrpc/dtmgimp/utils.go b/dtmgrpc/dtmgimp/utils.go index bb49f39..fb3eadf 100644 --- a/dtmgrpc/dtmgimp/utils.go +++ b/dtmgrpc/dtmgimp/utils.go @@ -31,9 +31,11 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { Gid: s.Gid, TransType: s.TransType, TransOptions: &dtmgpb.DtmTransOptions{ - WaitResult: s.WaitResult, - TimeoutToFail: s.TimeoutToFail, - RetryInterval: s.RetryInterval, + WaitResult: s.WaitResult, + TimeoutToFail: s.TimeoutToFail, + RetryInterval: s.RetryInterval, + PassthroughHeaders: s.PassthroughHeaders, + BranchHeaders: s.BranchHeaders, }, QueryPrepared: s.QueryPrepared, CustomedData: s.CustomData, @@ -42,20 +44,29 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { }, &reply) } -const mdpre string = "dtm-" +const dtmpre string = "dtm-" // TransInfo2Ctx add trans info to grpc context func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context { md := metadata.Pairs( - mdpre+"gid", gid, - mdpre+"trans_type", transType, - mdpre+"branch_id", branchID, - mdpre+"op", op, - mdpre+"dtm", dtm, + dtmpre+"gid", gid, + dtmpre+"trans_type", transType, + dtmpre+"branch_id", branchID, + dtmpre+"op", op, + dtmpre+"dtm", dtm, ) return metadata.NewOutgoingContext(context.Background(), md) } +// Map2Kvs map to metadata kv +func Map2Kvs(m map[string]string) []string { + kvs := []string{} + for k, v := range m { + kvs = append(kvs, k, v) + } + return kvs +} + // LogDtmCtx logout dtm info in context metadata func LogDtmCtx(ctx context.Context) { tb := TransBaseFromGrpc(ctx) @@ -64,8 +75,12 @@ func LogDtmCtx(ctx context.Context) { } } +func dtmGet(md metadata.MD, key string) string { + return mdGet(md, dtmpre+key) +} + func mdGet(md metadata.MD, key string) string { - v := md.Get(mdpre + key) + v := md.Get(key) if len(v) == 0 { return "" } @@ -75,7 +90,13 @@ func mdGet(md metadata.MD, key string) string { // TransBaseFromGrpc get trans base info from a context metadata func TransBaseFromGrpc(ctx context.Context) *dtmimp.TransBase { md, _ := metadata.FromIncomingContext(ctx) - tb := dtmimp.NewTransBase(mdGet(md, "gid"), mdGet(md, "trans_type"), mdGet(md, "dtm"), mdGet(md, "branch_id")) - tb.Op = mdGet(md, "op") + tb := dtmimp.NewTransBase(dtmGet(md, "gid"), dtmGet(md, "trans_type"), dtmGet(md, "dtm"), dtmGet(md, "branch_id")) + tb.Op = dtmGet(md, "op") return tb } + +// GetMetaFromContext get header from context +func GetMetaFromContext(ctx context.Context, name string) string { + md, _ := metadata.FromIncomingContext(ctx) + return mdGet(md, name) +} diff --git a/dtmgrpc/dtmgpb/dtmgimp.pb.go b/dtmgrpc/dtmgpb/dtmgimp.pb.go index 89a2526..9247260 100644 --- a/dtmgrpc/dtmgpb/dtmgimp.pb.go +++ b/dtmgrpc/dtmgpb/dtmgimp.pb.go @@ -26,9 +26,11 @@ type DtmTransOptions struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"` - TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"` - RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"` + WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"` + TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"` + RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"` + PassthroughHeaders []string `protobuf:"bytes,4,rep,name=PassthroughHeaders,proto3" json:"PassthroughHeaders,omitempty"` + BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *DtmTransOptions) Reset() { @@ -84,6 +86,20 @@ func (x *DtmTransOptions) GetRetryInterval() int64 { return 0 } +func (x *DtmTransOptions) GetPassthroughHeaders() []string { + if x != nil { + return x.PassthroughHeaders + } + return nil +} + +func (x *DtmTransOptions) GetBranchHeaders() map[string]string { + if x != nil { + return x.BranchHeaders + } + return nil +} + // DtmRequest request sent to dtm server type DtmRequest struct { state protoimpl.MessageState @@ -321,69 +337,82 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{ 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7d, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, 0x69, - 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, - 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x24, 0x0a, - 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, - 0x76, 0x61, 0x6c, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, - 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x22, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, - 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, - 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, - 0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x53, 0x74, 0x65, - 0x70, 0x73, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, - 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x47, 0x69, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, - 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc2, 0x02, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, + 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x54, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x24, + 0x0a, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x76, 0x61, 0x6c, 0x12, 0x2e, 0x0a, 0x12, 0x50, 0x61, 0x73, 0x73, 0x74, 0x68, 0x72, 0x6f, + 0x75, 0x67, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x12, 0x50, 0x61, 0x73, 0x73, 0x74, 0x68, 0x72, 0x6f, 0x75, 0x67, 0x68, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x12, 0x51, 0x0a, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x64, 0x74, + 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x40, 0x0a, 0x12, 0x42, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, + 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, - 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, - 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, - 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, - 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, - 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, - 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, - 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, - 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, - 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, - 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, + 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, + 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, + 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75, + 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, + 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52, + 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, + 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, + 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, + 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, + 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, + 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, + 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, + 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, + 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, + 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, + 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, + 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, + 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, + 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, - 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, - 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, - 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, - 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, - 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, + 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, + 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -398,33 +427,35 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP() []byte { return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescData } -var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{ (*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions (*DtmRequest)(nil), // 1: dtmgimp.DtmRequest (*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply (*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest - nil, // 4: dtmgimp.DtmBranchRequest.DataEntry - (*emptypb.Empty)(nil), // 5: google.protobuf.Empty + nil, // 4: dtmgimp.DtmTransOptions.BranchHeadersEntry + nil, // 5: dtmgimp.DtmBranchRequest.DataEntry + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty } var file_dtmgrpc_dtmgpb_dtmgimp_proto_depIdxs = []int32{ - 0, // 0: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions - 4, // 1: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry - 5, // 2: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty - 1, // 3: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest - 1, // 4: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest - 1, // 5: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest - 3, // 6: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest - 2, // 7: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply - 5, // 8: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty - 5, // 9: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty - 5, // 10: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty - 5, // 11: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty - 7, // [7:12] is the sub-list for method output_type - 2, // [2:7] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 4, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry + 0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions + 5, // 2: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry + 6, // 3: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty + 1, // 4: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest + 1, // 5: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest + 1, // 6: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest + 3, // 7: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest + 2, // 8: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply + 6, // 9: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty + 6, // 10: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty + 6, // 11: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty + 6, // 12: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty + 8, // [8:13] is the sub-list for method output_type + 3, // [3:8] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_dtmgrpc_dtmgpb_dtmgimp_proto_init() } @@ -488,7 +519,7 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/dtmgrpc/dtmgpb/dtmgimp.proto b/dtmgrpc/dtmgpb/dtmgimp.proto index 84e3105..f97b277 100644 --- a/dtmgrpc/dtmgpb/dtmgimp.proto +++ b/dtmgrpc/dtmgpb/dtmgimp.proto @@ -18,6 +18,8 @@ message DtmTransOptions { bool WaitResult = 1; int64 TimeoutToFail = 2; int64 RetryInterval = 3; + repeated string PassthroughHeaders = 4; + map BranchHeaders = 5; } // DtmRequest request sent to dtm server diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index d4c0592..3f39676 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -14,6 +14,7 @@ import ( "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" "github.com/dtm-labs/dtmdriver" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" ) @@ -30,13 +31,14 @@ type TccGlobalFunc func(tcc *TccGrpc) error // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { + return TccGlobalTransaction2(dtm, gid, func(tg *TccGrpc) {}, tccFunc) +} + +// TccGlobalTransaction2 new version of TccGlobalTransaction +func TccGlobalTransaction2(dtm string, gid string, custom func(*TccGrpc), tccFunc TccGlobalFunc) (rerr error) { tcc := &TccGrpc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")} - dc := dtmgimp.MustGetDtmClient(tcc.Dtm) - dr := &dtmgpb.DtmRequest{ - Gid: tcc.Gid, - TransType: tcc.TransType, - } - _, rerr = dc.Prepare(context.Background(), dr) + custom(tcc) + rerr = dtmgimp.DtmGrpcCall(&tcc.TransBase, "Prepare") if rerr != nil { return rerr } @@ -44,10 +46,10 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e defer func() { x := recover() if x == nil && rerr == nil { - _, rerr = dc.Submit(context.Background(), dr) + rerr = dtmgimp.DtmGrpcCall(&tcc.TransBase, "Submit") return } - _, err := dc.Abort(context.Background(), dr) + err := dtmgimp.DtmGrpcCall(&tcc.TransBase, "Abort") if rerr == nil { rerr = err } @@ -87,6 +89,7 @@ func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL st if err != nil { return err } - return dtmgimp.MustGetGrpcConn(server, false).Invoke( - dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm), method, busiMsg, reply) + ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm) + ctx = metadata.AppendToOutgoingContext(ctx, dtmgimp.Map2Kvs(t.BranchHeaders)...) + return dtmgimp.MustGetGrpcConn(server, false).Invoke(ctx, method, busiMsg, reply) } diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index c2b7fa7..d166b5c 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -9,10 +9,10 @@ package dtmgrpc import ( context "context" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtmdriver" + grpc "google.golang.org/grpc" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -24,16 +24,11 @@ func MustGenGid(grpcServer string) string { return r.Gid } -// SetCurrentDBType set the current db type -func SetCurrentDBType(dbType string) { - dtmcli.SetCurrentDBType(dbType) -} - -// GetCurrentDBType set the current db type -func GetCurrentDBType() string { - return dtmcli.GetCurrentDBType() -} - +// UseDriver use the specified driver to handle grpc urls func UseDriver(driverName string) error { return dtmdriver.Use(driverName) } + +func AddUnaryInterceptor(interceptor grpc.UnaryClientInterceptor) { + dtmgimp.ClientInterceptors = append(dtmgimp.ClientInterceptors, interceptor) +} diff --git a/dtmgrpc/type_test.go b/dtmgrpc/type_test.go index 7c44e4a..c54583e 100644 --- a/dtmgrpc/type_test.go +++ b/dtmgrpc/type_test.go @@ -10,7 +10,6 @@ import ( "context" "testing" - "github.com/dtm-labs/dtm/dtmcli" "github.com/stretchr/testify/assert" ) @@ -21,10 +20,6 @@ func TestType(t *testing.T) { _, err = TccFromGrpc(context.Background()) assert.Error(t, err) - old := GetCurrentDBType() - SetCurrentDBType(dtmcli.DBTypeMysql) - SetCurrentDBType(old) - err = UseDriver("default") assert.Nil(t, err) } diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index 95a97a1..3cfd8bf 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -92,7 +92,13 @@ func (xc *XaGrpcClient) XaLocalTransaction(ctx context.Context, msg proto.Messag // XaGlobalTransaction start a xa global transaction func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) error { - xa := XaGrpc{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.Server, "")} + return xc.XaGlobalTransaction2(gid, func(xg *XaGrpc) {}, xaFunc) +} + +// XaGlobalTransaction2 new version of XaGlobalTransaction. support custom +func (xc *XaGrpcClient) XaGlobalTransaction2(gid string, custom func(*XaGrpc), xaFunc XaGrpcGlobalFunc) error { + xa := &XaGrpc{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.Server, "")} + custom(xa) dc := dtmgimp.MustGetDtmClient(xa.Dtm) req := &dtmgpb.DtmRequest{ Gid: gid, @@ -107,7 +113,7 @@ func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) _, err := f(context.Background(), req) return err }, func() error { - return xaFunc(&xa) + return xaFunc(xa) }) } diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index 461b251..8cd48d3 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -25,17 +25,17 @@ func (s *dtmServer) NewGid(ctx context.Context, in *emptypb.Empty) (*pb.DtmGidRe } func (s *dtmServer) Submit(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { - r, err := svcSubmit(TransFromDtmRequest(in)) + r, err := svcSubmit(TransFromDtmRequest(ctx, in)) return &emptypb.Empty{}, dtmgimp.Result2Error(r, err) } func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { - r, err := svcPrepare(TransFromDtmRequest(in)) + r, err := svcPrepare(TransFromDtmRequest(ctx, in)) return &emptypb.Empty{}, dtmgimp.Result2Error(r, err) } func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { - r, err := svcAbort(TransFromDtmRequest(in)) + r, err := svcAbort(TransFromDtmRequest(ctx, in)) return &emptypb.Empty{}, dtmgimp.Result2Error(r, err) } diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index 9acc8f5..5aad90f 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -24,16 +24,18 @@ type MicroService struct { } type Store struct { - Driver string `yaml:"Driver" default:"boltdb"` - Host string `yaml:"Host"` - Port int64 `yaml:"Port"` - User string `yaml:"User"` - Password string `yaml:"Password"` - MaxOpenConns int64 `yaml:"MaxOpenConns" default:"500"` - MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"` - ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"` - DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb. - RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster + Driver string `yaml:"Driver" default:"boltdb"` + Host string `yaml:"Host"` + Port int64 `yaml:"Port"` + User string `yaml:"User"` + Password string `yaml:"Password"` + MaxOpenConns int64 `yaml:"MaxOpenConns" default:"500"` + MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"` + ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"` + DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb. + RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster + TransGlobalTable string `yaml:"TransGlobalTable" default:"dtm.trans_global"` + TransBranchOpTable string `yaml:"BranchTransOpTable" default:"dtm.trans_branch_op"` } func (s *Store) IsDB() bool { @@ -77,7 +79,7 @@ func MustLoadConfig(confFile string) { scont, err := json.MarshalIndent(&Config, "", " ") logger.FatalIfError(err) logger.Infof("config file: %s loaded config is: \n%s", confFile, scont) - err = checkConfig() + err = checkConfig(&Config) logger.FatalfIf(err != nil, `config error: '%v'. please visit http://d.dtm.pub to see the config document.`, err) } diff --git a/dtmsvr/config/config_test.go b/dtmsvr/config/config_test.go index 8088bef..f7ac1e8 100644 --- a/dtmsvr/config/config_test.go +++ b/dtmsvr/config/config_test.go @@ -17,38 +17,47 @@ func TestLoadFromEnv(t *testing.T) { assert.Equal(t, "d1", ms.Driver) } -func TestCheckConfig(t *testing.T) { +func TestLoadConfig(t *testing.T) { MustLoadConfig("../../conf.sample.yml") - config := &Config - config.RetryInterval = 1 - retryIntervalErr := checkConfig() +} +func TestCheckConfig(t *testing.T) { + conf := Config + conf.RetryInterval = 1 + retryIntervalErr := checkConfig(&conf) retryIntervalExpect := errors.New("RetryInterval should not be less than 10") assert.Equal(t, retryIntervalErr, retryIntervalExpect) - config.RetryInterval = 10 - config.TimeoutToFail = 5 - timeoutToFailErr := checkConfig() + conf.RetryInterval = 10 + conf.TimeoutToFail = 5 + timeoutToFailErr := checkConfig(&conf) timeoutToFailExpect := errors.New("TimeoutToFail should not be less than RetryInterval") assert.Equal(t, timeoutToFailErr, timeoutToFailExpect) - config.TimeoutToFail = 20 - driverErr := checkConfig() + conf.TimeoutToFail = 20 + driverErr := checkConfig(&conf) assert.Equal(t, driverErr, nil) - config.Store = Store{Driver: Mysql} - hostErr := checkConfig() + conf.Store = Store{Driver: Mysql} + hostErr := checkConfig(&conf) hostExpect := errors.New("Db host not valid ") assert.Equal(t, hostErr, hostExpect) - config.Store = Store{Driver: Mysql, Host: "127.0.0.1"} - portErr := checkConfig() + conf.Store = Store{Driver: Mysql, Host: "127.0.0.1"} + portErr := checkConfig(&conf) portExpect := errors.New("Db port not valid ") assert.Equal(t, portErr, portExpect) - config.Store = Store{Driver: Mysql, Host: "127.0.0.1", Port: 8686} - userErr := checkConfig() + conf.Store = Store{Driver: Mysql, Host: "127.0.0.1", Port: 8686} + userErr := checkConfig(&conf) userExpect := errors.New("Db user not valid ") assert.Equal(t, userErr, userExpect) + + conf.Store = Store{Driver: Redis, Host: "", Port: 8686} + assert.Equal(t, errors.New("Redis host not valid"), checkConfig(&conf)) + + conf.Store = Store{Driver: Redis, Host: "127.0.0.1", Port: 0} + assert.Equal(t, errors.New("Redis port not valid"), checkConfig(&conf)) + } func TestConfig(t *testing.T) { @@ -61,7 +70,7 @@ func TestConfig(t *testing.T) { func testConfigStringField(fd *string, val string, t *testing.T) { old := *fd *fd = val - str := checkConfig() + str := checkConfig(&Config) assert.NotEqual(t, "", str) *fd = old } @@ -69,7 +78,7 @@ func testConfigStringField(fd *string, val string, t *testing.T) { func testConfigIntField(fd *int64, val int64, t *testing.T) { old := *fd *fd = val - str := checkConfig() + str := checkConfig(&Config) assert.NotEqual(t, "", str) *fd = old } diff --git a/dtmsvr/config/config_utils.go b/dtmsvr/config/config_utils.go index 1dc991f..e5b1785 100644 --- a/dtmsvr/config/config_utils.go +++ b/dtmsvr/config/config_utils.go @@ -55,32 +55,32 @@ func toUnderscoreUpper(key string) string { return strings.ToUpper(s2) } -func checkConfig() error { - if Config.RetryInterval < 10 { +func checkConfig(conf *configType) error { + if conf.RetryInterval < 10 { return errors.New("RetryInterval should not be less than 10") } - if Config.TimeoutToFail < Config.RetryInterval { + if conf.TimeoutToFail < conf.RetryInterval { return errors.New("TimeoutToFail should not be less than RetryInterval") } - switch Config.Store.Driver { + switch conf.Store.Driver { case BoltDb: return nil case Mysql: - if Config.Store.Host == "" { + if conf.Store.Host == "" { return errors.New("Db host not valid ") } - if Config.Store.Port == 0 { + if conf.Store.Port == 0 { return errors.New("Db port not valid ") } - if Config.Store.User == "" { + if conf.Store.User == "" { return errors.New("Db user not valid ") } case Redis: - if Config.Store.Host == "" { - return errors.New("Redis host not valid ") + if conf.Store.Host == "" { + return errors.New("Redis host not valid") } - if Config.Store.Port == 0 { - return errors.New("Redis port not valid ") + if conf.Store.Port == 0 { + return errors.New("Redis port not valid") } } return nil diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index e1f3fa3..80663c9 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -5,9 +5,14 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" ) +type TransGlobalExt struct { + Headers map[string]string `json:"headers,omitempty" gorm:"-"` +} + type TransGlobalStore struct { dtmutil.ModelBase Gid string `json:"gid,omitempty"` @@ -18,7 +23,6 @@ type TransGlobalStore struct { Status string `json:"status,omitempty"` QueryPrepared string `json:"query_prepared,omitempty"` Protocol string `json:"protocol,omitempty"` - CommitTime *time.Time `json:"commit_time,omitempty"` FinishTime *time.Time `json:"finish_time,omitempty"` RollbackTime *time.Time `json:"rollback_time,omitempty"` Options string `json:"options,omitempty"` @@ -26,12 +30,14 @@ type TransGlobalStore struct { NextCronInterval int64 `json:"next_cron_interval,omitempty"` NextCronTime *time.Time `json:"next_cron_time,omitempty"` Owner string `json:"owner,omitempty"` + Ext TransGlobalExt `json:"-" gorm:"-"` + ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options dtmcli.TransOptions } // TableName TableName func (g *TransGlobalStore) TableName() string { - return "dtm.trans_global" + return config.Config.Store.TransGlobalTable } func (g *TransGlobalStore) String() string { @@ -53,7 +59,7 @@ type TransBranchStore struct { // TableName TableName func (b *TransBranchStore) TableName() string { - return "dtm.trans_branch_op" + return config.Config.Store.TransBranchOpTable } func (b *TransBranchStore) String() string { diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 977f44b..50f9b01 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -7,11 +7,13 @@ package dtmsvr import ( + "context" "time" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/gin-gonic/gin" @@ -69,11 +71,21 @@ func TransFromContext(c *gin.Context) *TransGlobal { } } m.Protocol = "http" + + m.Ext.Headers = map[string]string{} + if len(m.PassthroughHeaders) > 0 { + for _, h := range m.PassthroughHeaders { + v := c.GetHeader(h) + if v != "" { + m.Ext.Headers[h] = v + } + } + } return &m } // TransFromDtmRequest TransFromContext -func TransFromDtmRequest(c *dtmgpb.DtmRequest) *TransGlobal { +func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal { o := &dtmgpb.DtmTransOptions{} if c.TransOptions != nil { o = c.TransOptions @@ -85,13 +97,24 @@ func TransFromDtmRequest(c *dtmgpb.DtmRequest) *TransGlobal { Protocol: "grpc", BinPayloads: c.BinPayloads, TransOptions: dtmcli.TransOptions{ - WaitResult: o.WaitResult, - TimeoutToFail: o.TimeoutToFail, - RetryInterval: o.RetryInterval, + WaitResult: o.WaitResult, + TimeoutToFail: o.TimeoutToFail, + RetryInterval: o.RetryInterval, + PassthroughHeaders: o.PassthroughHeaders, + BranchHeaders: o.BranchHeaders, }, }} if c.Steps != "" { dtmimp.MustUnmarshalString(c.Steps, &r.Steps) } + if len(o.PassthroughHeaders) > 0 { + r.Ext.Headers = map[string]string{} + for _, h := range o.PassthroughHeaders { + v := dtmgimp.GetMetaFromContext(ctx, h) + if v != "" { + r.Ext.Headers[h] = v + } + } + } return &r } diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index cac4c96..db6243c 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -26,6 +26,9 @@ func (t *TransGlobal) process(branches []TransBranch) map[string]interface{} { if t.Options != "" { dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) } + if t.ExtData != "" { + dtmimp.MustUnmarshalString(t.ExtData, &t.Ext) + } if !t.WaitResult { go t.processInner(branches) @@ -63,6 +66,10 @@ func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) { func (t *TransGlobal) saveNew() ([]TransBranch, error) { t.NextCronInterval = t.getNextCronInterval(cronReset) t.NextCronTime = dtmutil.GetNextTime(t.NextCronInterval) + t.ExtData = dtmimp.MustMarshalString(t.Ext) + if t.ExtData == "{}" { + t.ExtData = "" + } t.Options = dtmimp.MustMarshalString(t.TransOptions) if t.Options == "{}" { t.Options = "" diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 6f1dea0..c04110a 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -17,6 +17,7 @@ import ( "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtmdriver" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -83,6 +84,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa } conn := dtmgimp.MustGetGrpcConn(server, true) ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "") + kvs := dtmgimp.Map2Kvs(t.Ext.Headers) + kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...) + ctx = metadata.AppendToOutgoingContext(ctx, kvs...) err = conn.Invoke(ctx, method, branchPayload, &[]byte{}) if err == nil { return dtmcli.ResultSuccess, nil @@ -106,6 +110,8 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa "op": op, }). SetHeader("Content-type", "application/json"). + SetHeaders(t.Ext.Headers). + SetHeaders(t.TransOptions.BranchHeaders). Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url) if err != nil { return "", err diff --git a/go.mod b/go.mod index 0d19a2f..306fad3 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/dtm-labs/dtmdriver-polaris v0.0.2 github.com/dtm-labs/dtmdriver-protocol1 v0.0.1 github.com/gin-gonic/gin v1.6.3 - github.com/go-redis/redis/v8 v8.11.4 github.com/go-resty/resty/v2 v2.7.0 github.com/go-sql-driver/mysql v1.6.0 diff --git a/go.sum b/go.sum index 09eb829..d37ef25 100644 --- a/go.sum +++ b/go.sum @@ -509,6 +509,12 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/ychensha/dtmdriver-polaris v0.0.1/go.mod h1:0BdQvxXlGOlF6YVlsDoVvu8jyxdTlJZ9Kyh5t9lRA94= +github.com/yedf/dtm v1.7.2 h1:qGjio5O+Zm5CmMfga9Eb0rsUu9nkNHxGuJwc7PEkMfc= +github.com/yedf/dtm v1.7.2/go.mod h1:R1Q55spqLh7yHMVJhGI8RpS1iG7OvRX99pWZRlsAtME= +github.com/yedf/dtmdriver v0.0.0-20211203060147-29426c663b6e/go.mod h1:aeo6ZWiVI0x8P8O18r6uB1cG2uw9BCQyYZaH15MlRDI= +github.com/yedf/dtmdriver-gozero v0.0.0-20211204083751-a14485949435/go.mod h1:RYtA6oZny6LzlIRb1tPGt5bHfgqws/JaU6ogFly8ByQ= +github.com/yedf/dtmdriver-protocol1 v0.0.0-20211205112411-d7a7052dc90e/go.mod h1:kB3NPnDKSGioVjgdfj6qgbqYJinOml45GnlHqR46Ycc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/helper/sync-dtmcli.sh b/helper/sync-dtmcli.sh index 3ac42b0..dd70a5a 100755 --- a/helper/sync-dtmcli.sh +++ b/helper/sync-dtmcli.sh @@ -30,8 +30,6 @@ go build || exit 1 git add . git commit -m"update from dtm to version $ver" git push -git tag $ver -git push --tags cd ../dtmgrpc diff --git a/sqls/dtmsvr.storage.mysql.sql b/sqls/dtmsvr.storage.mysql.sql index 59d61dd..09c82f5 100644 --- a/sqls/dtmsvr.storage.mysql.sql +++ b/sqls/dtmsvr.storage.mysql.sql @@ -12,14 +12,14 @@ CREATE TABLE if not EXISTS dtm.trans_global ( `protocol` varchar(45) not null comment '通信协议 http | grpc', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, - `commit_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, - `options` varchar(256) DEFAULT '', + `options` varchar(1024) DEFAULT '', `custom_data` varchar(256) DEFAULT '', `next_cron_interval` int(11) default null comment '下次定时处理的间隔', `next_cron_time` datetime default null comment '下次定时处理的时间', `owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者', + `ext_data` TEXT comment 'global扩展字段的数据', PRIMARY KEY (`id`), UNIQUE KEY `gid` (`gid`), key `owner`(`owner`), diff --git a/sqls/dtmsvr.storage.postgres.sql b/sqls/dtmsvr.storage.postgres.sql index bccb8be..f19d25e 100644 --- a/sqls/dtmsvr.storage.postgres.sql +++ b/sqls/dtmsvr.storage.postgres.sql @@ -13,14 +13,14 @@ CREATE TABLE if not EXISTS dtm.trans_global ( protocol varchar(45) not null, create_time timestamp(0) with time zone DEFAULT NULL, update_time timestamp(0) with time zone DEFAULT NULL, - commit_time timestamp(0) with time zone DEFAULT NULL, finish_time timestamp(0) with time zone DEFAULT NULL, rollback_time timestamp(0) with time zone DEFAULT NULL, - options varchar(256) DEFAULT '', + options varchar(1024) DEFAULT '', custom_data varchar(256) DEFAULT '', next_cron_interval int default null, next_cron_time timestamp(0) with time zone default null, owner varchar(128) not null default '', + ext_data text, PRIMARY KEY (id), CONSTRAINT gid UNIQUE (gid) ); diff --git a/test/busi/base_grpc.go b/test/busi/base_grpc.go index dcfca4e..6fd3790 100644 --- a/test/busi/base_grpc.go +++ b/test/busi/base_grpc.go @@ -9,6 +9,7 @@ package busi import ( "context" "database/sql" + "errors" "fmt" "net" @@ -123,3 +124,19 @@ func (s *busiServer) TransInTccNested(ctx context.Context, in *BusiReq) (*emptyp func (s *busiServer) XaNotify(ctx context.Context, in *emptypb.Empty) (*emptypb.Empty, error) { return XaGrpcClient.HandleCallback(ctx) } + +func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + meta := dtmgimp.GetMetaFromContext(ctx, "test_header") + if meta == "" { + return &emptypb.Empty{}, errors.New("no header found in HeaderYes") + } + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), in.TransOutResult, dtmimp.GetFuncName()) +} + +func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + meta := dtmgimp.GetMetaFromContext(ctx, "test_header") + if meta != "" { + return &emptypb.Empty{}, errors.New("header found in HeaderNo") + } + return &emptypb.Empty{}, nil +} diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 4518d0e..fe47dd7 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -154,4 +154,18 @@ func BaseAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/TccBSleepCancel", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { return sleepCancelHandler(c) })) + app.POST(BusiAPI+"/TransOutHeaderYes", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { + h := c.GetHeader("test_header") + if h == "" { + return nil, errors.New("no test_header found in TransOutHeaderYes") + } + return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") + })) + app.POST(BusiAPI+"/TransOutHeaderNo", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) { + h := c.GetHeader("test_header") + if h != "" { + return nil, errors.New("test_header found in TransOutHeaderNo") + } + return dtmcli.MapSuccess, nil + })) } diff --git a/test/busi/busi.pb.go b/test/busi/busi.pb.go index fa73c00..cfa8766 100644 --- a/test/busi/busi.pb.go +++ b/test/busi/busi.pb.go @@ -148,7 +148,7 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x32, 0xd3, 0x07, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61, + 0x65, 0x32, 0xce, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x2d, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61, @@ -209,8 +209,16 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, - 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x4f, 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x59, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x62, + 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, + 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, + 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -249,25 +257,29 @@ var file_test_busi_busi_proto_depIdxs = []int32{ 0, // 14: busi.Busi.TransOutBSaga:input_type -> busi.BusiReq 0, // 15: busi.Busi.TransInRevertBSaga:input_type -> busi.BusiReq 0, // 16: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq - 1, // 17: busi.Busi.CanSubmit:output_type -> busi.BusiReply - 2, // 18: busi.Busi.TransIn:output_type -> google.protobuf.Empty - 2, // 19: busi.Busi.TransOut:output_type -> google.protobuf.Empty - 2, // 20: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty - 2, // 21: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 2, // 22: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty - 2, // 23: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty - 2, // 24: busi.Busi.XaNotify:output_type -> google.protobuf.Empty - 2, // 25: busi.Busi.TransInXa:output_type -> google.protobuf.Empty - 2, // 26: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty - 2, // 27: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty - 2, // 28: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty - 2, // 29: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty - 2, // 30: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty - 2, // 31: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty - 2, // 32: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty - 2, // 33: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty - 17, // [17:34] is the sub-list for method output_type - 0, // [0:17] is the sub-list for method input_type + 0, // 17: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq + 0, // 18: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq + 1, // 19: busi.Busi.CanSubmit:output_type -> busi.BusiReply + 2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty + 2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty + 2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty + 2, // 23: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 2, // 24: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 2, // 25: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 2, // 26: busi.Busi.XaNotify:output_type -> google.protobuf.Empty + 2, // 27: busi.Busi.TransInXa:output_type -> google.protobuf.Empty + 2, // 28: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty + 2, // 29: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty + 2, // 30: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty + 2, // 31: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty + 2, // 32: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty + 2, // 33: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty + 2, // 34: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty + 2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty + 2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty + 2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty + 19, // [19:38] is the sub-list for method output_type + 0, // [0:19] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/test/busi/busi.proto b/test/busi/busi.proto index 0e421b0..9d43abd 100644 --- a/test/busi/busi.proto +++ b/test/busi/busi.proto @@ -36,5 +36,7 @@ service Busi { rpc TransOutBSaga(BusiReq) returns (google.protobuf.Empty) {} rpc TransInRevertBSaga(BusiReq) returns (google.protobuf.Empty) {} rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {} + rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {} + rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {} } diff --git a/test/busi/busi_grpc.pb.go b/test/busi/busi_grpc.pb.go index bc612b1..095b295 100644 --- a/test/busi/busi_grpc.pb.go +++ b/test/busi/busi_grpc.pb.go @@ -36,6 +36,8 @@ type BusiClient interface { TransOutBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransInRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -199,6 +201,24 @@ func (c *busiClient) TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts return out, nil } +func (c *busiClient) TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransOutHeaderYes", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransOutHeaderNo", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -220,6 +240,8 @@ type BusiServer interface { TransOutBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) TransInRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) + TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error) + TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -278,6 +300,12 @@ func (UnimplementedBusiServer) TransInRevertBSaga(context.Context, *BusiReq) (*e func (UnimplementedBusiServer) TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertBSaga not implemented") } +func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderYes not implemented") +} +func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -597,6 +625,42 @@ func _Busi_TransOutRevertBSaga_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _Busi_TransOutHeaderYes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutHeaderYes(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransOutHeaderYes", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutHeaderYes(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutHeaderNo(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransOutHeaderNo", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutHeaderNo(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -672,6 +736,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransOutRevertBSaga", Handler: _Busi_TransOutRevertBSaga_Handler, }, + { + MethodName: "TransOutHeaderYes", + Handler: _Busi_TransOutHeaderYes_Handler, + }, + { + MethodName: "TransOutHeaderNo", + Handler: _Busi_TransOutHeaderNo_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "test/busi/busi.proto", diff --git a/test/busi/utils.go b/test/busi/utils.go index 2b13712..5df72d1 100644 --- a/test/busi/utils.go +++ b/test/busi/utils.go @@ -4,27 +4,32 @@ import ( "context" "database/sql" "fmt" + "strings" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmgrpc" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) func dbGet() *dtmutil.DB { return dtmutil.DbGet(BusiConf) } -func sdbGet() *sql.DB { +func pdbGet() *sql.DB { db, err := dtmimp.PooledDB(BusiConf) logger.FatalIfError(err) return db } func txGet() *sql.Tx { - db := sdbGet() + db := pdbGet() tx, err := db.Begin() logger.FatalIfError(err) return tx @@ -59,3 +64,22 @@ func MustBarrierFromGrpc(ctx context.Context) *dtmcli.BranchBarrier { logger.FatalIfError(err) return ti } + +// SetGrpcHeaderForHeadersYes interceptor to set head for HeadersYes +func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if r, ok := req.(*dtmgpb.DtmRequest); ok && strings.HasSuffix(r.Gid, "HeadersYes") { + logger.Debugf("writing test_header:test to ctx") + md := metadata.New(map[string]string{"test_header": "test"}) + ctx = metadata.NewOutgoingContext(ctx, md) + } + return invoker(ctx, method, req, reply, cc, opts...) +} + +// SetHttpHeaderForHeadersYes interceptor to set head for HeadersYes +func SetHttpHeaderForHeadersYes(c *resty.Client, r *resty.Request) error { + if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") { + logger.Debugf("set test_header for url: %s", r.URL) + r.SetHeader("test_header", "yes") + } + return nil +} diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 7af9144..28da6e2 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmutil" @@ -45,7 +44,7 @@ func TestUpdateBranchAsync(t *testing.T) { } conf.UpdateBranchSync = 0 saga := genSaga1(dtmimp.GetFuncName(), false, false) - saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) + saga.WaitResult = true err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) diff --git a/test/main_test.go b/test/main_test.go index e7d7ca9..31bf4be 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/test/busi" @@ -33,6 +34,9 @@ func TestMain(m *testing.M) { dtmsvr.CronForwardDuration = 180 * time.Second conf.UpdateBranchSync = 1 + dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes) + dtmcli.OnBeforeRequest(busi.SetHttpHeaderForHeadersYes) + tenv := os.Getenv("TEST_STORE") if tenv == "boltdb" { conf.Store.Driver = "boltdb" diff --git a/test/msg_grpc_test.go b/test/msg_grpc_test.go index 44af2ab..e228fbb 100644 --- a/test/msg_grpc_test.go +++ b/test/msg_grpc_test.go @@ -38,7 +38,8 @@ func TestMsgGrpcTimeoutSuccess(t *testing.T) { cronTransOnceForwardNow(180) assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, msg.Gid, g) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) } diff --git a/test/msg_options_test.go b/test/msg_options_test.go index f69b8c9..97cfbe3 100644 --- a/test/msg_options_test.go +++ b/test/msg_options_test.go @@ -18,7 +18,8 @@ import ( func TestMsgOptionsTimeout(t *testing.T) { msg := genMsg(dtmimp.GetFuncName()) msg.Prepare("") - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, msg.Gid, g) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) cronTransOnceForwardNow(60) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) @@ -28,7 +29,8 @@ func TestMsgOptionsTimeoutCustom(t *testing.T) { msg := genMsg(dtmimp.GetFuncName()) msg.TimeoutToFail = 120 msg.Prepare("") - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, msg.Gid, g) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) cronTransOnceForwardNow(60) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) @@ -40,7 +42,8 @@ func TestMsgOptionsTimeoutFailed(t *testing.T) { msg := genMsg(dtmimp.GetFuncName()) msg.TimeoutToFail = 120 msg.Prepare("") - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, msg.Gid, g) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) cronTransOnceForwardNow(60) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) diff --git a/test/msg_test.go b/test/msg_test.go index fa29923..e9c1475 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -35,7 +35,8 @@ func TestMsgTimeoutSuccess(t *testing.T) { busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, msg.Gid, g) assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) } @@ -60,7 +61,7 @@ func TestMsgAbnormal(t *testing.T) { assert.Nil(t, err) err = msg.Submit() assert.Nil(t, err) - + waitTransProcessed(msg.Gid) err = msg.Prepare("") assert.Error(t, err) } diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index 7ae5635..e8c32c0 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -34,7 +34,8 @@ func TestSagaConRollbackNormal(t *testing.T) { assert.Nil(t, err) waitTransProcessed(sagaCon.Gid) assert.Equal(t, StatusAborting, getTransStatus(sagaCon.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, sagaCon.Gid, g) assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid)) // TODO should fix this // assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusSucceed, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) @@ -58,7 +59,8 @@ func TestSagaConCommittedOngoing(t *testing.T) { assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(sagaCon.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, sagaCon.Gid, g) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(sagaCon.Gid)) } diff --git a/test/saga_cover_test.go b/test/saga_cover_test.go new file mode 100644 index 0000000..500f424 --- /dev/null +++ b/test/saga_cover_test.go @@ -0,0 +1,11 @@ +package test + +import ( + "testing" + + "github.com/dtm-labs/dtm/dtmcli" +) + +func TestSagaCover(t *testing.T) { + dtmcli.SetPassthroughHeaders([]string{}) +} diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index 831c649..74fd827 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -31,7 +31,8 @@ func TestSagaGrpcRollback(t *testing.T) { saga.Submit() waitTransProcessed(saga.Gid) assert.Equal(t, StatusAborting, getTransStatus(saga.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) } @@ -62,14 +63,15 @@ func TestSagaGrpcCommittedOngoing(t *testing.T) { waitTransProcessed(saga.Gid) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } func TestSagaGrpcNormalWait(t *testing.T) { saga := genSagaGrpc(dtmimp.GetFuncName(), false, false) - saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) + saga.WaitResult = true saga.Submit() assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) @@ -94,3 +96,69 @@ func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga.Add(busi.BusiGrpc+"/busi.Busi/TransIn", busi.BusiGrpc+"/busi.Busi/TransInRevert", req) return saga } + +func TestSagaGrpcPassthroughHeadersYes(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes) + sagaYes.WaitResult = true + sagaYes.PassthroughHeaders = []string{"test_header"} + sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil) + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) +} + +func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes) + sagaYes.PassthroughHeaders = []string{"test_header"} + sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil) + busi.MainSwitch.TransOutResult.SetOnce("ONGOING") + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) + assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) + g := cronTransOnce() + assert.Equal(t, gidYes, g) + assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) +} + +func TestSagaGrpcPassthroughHeadersNo(t *testing.T) { + gidNo := dtmimp.GetFuncName() + sagaNo := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidNo) + sagaNo.WaitResult = true + sagaNo.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderNo", "", nil) + err := sagaNo.Submit() + assert.Nil(t, err) + waitTransProcessed(gidNo) +} + +func TestSagaGrpcHeaders(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes). + Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil) + sagaYes.BranchHeaders = map[string]string{ + "test_header": "test", + } + sagaYes.WaitResult = true + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) +} + +func TestSagaGrpcCronHeaders(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes) + sagaYes.BranchHeaders = map[string]string{ + "test_header": "test", + } + sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil) + busi.MainSwitch.TransOutResult.SetOnce("ONGOING") + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) + assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) + g := cronTransOnce() + assert.Equal(t, gidYes, g) + assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) +} diff --git a/test/saga_options_test.go b/test/saga_options_test.go index bbd1689..73ecc24 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -11,6 +11,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) @@ -22,7 +23,8 @@ func TestSagaOptionsRetryOngoing(t *testing.T) { err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } @@ -34,10 +36,12 @@ func TestSagaOptionsRetryError(t *testing.T) { err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) - cronTransOnce() assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) - cronTransOnceForwardCron(360) + g := cronTransOnce() + assert.Equal(t, "", g) + g = cronTransOnceForwardCron(360) + assert.Equal(t, saga.Gid, g) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } @@ -55,7 +59,7 @@ func TestSagaOptionsTimeout(t *testing.T) { func TestSagaOptionsNormalWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) - saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) + saga.WaitResult = true err := saga.Submit() assert.Nil(t, err) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) @@ -66,23 +70,90 @@ func TestSagaOptionsNormalWait(t *testing.T) { func TestSagaOptionsCommittedOngoingWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) - saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) + saga.WaitResult = true err := saga.Submit() assert.Error(t, err) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) waitTransProcessed(saga.Gid) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } func TestSagaOptionsRollbackWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, true) - saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) + saga.WaitResult = true err := saga.Submit() assert.Error(t, err) waitTransProcessed(saga.Gid) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) } + +func TestSagaPassthroughHeadersYes(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes) + sagaYes.WaitResult = true + sagaYes.PassthroughHeaders = []string{"test_header"} + sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil) + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) +} + +func TestSagaCronPassthroughHeadersYes(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes) + sagaYes.PassthroughHeaders = []string{"test_header"} + sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil) + busi.MainSwitch.TransOutResult.SetOnce("ONGOING") + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) + assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) + g := cronTransOnce() + assert.Equal(t, gidYes, g) + assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) +} + +func TestSagaPassthroughHeadersNo(t *testing.T) { + gidNo := dtmimp.GetFuncName() + sagaNo := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidNo) + sagaNo.WaitResult = true + sagaNo.Add(busi.Busi+"/TransOutHeaderNo", "", nil) + err := sagaNo.Submit() + assert.Nil(t, err) + waitTransProcessed(gidNo) +} + +func TestSagaHeaders(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes) + sagaYes.BranchHeaders = map[string]string{ + "test_header": "test", + } + sagaYes.WaitResult = true + sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil) + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) +} + +func TestSagaHeadersYes1(t *testing.T) { + gidYes := dtmimp.GetFuncName() + sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes) + sagaYes.BranchHeaders = map[string]string{ + "test_header": "test", + } + sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil) + busi.MainSwitch.TransOutResult.SetOnce("ONGOING") + err := sagaYes.Submit() + assert.Nil(t, err) + waitTransProcessed(gidYes) + assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) + g := cronTransOnce() + assert.Equal(t, gidYes, g) + assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) +} diff --git a/test/saga_test.go b/test/saga_test.go index 57d3a85..0fa3a97 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -31,7 +31,8 @@ func TestSagaOngoingSucceed(t *testing.T) { waitTransProcessed(saga.Gid) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } @@ -43,15 +44,18 @@ func TestSagaFailed(t *testing.T) { assert.Nil(t, err) waitTransProcessed(saga.Gid) assert.Equal(t, StatusAborting, getTransStatus(saga.Gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, saga.Gid, g) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) } func TestSagaAbnormal(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) + busi.MainSwitch.TransOutResult.SetOnce("ONGOING") err := saga.Submit() assert.Nil(t, err) + waitTransProcessed(saga.Gid) err = saga.Submit() // submit twice, ignored assert.Nil(t, err) waitTransProcessed(saga.Gid) diff --git a/test/tcc_cover_test.go b/test/tcc_cover_test.go index 9ca4285..3683f23 100644 --- a/test/tcc_cover_test.go +++ b/test/tcc_cover_test.go @@ -27,4 +27,5 @@ func TestTccCoverPanic(t *testing.T) { assert.FailNow(t, "not executed") }) assert.Contains(t, err.Error(), "user panic") + waitTransProcessed(gid) } diff --git a/test/tcc_grpc_cover_test.go b/test/tcc_grpc_cover_test.go index 6056b9e..192d491 100644 --- a/test/tcc_grpc_cover_test.go +++ b/test/tcc_grpc_cover_test.go @@ -28,6 +28,7 @@ func TestTccGrpcCoverPanic(t *testing.T) { assert.FailNow(t, "not executed") }) assert.Contains(t, err.Error(), "user panic") + waitTransProcessed(gid) } func TestTccGrpcCoverCallBranch(t *testing.T) { @@ -46,4 +47,6 @@ func TestTccGrpcCoverCallBranch(t *testing.T) { return err }) assert.Error(t, err) + g := cronTransOnceForwardNow(300) + assert.Equal(t, gid, g) } diff --git a/test/tcc_grpc_test.go b/test/tcc_grpc_test.go index a9fc40b..b3a8169 100644 --- a/test/tcc_grpc_test.go +++ b/test/tcc_grpc_test.go @@ -49,7 +49,8 @@ func TestTccGrpcRollback(t *testing.T) { assert.Error(t, err) waitTransProcessed(gid) assert.Equal(t, StatusAborting, getTransStatus(gid)) - cronTransOnce() + g2 := cronTransOnce() + assert.Equal(t, gid, g2) assert.Equal(t, StatusFailed, getTransStatus(gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } @@ -76,3 +77,22 @@ func TestTccGrpcType(t *testing.T) { err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil }) assert.Error(t, err) } + +func TestTccGrpcHeaders(t *testing.T) { + gid := dtmimp.GetFuncName() + err := dtmgrpc.TccGlobalTransaction2(dtmutil.DefaultGrpcServer, gid, func(tg *dtmgrpc.TccGrpc) { + tg.BranchHeaders = map[string]string{ + "test_header": "test", + } + tg.WaitResult = true + }, func(tcc *dtmgrpc.TccGrpc) error { + data := &busi.BusiReq{Amount: 30} + r := &emptypb.Empty{} + return tcc.CallBranch(data, busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", "", r) + }) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) + assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid)) + +} diff --git a/test/tcc_test.go b/test/tcc_test.go index 3766c59..4324769 100644 --- a/test/tcc_test.go +++ b/test/tcc_test.go @@ -43,7 +43,8 @@ func TestTccRollback(t *testing.T) { assert.Error(t, err) waitTransProcessed(gid) assert.Equal(t, StatusAborting, getTransStatus(gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, gid, g) assert.Equal(t, StatusFailed, getTransStatus(gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } @@ -84,3 +85,19 @@ func TestTccCompatible(t *testing.T) { assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(gid)) } + +func TestTccHeaders(t *testing.T) { + req := busi.GenTransReq(30, false, false) + gid := dtmimp.GetFuncName() + err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHttpServer, gid, func(t *dtmcli.Tcc) { + t.BranchHeaders = map[string]string{ + "test_header": "test", + } + }, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + return tcc.CallBranch(req, Busi+"/TransOutHeaderYes", "", "") + }) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) + assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid)) +} diff --git a/test/types.go b/test/types.go index 8ed65c8..4fcbe34 100644 --- a/test/types.go +++ b/test/types.go @@ -29,21 +29,19 @@ func waitTransProcessed(gid string) { logger.Debugf("waiting for gid %s", gid) select { case id := <-dtmsvr.TransProcessedTestChan: - for id != gid { - logger.Warnf("------- expecting: %s but %s found", gid, id) - id = <-dtmsvr.TransProcessedTestChan - } + logger.FatalfIf(id != gid, "------- expecting: %s but %s found", gid, id) logger.Debugf("finish for gid %s", gid) case <-time.After(time.Duration(time.Second * 3)): logger.FatalfIf(true, "Wait Trans timeout") } } -func cronTransOnce() { +func cronTransOnce() string { gid := dtmsvr.CronTransOnce() if dtmsvr.TransProcessedTestChan != nil && gid != "" { waitTransProcessed(gid) } + return gid } var e2p = dtmimp.E2P @@ -54,18 +52,20 @@ type TransGlobal = dtmsvr.TransGlobal // TransBranch alias type TransBranch = dtmsvr.TransBranch -func cronTransOnceForwardNow(seconds int) { +func cronTransOnceForwardNow(seconds int) string { old := dtmsvr.NowForwardDuration dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second - cronTransOnce() + gid := cronTransOnce() dtmsvr.NowForwardDuration = old + return gid } -func cronTransOnceForwardCron(seconds int) { +func cronTransOnceForwardCron(seconds int) string { old := dtmsvr.CronForwardDuration dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second - cronTransOnce() + gid := cronTransOnce() dtmsvr.CronForwardDuration = old + return gid } const ( diff --git a/test/xa_cover_test.go b/test/xa_cover_test.go index fddada6..0103539 100644 --- a/test/xa_cover_test.go +++ b/test/xa_cover_test.go @@ -43,7 +43,7 @@ func TestXaCoverDTMError(t *testing.T) { } func TestXaCoverGidError(t *testing.T) { - gid := "errgid-' '" + gid := dtmimp.GetFuncName() + "-' '" err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := busi.GenTransReq(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") @@ -51,4 +51,5 @@ func TestXaCoverGidError(t *testing.T) { return nil, err }) assert.Error(t, err) + waitTransProcessed(gid) } diff --git a/test/xa_grpc_test.go b/test/xa_grpc_test.go index 586e3db..890d129 100644 --- a/test/xa_grpc_test.go +++ b/test/xa_grpc_test.go @@ -56,6 +56,7 @@ func TestXaGrpcRollback(t *testing.T) { } func TestXaGrpcType(t *testing.T) { + gid := dtmimp.GetFuncName() _, err := dtmgrpc.XaGrpcFromRequest(context.Background()) assert.Error(t, err) @@ -63,15 +64,18 @@ func TestXaGrpcType(t *testing.T) { assert.Error(t, err) err = dtmimp.CatchP(func() { - busi.XaGrpcClient.XaGlobalTransaction("id1", func(xa *dtmgrpc.XaGrpc) error { panic(fmt.Errorf("hello")) }) + busi.XaGrpcClient.XaGlobalTransaction(gid, func(xa *dtmgrpc.XaGrpc) error { panic(fmt.Errorf("hello")) }) }) assert.Error(t, err) + waitTransProcessed(gid) } func TestXaGrpcLocalError(t *testing.T) { + gid := dtmimp.GetFuncName() xc := busi.XaGrpcClient - err := xc.XaGlobalTransaction(dtmimp.GetFuncName(), func(xa *dtmgrpc.XaGrpc) error { + err := xc.XaGlobalTransaction(gid, func(xa *dtmgrpc.XaGrpc) error { return fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) + waitTransProcessed(gid) } diff --git a/test/xa_test.go b/test/xa_test.go index 0515c02..fcb460c 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -118,7 +118,8 @@ func TestXaNotTimeout(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gid) assert.Equal(t, StatusSubmitted, getTransStatus(gid)) - cronTransOnce() + g := cronTransOnce() + assert.Equal(t, gid, g) assert.Equal(t, StatusSucceed, getTransStatus(gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid)) }