Browse Source

support workflow pattern

pull/328/head
yedf2 4 years ago
parent
commit
af0e9f10ed
  1. 0
      dtmcli/cover_test.go
  2. 63
      dtmcli/dtmimp/trans_base.go
  3. 28
      dtmcli/dtmimp/vars.go
  4. 6
      dtmcli/trans_msg.go
  5. 2
      dtmcli/trans_saga.go
  6. 6
      dtmcli/trans_tcc.go
  7. 46
      dtmcli/types.go
  8. 60
      dtmcli/utils.go
  9. 2
      dtmcli/xa.go
  10. 16
      dtmgrpc/dtmgimp/types.go
  11. 21
      dtmgrpc/dtmgimp/utils.go
  12. 333
      dtmgrpc/dtmgpb/dtmgimp.pb.go
  13. 16
      dtmgrpc/dtmgpb/dtmgimp.proto
  14. 38
      dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go
  15. 15
      dtmgrpc/type.go
  16. 53
      dtmgrpc/workflow/dummyReadCloser.go
  17. 46
      dtmgrpc/workflow/factory.go
  18. 191
      dtmgrpc/workflow/imp.go
  19. 78
      dtmgrpc/workflow/rpc.go
  20. 26
      dtmgrpc/workflow/server.go
  21. 138
      dtmgrpc/workflow/utils.go
  22. 153
      dtmgrpc/workflow/wfpb/wf.pb.go
  23. 15
      dtmgrpc/workflow/wfpb/wf.proto
  24. 106
      dtmgrpc/workflow/wfpb/wf_grpc.pb.go
  25. 190
      dtmgrpc/workflow/workflow.go
  26. 7
      dtmsvr/api.go
  27. 16
      dtmsvr/api_grpc.go
  28. 6
      dtmsvr/api_http.go
  29. 2
      dtmsvr/cron.go
  30. 19
      dtmsvr/storage/boltdb/boltdb.go
  31. 10
      dtmsvr/storage/trans.go
  32. 2
      dtmsvr/trans_class.go
  33. 5
      dtmsvr/trans_process.go
  34. 43
      dtmsvr/trans_type_workflow.go
  35. 10
      helper/test-cover.sh
  36. 4
      sqls/dtmsvr.storage.mysql.sql
  37. 2
      sqls/dtmsvr.storage.postgres.sql
  38. 2
      sqls/dtmsvr.storage.tdsql.sql
  39. 24
      test/busi/base_grpc.go
  40. 7
      test/busi/base_http.go
  41. 12
      test/busi/base_workflow.go
  42. 16
      test/busi/data.go
  43. 27
      test/busi/startup.go
  44. 2
      test/busi/utils.go
  45. 3
      test/main_test.go
  46. 2
      test/msg_jrpc_test.go
  47. 265
      test/workflow_test.go

0
dtmcli/trans_test.go → dtmcli/cover_test.go

63
dtmcli/dtmimp/trans_base.go

@ -93,39 +93,28 @@ func TransBaseFromQuery(qs url.Values) *TransBase {
return NewTransBase(EscapeGet(qs, "gid"), EscapeGet(qs, "trans_type"), EscapeGet(qs, "dtm"), EscapeGet(qs, "branch_id"))
}
// TransCallDtm TransBase call dtm
func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
// TransCallDtmExt TransBase call dtm
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
if tb.Protocol == Jrpc {
return transCallDtmJrpc(tb, body, operation)
}
if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
}
if tb.Protocol == Jrpc {
var result map[string]interface{}
resp, err := RestyClient.R().
SetBody(map[string]interface{}{
"jsonrpc": "2.0",
"id": "no-use",
"method": operation,
"params": body,
}).
SetResult(&result).
Post(tb.Dtm)
if err != nil {
return err
}
if resp.StatusCode() != http.StatusOK || result["error"] != nil {
return errors.New(resp.String())
}
return nil
}
resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil {
return err
return nil, err
}
if resp.StatusCode() != http.StatusOK || strings.Contains(resp.String(), ResultFailure) {
return errors.New(resp.String())
return nil, errors.New(resp.String())
}
return nil
return resp, nil
}
func TransCallDtm(tb *TransBase, operation string) error {
_, err := TransCallDtmExt(tb, tb, operation)
return err
}
// TransRegisterBranch TransBase register a branch to dtm
@ -137,7 +126,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
for k, v := range added {
m[k] = v
}
return TransCallDtm(tb, m, operation)
_, err := TransCallDtmExt(tb, m, operation)
return err
}
// TransRequestBranch TransBase request branch result
@ -165,3 +155,26 @@ func TransRequestBranch(t *TransBase, method string, body interface{}, branchID
}
return resp, err
}
func transCallDtmJrpc(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
}
var result map[string]interface{}
resp, err := RestyClient.R().
SetBody(map[string]interface{}{
"jsonrpc": "2.0",
"id": "no-use",
"method": operation,
"params": body,
}).
SetResult(&result).
Post(tb.Dtm)
if err != nil {
return nil, err
}
if resp.StatusCode() != http.StatusOK || result["error"] != nil {
return nil, errors.New(resp.String())
}
return resp, nil
}

28
dtmcli/dtmimp/vars.go

@ -42,17 +42,21 @@ var PassthroughHeaders = []string{}
// BarrierTableName the table name of barrier table
var BarrierTableName = "dtm_barrier.barrier"
func BeforeRequest(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL)
logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u)
r.URL = u
return err
}
func AfterResponse(c *resty.Client, resp *resty.Response) error {
r := resp.Request
logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String())
return nil
}
func init() {
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL)
logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u)
r.URL = u
return err
})
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request
logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String())
return nil
})
RestyClient.OnBeforeRequest(BeforeRequest)
RestyClient.OnAfterResponse(AfterResponse)
}

6
dtmcli/msg.go → dtmcli/trans_msg.go

@ -40,13 +40,13 @@ func (s *Msg) SetDelay(delay uint64) *Msg {
// Prepare prepare the msg, msg will later be submitted
func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
return dtmimp.TransCallDtm(&s.TransBase, s, "prepare")
return dtmimp.TransCallDtm(&s.TransBase, "prepare")
}
// Submit submit the msg
func (s *Msg) Submit() error {
s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
return dtmimp.TransCallDtm(&s.TransBase, "submit")
}
// DoAndSubmitDB short method for Do on db type. please see DoAndSubmit
@ -72,7 +72,7 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier)
_, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
}
if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
_ = dtmimp.TransCallDtm(&s.TransBase, "abort")
} else if err == nil {
err = s.Submit()
}

2
dtmcli/saga.go → dtmcli/trans_saga.go

@ -43,7 +43,7 @@ func (s *Saga) SetConcurrent() *Saga {
// Submit submit the saga trans
func (s *Saga) Submit() error {
s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
return dtmimp.TransCallDtm(&s.TransBase, "submit")
}
// BuildCustomOptions add custom options to the request context

6
dtmcli/tcc.go → dtmcli/trans_tcc.go

@ -34,14 +34,14 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc TccGlobalFunc) (rerr error) {
tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")}
custom(tcc)
rerr = dtmimp.TransCallDtm(&tcc.TransBase, tcc, "prepare")
rerr = dtmimp.TransCallDtm(&tcc.TransBase, "prepare")
if rerr != nil {
return rerr
}
defer dtmimp.DeferDo(&rerr, func() error {
return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "submit")
return dtmimp.TransCallDtm(&tcc.TransBase, "submit")
}, func() error {
return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "abort")
return dtmimp.TransCallDtm(&tcc.TransBase, "abort")
})
_, rerr = tccFunc(tcc)
return

46
dtmcli/types.go

@ -7,24 +7,10 @@
package dtmcli
import (
"errors"
"fmt"
"net/http"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/go-resty/resty/v2"
)
// MustGenGid generate a new gid
func MustGenGid(server string) string {
res := map[string]string{}
resp, err := dtmimp.RestyClient.R().SetResult(&res).Get(server + "/newGid")
if err != nil || res["gid"] == "" {
panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp))
}
return res["gid"]
}
// DB interface
type DB = dtmimp.DB
@ -34,16 +20,6 @@ type TransOptions = dtmimp.TransOptions
// DBConf declares db configuration
type DBConf = dtmimp.DBConf
// String2DtmError translate string to dtm error
func String2DtmError(str string) error {
return map[string]error{
ResultFailure: ErrFailure,
ResultOngoing: ErrOngoing,
ResultSuccess: nil,
"": nil,
}[str]
}
// SetCurrentDBType set currentDBType
func SetCurrentDBType(dbType string) {
dtmimp.SetCurrentDBType(dbType)
@ -81,25 +57,3 @@ func GetRestyClient() *resty.Client {
func SetPassthroughHeaders(headers []string) {
dtmimp.PassthroughHeaders = headers
}
// Result2HttpJSON return the http code and json result
// if result is error, the return proper code, else return StatusOK
func Result2HttpJSON(result interface{}) (code int, res interface{}) {
err, _ := result.(error)
if err == nil {
code = http.StatusOK
res = result
} else {
res = map[string]string{
"error": err.Error(),
}
if errors.Is(err, ErrFailure) {
code = http.StatusConflict
} else if errors.Is(err, ErrOngoing) {
code = http.StatusTooEarly
} else if err != nil {
code = http.StatusInternalServerError
}
}
return
}

60
dtmcli/utils.go

@ -0,0 +1,60 @@
package dtmcli
import (
"errors"
"fmt"
"net/http"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/go-resty/resty/v2"
)
// MustGenGid generate a new gid
func MustGenGid(server string) string {
res := map[string]string{}
resp, err := dtmimp.RestyClient.R().SetResult(&res).Get(server + "/newGid")
if err != nil || res["gid"] == "" {
panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp))
}
return res["gid"]
}
// String2DtmError translate string to dtm error
func String2DtmError(str string) error {
return map[string]error{
ResultFailure: ErrFailure,
ResultOngoing: ErrOngoing,
ResultSuccess: nil,
"": nil,
}[str]
}
// Result2HttpJSON return the http code and json result
// if result is error, the return proper code, else return StatusOK
func Result2HttpJSON(result interface{}) (code int, res interface{}) {
err, _ := result.(error)
if err == nil {
code = http.StatusOK
res = result
} else {
res = map[string]string{
"error": err.Error(),
}
if errors.Is(err, ErrFailure) {
code = http.StatusConflict
} else if errors.Is(err, ErrOngoing) {
code = http.StatusTooEarly
} else if err != nil {
code = http.StatusInternalServerError
}
}
return
}
func IsRollback(resp *resty.Response, err error) bool {
return err == ErrFailure || dtmimp.RespAsErrorCompatible(resp) == ErrFailure
}
func IsOngoing(resp *resty.Response, err error) bool {
return err == ErrOngoing || dtmimp.RespAsErrorCompatible(resp) == ErrOngoing
}

2
dtmcli/xa.go

@ -69,7 +69,7 @@ func XaGlobalTransaction2(server string, gid string, custom func(*Xa), xaFunc Xa
xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", server, "")}
custom(xa)
return dtmimp.XaHandleGlobalTrans(&xa.TransBase, func(action string) error {
return dtmimp.TransCallDtm(&xa.TransBase, xa, action)
return dtmimp.TransCallDtm(&xa.TransBase, action)
}, func() error {
_, rerr := xaFunc(xa)
return rerr

16
dtmgrpc/dtmgimp/types.go

@ -15,7 +15,9 @@ import (
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
@ -27,10 +29,11 @@ func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerI
m, err := handler(ctx, req)
res := fmt.Sprintf("%2dms %v %s %s %s",
time.Since(began).Milliseconds(), err, info.FullMethod, dtmimp.MustMarshalString(m), dtmimp.MustMarshalString(req))
if err != nil {
logger.Errorf("%s", res)
} else {
st, _ := status.FromError(err)
if err == nil || st != nil && st.Code() == codes.FailedPrecondition {
logger.Infof("%s", res)
} else {
logger.Errorf("%s", res)
}
return m, err
}
@ -42,10 +45,11 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
err := invoker(ctx, method, req, reply, cc, opts...)
res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v",
cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err)
if err != nil {
logger.Errorf("%s", res)
st, _ := status.FromError(err)
if err == nil || st != nil && st.Code() == codes.FailedPrecondition {
logger.Infof("%s", res)
} else {
logger.Debugf("%s", res)
logger.Errorf("%s", res)
}
return err
}

21
dtmgrpc/dtmgimp/utils.go

@ -24,10 +24,15 @@ func MustProtoMarshal(msg proto.Message) []byte {
return b
}
// DtmGrpcCall make a convenient call to dtm
func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
reply := emptypb.Empty{}
return MustGetGrpcConn(s.Dtm, false).Invoke(s.Context, "/dtmgimp.Dtm/"+operation, &dtmgpb.DtmRequest{
// MustProtoUnmarshal must version of proto.Unmarshal
func MustProtoUnmarshal(data []byte, msg proto.Message) {
err := proto.Unmarshal(data, msg)
dtmimp.PanicIf(err != nil, err)
}
// GetDtmRequest return a DtmRequest from TransBase
func GetDtmRequest(s *dtmimp.TransBase) *dtmgpb.DtmRequest {
return &dtmgpb.DtmRequest{
Gid: s.Gid,
TransType: s.TransType,
TransOptions: &dtmgpb.DtmTransOptions{
@ -42,7 +47,13 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
CustomedData: s.CustomData,
BinPayloads: s.BinPayloads,
Steps: dtmimp.MustMarshalString(s.Steps),
}, &reply)
}
}
// DtmGrpcCall make a convenient call to dtm
func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
reply := emptypb.Empty{}
return MustGetGrpcConn(s.Dtm, false).Invoke(s.Context, "/dtmgimp.Dtm/"+operation, GetDtmRequest(s), &reply)
}
const dtmpre string = "dtm-"

333
dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.19.4
// protoc-gen-go v1.28.0
// protoc v3.17.3
// source: dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb
@ -114,13 +114,14 @@ type DtmRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"`
TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"`
TransOptions *DtmTransOptions `protobuf:"bytes,3,opt,name=TransOptions,proto3" json:"TransOptions,omitempty"`
CustomedData string `protobuf:"bytes,4,opt,name=CustomedData,proto3" json:"CustomedData,omitempty"`
BinPayloads [][]byte `protobuf:"bytes,5,rep,name=BinPayloads,proto3" json:"BinPayloads,omitempty"` // for MSG/SAGA branch payloads
QueryPrepared string `protobuf:"bytes,6,opt,name=QueryPrepared,proto3" json:"QueryPrepared,omitempty"` // for MSG
Steps string `protobuf:"bytes,7,opt,name=Steps,proto3" json:"Steps,omitempty"`
Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"`
TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"`
TransOptions *DtmTransOptions `protobuf:"bytes,3,opt,name=TransOptions,proto3" json:"TransOptions,omitempty"`
CustomedData string `protobuf:"bytes,4,opt,name=CustomedData,proto3" json:"CustomedData,omitempty"`
BinPayloads [][]byte `protobuf:"bytes,5,rep,name=BinPayloads,proto3" json:"BinPayloads,omitempty"` // for Msg/Saga/Workflow branch payloads
QueryPrepared string `protobuf:"bytes,6,opt,name=QueryPrepared,proto3" json:"QueryPrepared,omitempty"` // for Msg
Steps string `protobuf:"bytes,7,opt,name=Steps,proto3" json:"Steps,omitempty"`
ReqExtra map[string]string `protobuf:"bytes,8,rep,name=ReqExtra,proto3" json:"ReqExtra,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *DtmRequest) Reset() {
@ -204,6 +205,13 @@ func (x *DtmRequest) GetSteps() string {
return ""
}
func (x *DtmRequest) GetReqExtra() map[string]string {
if x != nil {
return x.ReqExtra
}
return nil
}
type DtmGidReply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -338,6 +346,124 @@ func (x *DtmBranchRequest) GetBusiPayload() []byte {
return nil
}
type DtmProgressesReply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Progresses []*DtmProgress `protobuf:"bytes,1,rep,name=Progresses,proto3" json:"Progresses,omitempty"`
}
func (x *DtmProgressesReply) Reset() {
*x = DtmProgressesReply{}
if protoimpl.UnsafeEnabled {
mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *DtmProgressesReply) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DtmProgressesReply) ProtoMessage() {}
func (x *DtmProgressesReply) ProtoReflect() protoreflect.Message {
mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DtmProgressesReply.ProtoReflect.Descriptor instead.
func (*DtmProgressesReply) Descriptor() ([]byte, []int) {
return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP(), []int{4}
}
func (x *DtmProgressesReply) GetProgresses() []*DtmProgress {
if x != nil {
return x.Progresses
}
return nil
}
type DtmProgress struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"Status,omitempty"`
BinData []byte `protobuf:"bytes,2,opt,name=BinData,proto3" json:"BinData,omitempty"`
BranchID string `protobuf:"bytes,3,opt,name=BranchID,proto3" json:"BranchID,omitempty"`
Op string `protobuf:"bytes,4,opt,name=Op,proto3" json:"Op,omitempty"`
}
func (x *DtmProgress) Reset() {
*x = DtmProgress{}
if protoimpl.UnsafeEnabled {
mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *DtmProgress) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DtmProgress) ProtoMessage() {}
func (x *DtmProgress) ProtoReflect() protoreflect.Message {
mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DtmProgress.ProtoReflect.Descriptor instead.
func (*DtmProgress) Descriptor() ([]byte, []int) {
return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP(), []int{5}
}
func (x *DtmProgress) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
func (x *DtmProgress) GetBinData() []byte {
if x != nil {
return x.BinData
}
return nil
}
func (x *DtmProgress) GetBranchID() string {
if x != nil {
return x.BranchID
}
return ""
}
func (x *DtmProgress) GetOp() string {
if x != nil {
return x.Op
}
return ""
}
var File_dtmgrpc_dtmgpb_dtmgimp_proto protoreflect.FileDescriptor
var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
@ -368,7 +494,7 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x01, 0x22, 0xf8, 0x02, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47,
0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65,
@ -384,45 +510,69 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65,
0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74,
0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73,
0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12,
0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69,
0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72,
0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42,
0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a,
0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38,
0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69,
0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d,
0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41,
0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42,
0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e,
0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f,
0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x08, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x21, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x1a,
0x3b, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79,
0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1f, 0x0a, 0x0b,
0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47,
0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x22, 0x82, 0x02,
0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79,
0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e,
0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37,
0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72,
0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50,
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75,
0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74,
0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02,
0x38, 0x01, 0x22, 0x4a, 0x0a, 0x12, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73,
0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x34, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67,
0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65,
0x73, 0x73, 0x52, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x6b,
0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a,
0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12,
0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f,
0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x32, 0xf3, 0x02, 0x0a, 0x03,
0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e,
0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a,
0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d,
0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72,
0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00,
0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67,
0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69,
0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d,
0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x40, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x13, 0x2e,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22,
0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -437,35 +587,42 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP() []byte {
return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescData
}
var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{
(*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions
(*DtmRequest)(nil), // 1: dtmgimp.DtmRequest
(*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply
(*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest
nil, // 4: dtmgimp.DtmTransOptions.BranchHeadersEntry
nil, // 5: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
(*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions
(*DtmRequest)(nil), // 1: dtmgimp.DtmRequest
(*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply
(*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest
(*DtmProgressesReply)(nil), // 4: dtmgimp.DtmProgressesReply
(*DtmProgress)(nil), // 5: dtmgimp.DtmProgress
nil, // 6: dtmgimp.DtmTransOptions.BranchHeadersEntry
nil, // 7: dtmgimp.DtmRequest.ReqExtraEntry
nil, // 8: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 9: google.protobuf.Empty
}
var file_dtmgrpc_dtmgpb_dtmgimp_proto_depIdxs = []int32{
4, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry
0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions
5, // 2: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
6, // 3: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
1, // 4: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest
1, // 5: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest
1, // 6: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest
3, // 7: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest
2, // 8: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
6, // 9: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
6, // 10: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
6, // 11: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
6, // 12: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
8, // [8:13] is the sub-list for method output_type
3, // [3:8] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
6, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry
0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions
7, // 2: dtmgimp.DtmRequest.ReqExtra:type_name -> dtmgimp.DtmRequest.ReqExtraEntry
8, // 3: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
5, // 4: dtmgimp.DtmProgressesReply.Progresses:type_name -> dtmgimp.DtmProgress
9, // 5: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
1, // 6: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest
1, // 7: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest
1, // 8: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest
3, // 9: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest
1, // 10: dtmgimp.Dtm.Progresses:input_type -> dtmgimp.DtmRequest
2, // 11: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
9, // 12: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
9, // 13: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
9, // 14: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
9, // 15: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
4, // 16: dtmgimp.Dtm.Progresses:output_type -> dtmgimp.DtmProgressesReply
11, // [11:17] is the sub-list for method output_type
5, // [5:11] is the sub-list for method input_type
5, // [5:5] is the sub-list for extension type_name
5, // [5:5] is the sub-list for extension extendee
0, // [0:5] is the sub-list for field type_name
}
func init() { file_dtmgrpc_dtmgpb_dtmgimp_proto_init() }
@ -522,6 +679,30 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() {
return nil
}
}
file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DtmProgressesReply); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DtmProgress); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -529,7 +710,7 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc,
NumEnums: 0,
NumMessages: 6,
NumMessages: 9,
NumExtensions: 0,
NumServices: 1,
},

16
dtmgrpc/dtmgpb/dtmgimp.proto

@ -12,6 +12,7 @@ service Dtm {
rpc Prepare(DtmRequest) returns (google.protobuf.Empty) {}
rpc Abort(DtmRequest) returns (google.protobuf.Empty) {}
rpc RegisterBranch(DtmBranchRequest) returns (google.protobuf.Empty) {}
rpc Progresses(DtmRequest) returns (DtmProgressesReply) {}
}
message DtmTransOptions {
@ -29,9 +30,10 @@ message DtmRequest {
string TransType = 2;
DtmTransOptions TransOptions = 3;
string CustomedData = 4;
repeated bytes BinPayloads = 5; // for MSG/SAGA branch payloads
string QueryPrepared = 6; // for MSG
repeated bytes BinPayloads = 5; // for Msg/Saga/Workflow branch payloads
string QueryPrepared = 6; // for Msg
string Steps = 7;
map<string, string> ReqExtra = 8;
}
message DtmGidReply {
@ -47,3 +49,13 @@ message DtmBranchRequest {
bytes BusiPayload = 6;
}
message DtmProgressesReply {
repeated DtmProgress Progresses = 1;
}
message DtmProgress {
string Status = 1;
bytes BinData = 2;
string BranchID = 3;
string Op = 4;
}

38
dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// - protoc v3.17.3
// source: dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb
@ -28,6 +28,7 @@ type DtmClient interface {
Prepare(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Abort(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
RegisterBranch(ctx context.Context, in *DtmBranchRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Progresses(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmProgressesReply, error)
}
type dtmClient struct {
@ -83,6 +84,15 @@ func (c *dtmClient) RegisterBranch(ctx context.Context, in *DtmBranchRequest, op
return out, nil
}
func (c *dtmClient) Progresses(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmProgressesReply, error) {
out := new(DtmProgressesReply)
err := c.cc.Invoke(ctx, "/dtmgimp.Dtm/Progresses", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DtmServer is the server API for Dtm service.
// All implementations must embed UnimplementedDtmServer
// for forward compatibility
@ -92,6 +102,7 @@ type DtmServer interface {
Prepare(context.Context, *DtmRequest) (*emptypb.Empty, error)
Abort(context.Context, *DtmRequest) (*emptypb.Empty, error)
RegisterBranch(context.Context, *DtmBranchRequest) (*emptypb.Empty, error)
Progresses(context.Context, *DtmRequest) (*DtmProgressesReply, error)
mustEmbedUnimplementedDtmServer()
}
@ -114,6 +125,9 @@ func (UnimplementedDtmServer) Abort(context.Context, *DtmRequest) (*emptypb.Empt
func (UnimplementedDtmServer) RegisterBranch(context.Context, *DtmBranchRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method RegisterBranch not implemented")
}
func (UnimplementedDtmServer) Progresses(context.Context, *DtmRequest) (*DtmProgressesReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method Progresses not implemented")
}
func (UnimplementedDtmServer) mustEmbedUnimplementedDtmServer() {}
// UnsafeDtmServer may be embedded to opt out of forward compatibility for this service.
@ -217,6 +231,24 @@ func _Dtm_RegisterBranch_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler)
}
func _Dtm_Progresses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DtmRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DtmServer).Progresses(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dtmgimp.Dtm/Progresses",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DtmServer).Progresses(ctx, req.(*DtmRequest))
}
return interceptor(ctx, in, info, handler)
}
// Dtm_ServiceDesc is the grpc.ServiceDesc for Dtm service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -244,6 +276,10 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{
MethodName: "RegisterBranch",
Handler: _Dtm_RegisterBranch_Handler,
},
{
MethodName: "Progresses",
Handler: _Dtm_Progresses_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "dtmgrpc/dtmgpb/dtmgimp.proto",

15
dtmgrpc/type.go

@ -8,6 +8,7 @@ package dtmgrpc
import (
context "context"
"errors"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -22,24 +23,24 @@ import (
// DtmError2GrpcError translate dtm error to grpc error
func DtmError2GrpcError(res interface{}) error {
e, ok := res.(error)
if ok && e == dtmimp.ErrFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if ok && e == dtmimp.ErrOngoing {
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
if ok && errors.Is(e, dtmimp.ErrFailure) {
return status.New(codes.Aborted, e.Error()).Err()
} else if ok && errors.Is(e, dtmimp.ErrOngoing) {
return status.New(codes.FailedPrecondition, e.Error()).Err()
}
return e
}
// GrpcError2DtmError translate grpc error to dtm error
func GrpcError2DtmError(err error) error {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Aborted {
st, _ := status.FromError(err)
if st != nil && st.Code() == codes.Aborted {
// version lower then v1.10, will specify Ongoing in code Aborted
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ErrOngoing
}
return dtmcli.ErrFailure
} else if ok && st.Code() == codes.FailedPrecondition {
} else if st != nil && st.Code() == codes.FailedPrecondition {
return dtmcli.ErrOngoing
}
return err

53
dtmgrpc/workflow/dummyReadCloser.go

@ -0,0 +1,53 @@
package workflow
import (
"bytes"
"io"
"strings"
)
// NewRespBodyFromString creates an io.ReadCloser from a string that
// is suitable for use as an http response body.
//
// To pass the content of an existing file as body use httpmock.File as in:
// httpmock.NewRespBodyFromString(httpmock.File("body.txt").String())
func NewRespBodyFromString(body string) io.ReadCloser {
return &dummyReadCloser{orig: body}
}
// NewRespBodyFromBytes creates an io.ReadCloser from a byte slice
// that is suitable for use as an http response body.
//
// To pass the content of an existing file as body use httpmock.File as in:
// httpmock.NewRespBodyFromBytes(httpmock.File("body.txt").Bytes())
func NewRespBodyFromBytes(body []byte) io.ReadCloser {
return &dummyReadCloser{orig: body}
}
type dummyReadCloser struct {
orig interface{} // string or []byte
body io.ReadSeeker // instanciated on demand from orig
}
// setup ensures d.body is correctly initialized.
func (d *dummyReadCloser) setup() {
if d.body == nil {
switch body := d.orig.(type) {
case string:
d.body = strings.NewReader(body)
case []byte:
d.body = bytes.NewReader(body)
}
}
}
func (d *dummyReadCloser) Read(p []byte) (n int, err error) {
d.setup()
return d.body.Read(p)
}
func (d *dummyReadCloser) Close() error {
d.setup()
d.body.Seek(0, io.SeekEnd) // nolint: errcheck
return nil
}

46
dtmgrpc/workflow/factory.go

@ -0,0 +1,46 @@
package workflow
import (
"fmt"
"net/url"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
type workflowFactory struct {
protocol string
httpDtm string
httpCallback string
grpcDtm string
grpcCallback string
handlers map[string]WfFunc
}
var defaultFac = workflowFactory{
handlers: map[string]WfFunc{},
}
func (w *workflowFactory) execute(name string, gid string, data []byte) error {
handler := w.handlers[name]
if handler == nil {
return fmt.Errorf("workflow '%s' not registered. please register at startup", name)
}
wf := w.newWorkflow(name, gid, data)
return wf.process(handler, data)
}
func (w *workflowFactory) executeByQS(qs url.Values, body []byte) error {
name := qs.Get("op")
gid := qs.Get("gid")
return w.execute(name, gid, body)
}
func (w *workflowFactory) register(name string, handler WfFunc) error {
e := w.handlers[name]
if e != nil {
return fmt.Errorf("a handler already exists for %s", name)
}
logger.Debugf("workflow '%s' registered.", name)
w.handlers[name] = handler
return nil
}

191
dtmgrpc/workflow/imp.go

@ -0,0 +1,191 @@
package workflow
import (
"context"
"errors"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/go-resty/resty/v2"
)
type workflowImp struct {
restyClient *resty.Client
idGen dtmimp.BranchIDGen
currentBranch string
progresses map[string]*stepResult
currentOp string
succeededOps []workflowPhase2Item
failedOps []workflowPhase2Item
}
type workflowPhase2Item struct {
branchID, op string
fn WfPhase2Func
}
func (wf *Workflow) loadProgresses() error {
progresses, err := wf.getProgress()
if err == nil {
wf.progresses = map[string]*stepResult{}
for _, p := range progresses {
wf.progresses[p.BranchID+"-"+p.Op] = &stepResult{
Status: p.Status,
Data: p.BinData,
}
}
}
return err
}
type wfMeta struct{}
func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow {
wf := &Workflow{
TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""),
Name: name,
workflowImp: workflowImp{
idGen: dtmimp.BranchIDGen{},
succeededOps: []workflowPhase2Item{},
failedOps: []workflowPhase2Item{},
currentOp: dtmimp.OpAction,
},
}
wf.Protocol = w.protocol
if w.protocol == dtmimp.ProtocolGRPC {
wf.Dtm = w.grpcDtm
wf.QueryPrepared = w.grpcCallback
} else {
wf.Dtm = w.httpDtm
wf.QueryPrepared = w.httpCallback
}
wf.newBranch()
wf.CustomData = dtmimp.MustMarshalString(map[string]interface{}{
"name": wf.Name,
"data": data,
})
wf.Context = context.WithValue(wf.Context, wfMeta{}, wf)
wf.initRestyClient()
return wf
}
func (wf *Workflow) initRestyClient() {
wf.restyClient = resty.New()
wf.restyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.SetQueryParams(map[string]string{
"gid": wf.Gid,
"trans_type": wf.TransType,
"branch_id": wf.currentBranch,
"op": dtmimp.OpAction,
})
err := dtmimp.BeforeRequest(c, r)
return err
})
old := wf.restyClient.GetClient().Transport
wf.restyClient.GetClient().Transport = NewRoundTripper(old, wf)
wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error {
err := dtmimp.AfterResponse(c, r)
if err == nil && !wf.Options.DisalbeAutoError {
err = dtmimp.RespAsErrorCompatible(r) // check for dtm error
}
return err
})
}
func (wf *Workflow) process(handler WfFunc, data []byte) (err error) {
err = wf.prepare()
if err == nil {
err = wf.loadProgresses()
}
if err == nil {
err = handler(wf, data)
err = dtmgrpc.GrpcError2DtmError(err)
if err != nil && !errors.Is(err, dtmcli.ErrFailure) {
return err
}
err = wf.processPhase2(err)
}
if err == nil || errors.Is(err, dtmcli.ErrFailure) {
err1 := wf.submit(wfErrorToStatus(err))
if err1 != nil {
return err1
}
}
return err
}
func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error {
if sr.Status == "" {
return sr.Error
}
return wf.registerBranch(sr.Data, branchID, op, sr.Status)
}
func (wf *Workflow) processPhase2(err error) error {
ops := wf.succeededOps
if err == nil {
wf.currentOp = dtmimp.OpCommit
} else {
wf.currentOp = dtmimp.OpRollback
ops = wf.failedOps
}
for i := len(ops) - 1; i >= 0; i-- {
op := ops[i]
err1 := wf.callPhase2(op.branchID, op.op, op.fn)
if err1 != nil {
return err1
}
}
return err
}
func (wf *Workflow) callPhase2(branchID string, op string, fn WfPhase2Func) error {
wf.currentBranch = branchID
r := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
err := fn(bb)
if errors.Is(err, dtmcli.ErrFailure) {
panic("should not return ErrFail in phase2")
}
return stepResultFromLocal(nil, err)
})
_, err := stepResultToLocal(r)
return err
}
func (wf *Workflow) recordedDo(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult {
branchID := wf.currentBranch
r := wf.getStepResult()
if wf.currentOp == dtmimp.OpAction { // for action steps, an action will start a new branch
wf.newBranch()
}
if r != nil {
logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data)
return r
}
bb := &dtmcli.BranchBarrier{
TransType: wf.TransType,
Gid: wf.Gid,
BranchID: branchID,
Op: wf.currentOp,
}
r = fn(bb)
err := wf.saveResult(branchID, wf.currentOp, r)
if err != nil {
r = stepResultFromLocal(nil, err)
}
return r
}
func (wf *Workflow) newBranch() {
wf.idGen.NewSubBranchID()
wf.currentBranch = wf.idGen.CurrentSubBranchID()
}
func (wf *Workflow) getStepResult() *stepResult {
logger.Debugf("getStepResult: %s %v", wf.currentBranch+"-"+wf.currentOp, wf.progresses[wf.currentBranch+"-"+wf.currentOp])
return wf.progresses[wf.currentBranch+"-"+wf.currentOp]
}

78
dtmgrpc/workflow/rpc.go

@ -0,0 +1,78 @@
package workflow
import (
"context"
"strings"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"google.golang.org/protobuf/types/known/emptypb"
)
func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) {
if wf.Protocol == dtmimp.ProtocolGRPC {
var reply dtmgpb.DtmProgressesReply
err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/Progresses",
dtmgimp.GetDtmRequest(wf.TransBase), &reply)
if err == nil {
return reply.Progresses, nil
}
return nil, err
}
resp, err := dtmimp.RestyClient.R().SetQueryParam("gid", wf.Gid).Get(wf.Dtm + "/progresses")
var progresses []*dtmgpb.DtmProgress
if err == nil {
dtmimp.MustUnmarshal(resp.Body(), &progresses)
}
return progresses, err
}
func (wf *Workflow) submit(status string) error {
if wf.Protocol == dtmimp.ProtocolHTTP {
m := map[string]interface{}{
"gid": wf.Gid,
"trans_type": wf.TransType,
"req_extra": map[string]string{
"status": status,
},
}
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit")
return err
}
req := dtmgimp.GetDtmRequest(wf.TransBase)
req.ReqExtra = map[string]string{
"status": status,
}
reply := emptypb.Empty{}
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply)
}
func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error {
logger.Errorf("registerBranch: %s %s %s", branchID, op, status)
if wf.Protocol == dtmimp.ProtocolHTTP {
return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{
"data": string(res),
"branch_id": branchID,
"op": op,
"status": status,
}, "registerBranch")
}
_, err := dtmgimp.MustGetDtmClient(wf.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
Gid: wf.Gid,
TransType: wf.TransType,
BranchID: branchID,
BusiPayload: res,
Data: map[string]string{"status": status, "op": op},
})
return err
}
func (wf *Workflow) prepare() error {
operation := "prepare"
if wf.Protocol == dtmimp.ProtocolGRPC {
return dtmgimp.DtmGrpcCall(wf.TransBase, strings.Title(operation))
}
return dtmimp.TransCallDtm(wf.TransBase, operation)
}

26
dtmgrpc/workflow/server.go

@ -0,0 +1,26 @@
package workflow
import (
"context"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
type workflowServer struct {
wfpb.UnimplementedWorkflowServer
}
func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*emptypb.Empty, error) {
if defaultFac.protocol != dtmimp.ProtocolGRPC {
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first")
}
tb := dtmgimp.TransBaseFromGrpc(ctx)
err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
}

138
dtmgrpc/workflow/utils.go

@ -0,0 +1,138 @@
package workflow
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"
)
const HBranchID = "dtm-branch-id"
const HBranchOp = "dtm-branch-op"
func statusToCode(status string) int {
if status == "succeed" {
return 200
}
return 409
}
func wfErrorToStatus(err error) string {
if err == nil {
return dtmcli.StatusSucceed
} else if errors.Is(err, dtmcli.ErrFailure) {
return dtmcli.StatusFailed
}
return ""
}
type stepResult struct {
Error error // if Error != nil || Status == "", result will not be saved
Status string // succeed | failed | ""
// if status == succeed, data is the result.
// if status == failed, data is the error message
Data []byte
}
type roundTripper struct {
old http.RoundTripper
wf *Workflow
}
func newJSONResponse(status int, result []byte) *http.Response {
return &http.Response{
Status: strconv.Itoa(status),
StatusCode: status,
Body: NewRespBodyFromBytes(result),
Header: http.Header{
"Content-Type": []string{"application/json"},
},
ContentLength: -1,
}
}
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
wf := r.wf
if wf.currentOp != dtmimp.OpAction { // in phase 2, do not save, because it is saved outer
return r.old.RoundTrip(req)
}
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
resp, err := r.old.RoundTrip(req)
return stepResultFromHttp(resp, err)
})
return stepResultToHttp(sr)
}
func NewRoundTripper(old http.RoundTripper, wf *Workflow) http.RoundTripper {
return &roundTripper{old: old, wf: wf}
}
func stepResultFromLocal(data []byte, err error) *stepResult {
return &stepResult{
Error: err,
Status: wfErrorToStatus(err),
Data: data,
}
}
func stepResultToLocal(s *stepResult) ([]byte, error) {
if s.Error != nil {
return nil, s.Error
} else if s.Status == dtmcli.StatusFailed {
return nil, fmt.Errorf("%s. %w", string(s.Data), dtmcli.ErrFailure)
}
return s.Data, nil
}
func stepResultFromGrpc(reply interface{}, err error) *stepResult {
sr := &stepResult{}
st, ok := status.FromError(err)
if err == nil {
sr.Status = dtmcli.StatusSucceed
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage))
} else if ok && st.Code() == codes.Aborted {
sr.Status = dtmcli.StatusFailed
sr.Data = []byte(st.Message())
} else {
sr.Error = err
}
return sr
}
func stepResultToGrpc(s *stepResult, reply interface{}) error {
if s.Error != nil {
return s.Error
} else if s.Status == dtmcli.StatusSucceed {
dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage))
return nil
}
return status.New(codes.Aborted, string(s.Data)).Err()
}
func stepResultFromHttp(resp *http.Response, err error) *stepResult {
sr := &stepResult{Error: err}
if err == nil {
sr.Data, sr.Error = ioutil.ReadAll(resp.Body)
if resp.StatusCode == http.StatusOK {
sr.Status = dtmcli.StatusSucceed
} else if resp.StatusCode == http.StatusConflict {
sr.Status = dtmcli.StatusFailed
}
}
return sr
}
func stepResultToHttp(s *stepResult) (*http.Response, error) {
if s.Error != nil {
return nil, s.Error
}
return newJSONResponse(statusToCode(s.Status), s.Data), nil
}

153
dtmgrpc/workflow/wfpb/wf.pb.go

@ -0,0 +1,153 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.17.3
// source: dtmgrpc/workflow/wfpb/wf.proto
package wfpb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type WorkflowData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"`
}
func (x *WorkflowData) Reset() {
*x = WorkflowData{}
if protoimpl.UnsafeEnabled {
mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *WorkflowData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WorkflowData) ProtoMessage() {}
func (x *WorkflowData) ProtoReflect() protoreflect.Message {
mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WorkflowData.ProtoReflect.Descriptor instead.
func (*WorkflowData) Descriptor() ([]byte, []int) {
return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP(), []int{0}
}
func (x *WorkflowData) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
var File_dtmgrpc_workflow_wfpb_wf_proto protoreflect.FileDescriptor
var file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = []byte{
0x0a, 0x1e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c,
0x6f, 0x77, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x2f, 0x77, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74,
0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x22, 0x0a, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x32, 0x47, 0x0a, 0x08, 0x57,
0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x3b, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75,
0x74, 0x65, 0x12, 0x16, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce sync.Once
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc
)
func file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP() []byte {
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce.Do(func() {
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = protoimpl.X.CompressGZIP(file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData)
})
return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData
}
var file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = []interface{}{
(*WorkflowData)(nil), // 0: workflow.WorkflowData
(*emptypb.Empty)(nil), // 1: google.protobuf.Empty
}
var file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = []int32{
0, // 0: workflow.Workflow.Execute:input_type -> workflow.WorkflowData
1, // 1: workflow.Workflow.Execute:output_type -> google.protobuf.Empty
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_dtmgrpc_workflow_wfpb_wf_proto_init() }
func file_dtmgrpc_workflow_wfpb_wf_proto_init() {
if File_dtmgrpc_workflow_wfpb_wf_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WorkflowData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_dtmgrpc_workflow_wfpb_wf_proto_goTypes,
DependencyIndexes: file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs,
MessageInfos: file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes,
}.Build()
File_dtmgrpc_workflow_wfpb_wf_proto = out.File
file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = nil
file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = nil
file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = nil
}

15
dtmgrpc/workflow/wfpb/wf.proto

@ -0,0 +1,15 @@
syntax = "proto3";
option go_package = "./wfpb";
import "google/protobuf/empty.proto";
package workflow;
// The Workflow service definition.
service Workflow {
rpc Execute(WorkflowData) returns (google.protobuf.Empty) {}
}
message WorkflowData {
bytes Data = 1;
}

106
dtmgrpc/workflow/wfpb/wf_grpc.pb.go

@ -0,0 +1,106 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.17.3
// source: dtmgrpc/workflow/wfpb/wf.proto
package wfpb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// WorkflowClient is the client API for Workflow service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type WorkflowClient interface {
Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type workflowClient struct {
cc grpc.ClientConnInterface
}
func NewWorkflowClient(cc grpc.ClientConnInterface) WorkflowClient {
return &workflowClient{cc}
}
func (c *workflowClient) Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/workflow.Workflow/Execute", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// WorkflowServer is the server API for Workflow service.
// All implementations must embed UnimplementedWorkflowServer
// for forward compatibility
type WorkflowServer interface {
Execute(context.Context, *WorkflowData) (*emptypb.Empty, error)
mustEmbedUnimplementedWorkflowServer()
}
// UnimplementedWorkflowServer must be embedded to have forward compatible implementations.
type UnimplementedWorkflowServer struct {
}
func (UnimplementedWorkflowServer) Execute(context.Context, *WorkflowData) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Execute not implemented")
}
func (UnimplementedWorkflowServer) mustEmbedUnimplementedWorkflowServer() {}
// UnsafeWorkflowServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to WorkflowServer will
// result in compilation errors.
type UnsafeWorkflowServer interface {
mustEmbedUnimplementedWorkflowServer()
}
func RegisterWorkflowServer(s grpc.ServiceRegistrar, srv WorkflowServer) {
s.RegisterService(&Workflow_ServiceDesc, srv)
}
func _Workflow_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WorkflowData)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WorkflowServer).Execute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/workflow.Workflow/Execute",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WorkflowServer).Execute(ctx, req.(*WorkflowData))
}
return interceptor(ctx, in, info, handler)
}
// Workflow_ServiceDesc is the grpc.ServiceDesc for Workflow service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Workflow_ServiceDesc = grpc.ServiceDesc{
ServiceName: "workflow.Workflow",
HandlerType: (*WorkflowServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Execute",
Handler: _Workflow_Execute_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "dtmgrpc/workflow/wfpb/wf.proto",
}

190
dtmgrpc/workflow/workflow.go

@ -0,0 +1,190 @@
package workflow
import (
"context"
"database/sql"
"fmt"
"net/url"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb"
"github.com/go-resty/resty/v2"
"google.golang.org/grpc"
)
// InitHttp will init Workflow engine to use http
// param httpDtm specify the dtm address
// param callback specify the url for dtm to callback if a workflow timeout
func InitHttp(httpDtm string, callback string) {
defaultFac.protocol = dtmimp.ProtocolHTTP
defaultFac.httpDtm = httpDtm
defaultFac.httpCallback = callback
}
// InitHttp will init Workflow engine to use grpc
// param dtm specify the dtm address
// param clientHost specify the client host for dtm to callback if a workflow timeout
// param grpcServer specify the grpc server
func InitGrpc(grpcDtm string, clientHost string, grpcServer *grpc.Server) {
defaultFac.protocol = dtmimp.ProtocolGRPC
defaultFac.grpcDtm = grpcDtm
wfpb.RegisterWorkflowServer(grpcServer, &workflowServer{})
defaultFac.grpcCallback = clientHost + "/workflow.Workflow/Execute"
}
// SetProtocolForTest change protocol directly. only used by test
func SetProtocolForTest(protocol string) {
defaultFac.protocol = protocol
}
// Register will register a workflow with the specified name
func Register(name string, handler WfFunc) error {
return defaultFac.register(name, handler)
}
// Execute will execute a workflow with the gid and specified params
// if the workflow with the gid does not exist, then create a new workflow and execute it
// if the workflow with the gid exists, resume to execute it
func Execute(name string, gid string, data []byte) error {
return defaultFac.execute(name, gid, data)
}
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
func ExecuteByQS(qs url.Values, body []byte) error {
return defaultFac.executeByQS(qs, body)
}
// WorkflowOptions is for specifying workflow options
type WorkflowOptions struct {
// if this flag is set true, then Workflow's restyClient will keep the origin http response
// or else, Workflow's restyClient will convert http reponse to error if status code is not 200
DisalbeAutoError bool
}
// Workflow is the type for a workflow
type Workflow struct {
// The name of the workflow
Name string
Options WorkflowOptions
*dtmimp.TransBase
workflowImp
}
// WfFunc is the type for workflow function
type WfFunc func(wf *Workflow, data []byte) error
// WfPhase2Func is the type for phase 2 function
// param bb is a BranchBarrier, which is introduced by http://d.dtm.pub/practice/barrier.html
type WfPhase2Func func(bb *dtmcli.BranchBarrier) error
// NewRequest return a new resty request, whose progress will be recorded
func (wf *Workflow) NewRequest() *resty.Request {
return wf.restyClient.R().SetContext(wf.Context)
}
// DefineSagaPhase2 will define a saga branch transaction
// param compensate specify a function for the compensation of next workflow action
func (wf *Workflow) DefineSagaPhase2(compensate WfPhase2Func) {
branchID := wf.currentBranch
wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID,
op: dtmimp.OpRollback,
fn: compensate,
})
}
// DefineSagaPhase2 will define a tcc branch transaction
// param confirm, concel specify the confirm and cancel operation of next workflow action
func (wf *Workflow) DefineTccPhase2(confirm, cancel WfPhase2Func) {
branchID := wf.currentBranch
wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID,
op: dtmimp.OpRollback,
fn: cancel,
})
wf.succeededOps = append(wf.succeededOps, workflowPhase2Item{
branchID: branchID,
op: dtmimp.OpCommit,
fn: confirm,
})
}
// DoAction will do an action which will be recored
func (wf *Workflow) DoAction(fn func(bb *dtmcli.BranchBarrier) ([]byte, error)) ([]byte, error) {
res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
r, e := fn(bb)
return stepResultFromLocal(r, e)
})
return stepResultToLocal(res)
}
func (wf *Workflow) DoXaAction(dbConf dtmcli.DBConf, fn func(db *sql.DB) ([]byte, error)) ([]byte, error) {
branchID := wf.currentBranch
res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
sBusi := "business"
k := bb.BranchID + "-" + sBusi
if wf.progresses[k] != nil {
return &stepResult{
Error: fmt.Errorf("error occur at prepare, not resumable, to rollback. %w", dtmcli.ErrFailure),
}
}
sr := &stepResult{}
wf.TransBase.BranchID = branchID
wf.TransBase.Op = sBusi
err := dtmimp.XaHandleLocalTrans(wf.TransBase, dbConf, func(d *sql.DB) error {
r, e := fn(d)
sr.Data = r
if e == nil {
e = wf.saveResult(branchID, sBusi, &stepResult{Status: dtmcli.StatusSucceed})
}
return e
})
sr.Error = err
sr.Status = wfErrorToStatus(err)
return sr
})
phase2 := func(bb *dtmcli.BranchBarrier) error {
return dtmimp.XaHandlePhase2(bb.Gid, dbConf, bb.BranchID, bb.Op)
}
wf.succeededOps = append(wf.succeededOps, workflowPhase2Item{
branchID: branchID,
op: dtmimp.OpCommit,
fn: phase2,
})
wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID,
op: dtmimp.OpRollback,
fn: phase2,
})
return res.Data, res.Error
}
// Interceptor is the middleware for workflow to capture grpc call result
func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req))
wf := ctx.Value(wfMeta{}).(*Workflow)
origin := func() error {
ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm)
err := invoker(ctx1, method, req, reply, cc, opts...)
res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v",
cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err)
if err != nil {
logger.Errorf("%s", res)
} else {
logger.Debugf("%s", res)
}
return err
}
if wf.currentOp != dtmimp.OpAction {
return origin()
}
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
err := origin()
return stepResultFromGrpc(reply, err)
})
return stepResultToGrpc(sr, reply)
}

7
dtmsvr/api.go

@ -20,6 +20,9 @@ var Version = ""
func svcSubmit(t *TransGlobal) interface{} {
t.Status = dtmcli.StatusSubmitted
if t.ReqExtra != nil && t.ReqExtra["status"] != "" {
t.Status = t.ReqExtra["status"]
}
branches, err := t.saveNew()
if err == storage.ErrUniqueConflict {
@ -82,6 +85,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
branches[0].URL = data["url"]
branches[1].Op = dtmimp.OpCommit
branches[1].URL = data["url"]
} else if transType == "workflow" {
branches = []TransBranch{*branch}
branches[0].Status = data["status"]
branches[0].Op = data["op"]
} else {
return fmt.Errorf("unknow trans type: %s", transType)
}

16
dtmsvr/api_grpc.go

@ -48,3 +48,19 @@ func (s *dtmServer) RegisterBranch(ctx context.Context, in *pb.DtmBranchRequest)
}, in.Data)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r)
}
func (s *dtmServer) Progresses(ctx context.Context, in *pb.DtmRequest) (*pb.DtmProgressesReply, error) {
branches := GetStore().FindBranches(in.Gid)
reply := &pb.DtmProgressesReply{
Progresses: []*pb.DtmProgress{},
}
for _, b := range branches {
reply.Progresses = append(reply.Progresses, &pb.DtmProgress{
Status: b.Status,
BranchID: b.BranchID,
Op: b.Op,
BinData: b.BinData,
})
}
return reply, nil
}

6
dtmsvr/api_http.go

@ -31,6 +31,7 @@ func addRoute(engine *gin.Engine) {
engine.POST("/api/dtmsvr/registerXaBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk
engine.POST("/api/dtmsvr/registerTccBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk
engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query))
engine.GET("/api/dtmsvr/progresses", dtmutil.WrapHandler2(progresses))
engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all))
engine.GET("/api/dtmsvr/resetCronTime", dtmutil.WrapHandler2(resetCronTime))
@ -85,6 +86,11 @@ func query(c *gin.Context) interface{} {
return map[string]interface{}{"transaction": trans, "branches": branches}
}
func progresses(c *gin.Context) interface{} {
gid := c.Query("gid")
return GetStore().FindBranches(gid)
}
func all(c *gin.Context) interface{} {
position := c.Query("position")
sLimit := dtmimp.OrString(c.Query("limit"), "100")

2
dtmsvr/cron.go

@ -35,7 +35,7 @@ func CronTransOnce() (gid string) {
trans.WaitResult = true
branches := GetStore().FindBranches(gid)
err := trans.Process(branches)
dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err)
dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure) && !errors.Is(err, dtmcli.ErrOngoing), err)
return
}

19
dtmsvr/storage/boltdb/boltdb.go

@ -11,7 +11,6 @@ import (
"strings"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr/storage"
@ -389,11 +388,7 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
err := s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
toDelete := [][]byte{}
for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed {
k, v := cursor.First()
if k == nil || string(k) > min {
return storage.ErrNotFound
}
for k, v := cursor.First(); k != nil && string(k) <= min && (trans == nil || trans.IsFinished()); k, v = cursor.Next() {
trans = tGetGlobal(t, string(v))
toDelete = append(toDelete, k)
}
@ -401,14 +396,14 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
err := t.Bucket(bucketIndex).Delete(k)
dtmimp.E2P(err)
}
trans.NextCronTime = &next
tPutGlobal(t, trans)
tPutIndex(t, next.Unix(), trans.Gid)
if trans != nil && !trans.IsFinished() {
trans.NextCronTime = &next
tPutGlobal(t, trans)
// this put should be after delete, because the data may be the same
tPutIndex(t, next.Unix(), trans.Gid)
}
return nil
})
if err == storage.ErrNotFound {
return nil
}
dtmimp.E2P(err)
return trans
}

10
dtmsvr/storage/trans.go

@ -51,12 +51,16 @@ func (g *TransGlobalStore) String() string {
return dtmimp.MustMarshalString(g)
}
func (g *TransGlobalStore) IsFinished() bool {
return g.Status == dtmcli.StatusFailed || g.Status == dtmcli.StatusSucceed
}
// TransBranchStore branch transaction
type TransBranchStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`
URL string `json:"url,omitempty"`
BinData []byte
Gid string `json:"gid,omitempty"`
URL string `json:"url,omitempty"`
BinData []byte `json:"bin_data,omitempty"`
BranchID string `json:"branch_id,omitempty"`
Op string `json:"op,omitempty"`
Status string `json:"status,omitempty"`

2
dtmsvr/trans_class.go

@ -16,6 +16,7 @@ import (
// TransGlobal global transaction
type TransGlobal struct {
storage.TransGlobalStore
ReqExtra map[string]string `json:"req_extra"`
Context context.Context
lastTouched time.Time // record the start time of process
updateBranchSync bool
@ -109,6 +110,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
RequestTimeout: o.RequestTimeout,
},
}}
r.ReqExtra = c.ReqExtra
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}

5
dtmsvr/trans_process.go

@ -7,6 +7,7 @@
package dtmsvr
import (
"errors"
"fmt"
"time"
@ -34,7 +35,7 @@ func (t *TransGlobal) process(branches []TransBranch) error {
if !t.WaitResult {
go func() {
err := t.processInner(branches)
if err != nil {
if err != nil && !errors.Is(err, dtmimp.ErrOngoing) {
logger.Errorf("processInner err: %v", err)
}
}()
@ -54,7 +55,7 @@ func (t *TransGlobal) process(branches []TransBranch) error {
func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil && rerr != dtmcli.ErrOngoing {
if rerr != nil && !errors.Is(rerr, dtmcli.ErrOngoing) {
logger.Errorf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {

43
dtmsvr/trans_type_workflow.go

@ -0,0 +1,43 @@
package dtmsvr
import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb"
)
type transWorkflowProcessor struct {
*TransGlobal
}
func init() {
registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} })
}
func (t *transWorkflowProcessor) GenBranches() []TransBranch {
return []TransBranch{}
}
type cWorkflowCustom struct {
Name string `json:"name"`
Data []byte `json:"data"`
}
func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusSubmitted { // client workflow finished
t.changeStatus(dtmcli.StatusSucceed)
return nil
} else if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed {
return nil
}
cmc := cWorkflowCustom{}
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
data := cmc.Data
if t.Protocol == dtmimp.ProtocolGRPC {
wd := wfpb.WorkflowData{Data: cmc.Data}
data = dtmgimp.MustProtoMarshal(&wd)
}
return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data)
}

10
helper/test-cover.sh

@ -1,13 +1,15 @@
set -x
echo "" > coverage.txt
for store in redis mysql boltdb postgres; do
echo "mode: count" coverage.txt
for store in redis boltdb mysql postgres; do
for d in $(go list ./... | grep -v vendor); do
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/workflow,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
cat profile.out | grep -v 'mode:' >> coverage.txt
echo > profile.out
fi
done
done
# go tool cover -html=coverage.txt
curl -s https://codecov.io/bash | bash

4
sqls/dtmsvr.storage.mysql.sql

@ -7,14 +7,14 @@ CREATE TABLE if not EXISTS dtm.trans_global (
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for 2-phase message',
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction',
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',

2
sqls/dtmsvr.storage.postgres.sql

@ -13,7 +13,7 @@ CREATE TABLE if not EXISTS trans_global (
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
options varchar(1024) DEFAULT '',
custom_data varchar(256) DEFAULT '',
custom_data varchar(1024) DEFAULT '',
next_cron_interval int default null,
next_cron_time timestamp(0) with time zone default null,
owner varchar(128) not null default '',

2
sqls/dtmsvr.storage.tdsql.sql

@ -14,7 +14,7 @@ CREATE TABLE if not EXISTS dtm.trans_global (
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction',
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',

24
test/busi/base_grpc.go

@ -21,6 +21,7 @@ import (
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
emptypb "google.golang.org/protobuf/types/known/emptypb"
@ -32,22 +33,31 @@ var BusiGrpc = fmt.Sprintf("localhost:%d", BusiGrpcPort)
// DtmClient grpc client for dtm
var DtmClient dtmgpb.DtmClient
var BusiCli BusiClient
// GrpcStartup for grpc
func GrpcStartup() {
func GrpcStartup() *grpc.Server {
conn, err := grpc.Dial(dtmutil.DefaultGrpcServer, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog))
logger.FatalIfError(err)
DtmClient = dtmgpb.NewDtmClient(conn)
logger.Debugf("dtm client inited")
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort))
conn1, err := grpc.Dial(BusiGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(workflow.Interceptor))
logger.FatalIfError(err)
BusiCli = NewBusiClient(conn1)
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog))
RegisterBusiServer(s, &busiServer{})
go func() {
logger.Debugf("busi grpc listening at %v", lis.Addr())
err := s.Serve(lis)
logger.FatalIfError(err)
}()
return s
}
// GrpcServe start to serve grpc
func GrpcServe(server *grpc.Server) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort))
logger.FatalIfError(err)
logger.Debugf("busi grpc listening at %v", lis.Addr())
err = server.Serve(lis)
logger.FatalIfError(err)
}
// busiServer is used to implement busi.BusiServer.

7
test/busi/base_http.go

@ -10,10 +10,12 @@ import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
"gorm.io/driver/mysql"
@ -77,6 +79,11 @@ func BaseAppStartup() *gin.Engine {
// BaseAddRoute add base route handler
func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} {
data, err := ioutil.ReadAll(ctx.Request.Body)
logger.FatalIfError(err)
return workflow.ExecuteByQS(ctx.Request.URL.Query(), data)
}))
app.POST(BusiAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn")
}))

12
test/busi/base_workflow.go

@ -0,0 +1,12 @@
package busi
import (
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/dtmutil"
"google.golang.org/grpc"
)
func WorkflowStarup(server *grpc.Server) {
workflow.InitHttp(dtmServer, Busi+"/workflow/resume")
workflow.InitGrpc(dtmutil.DefaultGrpcServer, BusiGrpc, server)
}

16
test/busi/busi.go → test/busi/data.go

@ -9,6 +9,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -16,6 +17,21 @@ import (
status "google.golang.org/grpc/status"
)
// PopulateDB populate example mysql data
func PopulateDB(skipDrop bool) {
resetXaData()
file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
file = fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
_, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear
dtmimp.E2P(err)
SetRedisBothAccount(10000, 10000)
SetupMongoBarrierAndBusi()
}
// TransOutUID 1
const TransOutUID = 1

27
test/busi/startup.go

@ -1,31 +1,14 @@
package busi
import (
"context"
"fmt"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
)
// Startup startup the busi's grpc and http service
func Startup() *gin.Engine {
GrpcStartup()
return BaseAppStartup()
}
// PopulateDB populate example mysql data
func PopulateDB(skipDrop bool) {
resetXaData()
file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
file = fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
_, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear
dtmimp.E2P(err)
SetRedisBothAccount(10000, 10000)
SetupMongoBarrierAndBusi()
svr := GrpcStartup()
app := BaseAppStartup()
WorkflowStarup(svr)
go GrpcServe(svr)
return app
}

2
test/busi/utils.go

@ -84,7 +84,7 @@ func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply i
// SetHTTPHeaderForHeadersYes interceptor to set head for HeadersYes
func SetHTTPHeaderForHeadersYes(c *resty.Client, r *resty.Request) error {
if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") {
if b, ok := r.Body.(*dtmimp.TransBase); ok && strings.HasSuffix(b.Gid, "HeadersYes") {
logger.Debugf("set test_header for url: %s", r.URL)
r.SetHeader("test_header", "yes")
}

3
test/main_test.go

@ -12,6 +12,7 @@ import (
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmsvr"
@ -39,7 +40,7 @@ func TestMain(m *testing.M) {
dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes)
dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil })
tenv := os.Getenv("TEST_STORE")
tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis)
conf.Store.Host = "localhost"
conf.Store.Driver = tenv
if tenv == "boltdb" {

2
test/msg_jrpc_test.go

@ -130,7 +130,7 @@ func TestMsgJprcAbnormal(t *testing.T) {
func TestMsgJprcAbnormal2(t *testing.T) {
tb := dtmimp.NewTransBase(dtmimp.GetFuncName(), "msg", dtmutil.DefaultJrpcServer, "01")
tb.Protocol = "json-rpc"
err := dtmimp.TransCallDtm(tb, "", "newGid")
_, err := dtmimp.TransCallDtmExt(tb, "", "newGid")
assert.Nil(t, err)
}

265
test/workflow_test.go

@ -0,0 +1,265 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"database/sql"
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.TransReq
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil {
return err
}
_, err = wf.NewRequest().SetBody(req).Post(Busi + "/TransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenTransReq(30, false, true)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.TransReq
dtmimp.MustUnmarshal(data, &req)
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom")
return err
})
_, err := wf.DoAction(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "")
})
})
if err != nil {
return err
}
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "")
})
})
_, err = wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn")
if err != nil {
return err
}
return nil
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
func TestWorkflowGrpcNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
waitTransProcessed(gid)
}
var ongoingStep = 0
func fetchOngoingStep(dest int) bool {
c := ongoingStep
logger.Debugf("ongoing step is: %d", c)
if c == dest {
ongoingStep++
return true
}
return false
}
func TestWorkflowGrpcRollbackResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
ongoingStep = 0
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req)
if fetchOngoingStep(0) {
return dtmcli.ErrOngoing
}
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(4) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if fetchOngoingStep(1) {
return dtmcli.ErrOngoing
}
if err != nil {
return err
}
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(3) {
return dtmcli.ErrOngoing
}
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err
})
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req)
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrOngoing)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
func TestWorkflowXaAction(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
waitTransProcessed(gid)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowXaRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
logger.FatalIfError(e)
return nil, dtmcli.ErrFailure
})
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrFailure, err)
waitTransProcessed(gid)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
func TestWorkflowXaResume(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
ongoingStep = 0
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(0) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
_, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
if fetchOngoingStep(1) {
return nil, dtmcli.ErrOngoing
}
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess)
})
if err != nil {
return err
}
if fetchOngoingStep(2) {
return dtmcli.ErrOngoing
}
return err
})
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrOngoing, err)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
go waitTransProcessed(gid)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
Loading…
Cancel
Save