From af0e9f10ed79626016a61771b41aa755b7b947fa Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 29 Jun 2022 11:04:13 +0800 Subject: [PATCH] support workflow pattern --- dtmcli/{trans_test.go => cover_test.go} | 0 dtmcli/dtmimp/trans_base.go | 63 +++-- dtmcli/dtmimp/vars.go | 28 +- dtmcli/{msg.go => trans_msg.go} | 6 +- dtmcli/{saga.go => trans_saga.go} | 2 +- dtmcli/{tcc.go => trans_tcc.go} | 6 +- dtmcli/types.go | 46 ---- dtmcli/utils.go | 60 +++++ dtmcli/xa.go | 2 +- dtmgrpc/dtmgimp/types.go | 16 +- dtmgrpc/dtmgimp/utils.go | 21 +- dtmgrpc/dtmgpb/dtmgimp.pb.go | 333 ++++++++++++++++++------ dtmgrpc/dtmgpb/dtmgimp.proto | 16 +- dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go | 38 ++- dtmgrpc/type.go | 15 +- dtmgrpc/workflow/dummyReadCloser.go | 53 ++++ dtmgrpc/workflow/factory.go | 46 ++++ dtmgrpc/workflow/imp.go | 191 ++++++++++++++ dtmgrpc/workflow/rpc.go | 78 ++++++ dtmgrpc/workflow/server.go | 26 ++ dtmgrpc/workflow/utils.go | 138 ++++++++++ dtmgrpc/workflow/wfpb/wf.pb.go | 153 +++++++++++ dtmgrpc/workflow/wfpb/wf.proto | 15 ++ dtmgrpc/workflow/wfpb/wf_grpc.pb.go | 106 ++++++++ dtmgrpc/workflow/workflow.go | 190 ++++++++++++++ dtmsvr/api.go | 7 + dtmsvr/api_grpc.go | 16 ++ dtmsvr/api_http.go | 6 + dtmsvr/cron.go | 2 +- dtmsvr/storage/boltdb/boltdb.go | 19 +- dtmsvr/storage/trans.go | 10 +- dtmsvr/trans_class.go | 2 + dtmsvr/trans_process.go | 5 +- dtmsvr/trans_type_workflow.go | 43 +++ helper/test-cover.sh | 10 +- sqls/dtmsvr.storage.mysql.sql | 4 +- sqls/dtmsvr.storage.postgres.sql | 2 +- sqls/dtmsvr.storage.tdsql.sql | 2 +- test/busi/base_grpc.go | 24 +- test/busi/base_http.go | 7 + test/busi/base_workflow.go | 12 + test/busi/{busi.go => data.go} | 16 ++ test/busi/startup.go | 27 +- test/busi/utils.go | 2 +- test/main_test.go | 3 +- test/msg_jrpc_test.go | 2 +- test/workflow_test.go | 265 +++++++++++++++++++ 47 files changed, 1888 insertions(+), 246 deletions(-) rename dtmcli/{trans_test.go => cover_test.go} (100%) rename dtmcli/{msg.go => trans_msg.go} (94%) rename dtmcli/{saga.go => trans_saga.go} (96%) rename dtmcli/{tcc.go => trans_tcc.go} (91%) create mode 100644 dtmcli/utils.go create mode 100644 dtmgrpc/workflow/dummyReadCloser.go create mode 100644 dtmgrpc/workflow/factory.go create mode 100644 dtmgrpc/workflow/imp.go create mode 100644 dtmgrpc/workflow/rpc.go create mode 100644 dtmgrpc/workflow/server.go create mode 100644 dtmgrpc/workflow/utils.go create mode 100644 dtmgrpc/workflow/wfpb/wf.pb.go create mode 100644 dtmgrpc/workflow/wfpb/wf.proto create mode 100644 dtmgrpc/workflow/wfpb/wf_grpc.pb.go create mode 100644 dtmgrpc/workflow/workflow.go create mode 100644 dtmsvr/trans_type_workflow.go create mode 100644 test/busi/base_workflow.go rename test/busi/{busi.go => data.go} (85%) create mode 100644 test/workflow_test.go diff --git a/dtmcli/trans_test.go b/dtmcli/cover_test.go similarity index 100% rename from dtmcli/trans_test.go rename to dtmcli/cover_test.go diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index 876bf55..775261a 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -93,39 +93,28 @@ func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(EscapeGet(qs, "gid"), EscapeGet(qs, "trans_type"), EscapeGet(qs, "dtm"), EscapeGet(qs, "branch_id")) } -// TransCallDtm TransBase call dtm -func TransCallDtm(tb *TransBase, body interface{}, operation string) error { +// TransCallDtmExt TransBase call dtm +func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) { + if tb.Protocol == Jrpc { + return transCallDtmJrpc(tb, body, operation) + } if tb.RequestTimeout != 0 { RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) } - if tb.Protocol == Jrpc { - var result map[string]interface{} - resp, err := RestyClient.R(). - SetBody(map[string]interface{}{ - "jsonrpc": "2.0", - "id": "no-use", - "method": operation, - "params": body, - }). - SetResult(&result). - Post(tb.Dtm) - if err != nil { - return err - } - if resp.StatusCode() != http.StatusOK || result["error"] != nil { - return errors.New(resp.String()) - } - return nil - } resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { - return err + return nil, err } if resp.StatusCode() != http.StatusOK || strings.Contains(resp.String(), ResultFailure) { - return errors.New(resp.String()) + return nil, errors.New(resp.String()) } - return nil + return resp, nil +} + +func TransCallDtm(tb *TransBase, operation string) error { + _, err := TransCallDtmExt(tb, tb, operation) + return err } // TransRegisterBranch TransBase register a branch to dtm @@ -137,7 +126,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin for k, v := range added { m[k] = v } - return TransCallDtm(tb, m, operation) + _, err := TransCallDtmExt(tb, m, operation) + return err } // TransRequestBranch TransBase request branch result @@ -165,3 +155,26 @@ func TransRequestBranch(t *TransBase, method string, body interface{}, branchID } return resp, err } + +func transCallDtmJrpc(tb *TransBase, body interface{}, operation string) (*resty.Response, error) { + if tb.RequestTimeout != 0 { + RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) + } + var result map[string]interface{} + resp, err := RestyClient.R(). + SetBody(map[string]interface{}{ + "jsonrpc": "2.0", + "id": "no-use", + "method": operation, + "params": body, + }). + SetResult(&result). + Post(tb.Dtm) + if err != nil { + return nil, err + } + if resp.StatusCode() != http.StatusOK || result["error"] != nil { + return nil, errors.New(resp.String()) + } + return resp, nil +} diff --git a/dtmcli/dtmimp/vars.go b/dtmcli/dtmimp/vars.go index ad688d0..7bb036a 100644 --- a/dtmcli/dtmimp/vars.go +++ b/dtmcli/dtmimp/vars.go @@ -42,17 +42,21 @@ var PassthroughHeaders = []string{} // BarrierTableName the table name of barrier table var BarrierTableName = "dtm_barrier.barrier" +func BeforeRequest(c *resty.Client, r *resty.Request) error { + r.URL = MayReplaceLocalhost(r.URL) + u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL) + logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u) + r.URL = u + return err +} + +func AfterResponse(c *resty.Client, resp *resty.Response) error { + r := resp.Request + logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String()) + return nil +} + func init() { - RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { - r.URL = MayReplaceLocalhost(r.URL) - u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL) - logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u) - r.URL = u - return err - }) - RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { - r := resp.Request - logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String()) - return nil - }) + RestyClient.OnBeforeRequest(BeforeRequest) + RestyClient.OnAfterResponse(AfterResponse) } diff --git a/dtmcli/msg.go b/dtmcli/trans_msg.go similarity index 94% rename from dtmcli/msg.go rename to dtmcli/trans_msg.go index e0e828e..95f5bd1 100644 --- a/dtmcli/msg.go +++ b/dtmcli/trans_msg.go @@ -40,13 +40,13 @@ func (s *Msg) SetDelay(delay uint64) *Msg { // Prepare prepare the msg, msg will later be submitted func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared) - return dtmimp.TransCallDtm(&s.TransBase, s, "prepare") + return dtmimp.TransCallDtm(&s.TransBase, "prepare") } // Submit submit the msg func (s *Msg) Submit() error { s.BuildCustomOptions() - return dtmimp.TransCallDtm(&s.TransBase, s, "submit") + return dtmimp.TransCallDtm(&s.TransBase, "submit") } // DoAndSubmitDB short method for Do on db type. please see DoAndSubmit @@ -72,7 +72,7 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) _, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared) } if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) { - _ = dtmimp.TransCallDtm(&s.TransBase, s, "abort") + _ = dtmimp.TransCallDtm(&s.TransBase, "abort") } else if err == nil { err = s.Submit() } diff --git a/dtmcli/saga.go b/dtmcli/trans_saga.go similarity index 96% rename from dtmcli/saga.go rename to dtmcli/trans_saga.go index a705fbe..4067d7a 100644 --- a/dtmcli/saga.go +++ b/dtmcli/trans_saga.go @@ -43,7 +43,7 @@ func (s *Saga) SetConcurrent() *Saga { // Submit submit the saga trans func (s *Saga) Submit() error { s.BuildCustomOptions() - return dtmimp.TransCallDtm(&s.TransBase, s, "submit") + return dtmimp.TransCallDtm(&s.TransBase, "submit") } // BuildCustomOptions add custom options to the request context diff --git a/dtmcli/tcc.go b/dtmcli/trans_tcc.go similarity index 91% rename from dtmcli/tcc.go rename to dtmcli/trans_tcc.go index 1451427..257ec33 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/trans_tcc.go @@ -34,14 +34,14 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc TccGlobalFunc) (rerr error) { tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")} custom(tcc) - rerr = dtmimp.TransCallDtm(&tcc.TransBase, tcc, "prepare") + rerr = dtmimp.TransCallDtm(&tcc.TransBase, "prepare") if rerr != nil { return rerr } defer dtmimp.DeferDo(&rerr, func() error { - return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "submit") + return dtmimp.TransCallDtm(&tcc.TransBase, "submit") }, func() error { - return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "abort") + return dtmimp.TransCallDtm(&tcc.TransBase, "abort") }) _, rerr = tccFunc(tcc) return diff --git a/dtmcli/types.go b/dtmcli/types.go index 1cf4b2e..c81abbe 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -7,24 +7,10 @@ package dtmcli import ( - "errors" - "fmt" - "net/http" - "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/go-resty/resty/v2" ) -// MustGenGid generate a new gid -func MustGenGid(server string) string { - res := map[string]string{} - resp, err := dtmimp.RestyClient.R().SetResult(&res).Get(server + "/newGid") - if err != nil || res["gid"] == "" { - panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp)) - } - return res["gid"] -} - // DB interface type DB = dtmimp.DB @@ -34,16 +20,6 @@ type TransOptions = dtmimp.TransOptions // DBConf declares db configuration type DBConf = dtmimp.DBConf -// String2DtmError translate string to dtm error -func String2DtmError(str string) error { - return map[string]error{ - ResultFailure: ErrFailure, - ResultOngoing: ErrOngoing, - ResultSuccess: nil, - "": nil, - }[str] -} - // SetCurrentDBType set currentDBType func SetCurrentDBType(dbType string) { dtmimp.SetCurrentDBType(dbType) @@ -81,25 +57,3 @@ func GetRestyClient() *resty.Client { func SetPassthroughHeaders(headers []string) { dtmimp.PassthroughHeaders = headers } - -// Result2HttpJSON return the http code and json result -// if result is error, the return proper code, else return StatusOK -func Result2HttpJSON(result interface{}) (code int, res interface{}) { - err, _ := result.(error) - if err == nil { - code = http.StatusOK - res = result - } else { - res = map[string]string{ - "error": err.Error(), - } - if errors.Is(err, ErrFailure) { - code = http.StatusConflict - } else if errors.Is(err, ErrOngoing) { - code = http.StatusTooEarly - } else if err != nil { - code = http.StatusInternalServerError - } - } - return -} diff --git a/dtmcli/utils.go b/dtmcli/utils.go new file mode 100644 index 0000000..63ad69b --- /dev/null +++ b/dtmcli/utils.go @@ -0,0 +1,60 @@ +package dtmcli + +import ( + "errors" + "fmt" + "net/http" + + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/go-resty/resty/v2" +) + +// MustGenGid generate a new gid +func MustGenGid(server string) string { + res := map[string]string{} + resp, err := dtmimp.RestyClient.R().SetResult(&res).Get(server + "/newGid") + if err != nil || res["gid"] == "" { + panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp)) + } + return res["gid"] +} + +// String2DtmError translate string to dtm error +func String2DtmError(str string) error { + return map[string]error{ + ResultFailure: ErrFailure, + ResultOngoing: ErrOngoing, + ResultSuccess: nil, + "": nil, + }[str] +} + +// Result2HttpJSON return the http code and json result +// if result is error, the return proper code, else return StatusOK +func Result2HttpJSON(result interface{}) (code int, res interface{}) { + err, _ := result.(error) + if err == nil { + code = http.StatusOK + res = result + } else { + res = map[string]string{ + "error": err.Error(), + } + if errors.Is(err, ErrFailure) { + code = http.StatusConflict + } else if errors.Is(err, ErrOngoing) { + code = http.StatusTooEarly + } else if err != nil { + code = http.StatusInternalServerError + } + } + return +} + +func IsRollback(resp *resty.Response, err error) bool { + return err == ErrFailure || dtmimp.RespAsErrorCompatible(resp) == ErrFailure +} + +func IsOngoing(resp *resty.Response, err error) bool { + return err == ErrOngoing || dtmimp.RespAsErrorCompatible(resp) == ErrOngoing +} diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 662f8d6..e38b967 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -69,7 +69,7 @@ func XaGlobalTransaction2(server string, gid string, custom func(*Xa), xaFunc Xa xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", server, "")} custom(xa) return dtmimp.XaHandleGlobalTrans(&xa.TransBase, func(action string) error { - return dtmimp.TransCallDtm(&xa.TransBase, xa, action) + return dtmimp.TransCallDtm(&xa.TransBase, action) }, func() error { _, rerr := xaFunc(xa) return rerr diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index bf17182..34059c8 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -15,7 +15,9 @@ import ( "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtmdriver" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) @@ -27,10 +29,11 @@ func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerI m, err := handler(ctx, req) res := fmt.Sprintf("%2dms %v %s %s %s", time.Since(began).Milliseconds(), err, info.FullMethod, dtmimp.MustMarshalString(m), dtmimp.MustMarshalString(req)) - if err != nil { - logger.Errorf("%s", res) - } else { + st, _ := status.FromError(err) + if err == nil || st != nil && st.Code() == codes.FailedPrecondition { logger.Infof("%s", res) + } else { + logger.Errorf("%s", res) } return m, err } @@ -42,10 +45,11 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c err := invoker(ctx, method, req, reply, cc, opts...) res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v", cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err) - if err != nil { - logger.Errorf("%s", res) + st, _ := status.FromError(err) + if err == nil || st != nil && st.Code() == codes.FailedPrecondition { + logger.Infof("%s", res) } else { - logger.Debugf("%s", res) + logger.Errorf("%s", res) } return err } diff --git a/dtmgrpc/dtmgimp/utils.go b/dtmgrpc/dtmgimp/utils.go index d10b321..0b02dde 100644 --- a/dtmgrpc/dtmgimp/utils.go +++ b/dtmgrpc/dtmgimp/utils.go @@ -24,10 +24,15 @@ func MustProtoMarshal(msg proto.Message) []byte { return b } -// DtmGrpcCall make a convenient call to dtm -func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { - reply := emptypb.Empty{} - return MustGetGrpcConn(s.Dtm, false).Invoke(s.Context, "/dtmgimp.Dtm/"+operation, &dtmgpb.DtmRequest{ +// MustProtoUnmarshal must version of proto.Unmarshal +func MustProtoUnmarshal(data []byte, msg proto.Message) { + err := proto.Unmarshal(data, msg) + dtmimp.PanicIf(err != nil, err) +} + +// GetDtmRequest return a DtmRequest from TransBase +func GetDtmRequest(s *dtmimp.TransBase) *dtmgpb.DtmRequest { + return &dtmgpb.DtmRequest{ Gid: s.Gid, TransType: s.TransType, TransOptions: &dtmgpb.DtmTransOptions{ @@ -42,7 +47,13 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { CustomedData: s.CustomData, BinPayloads: s.BinPayloads, Steps: dtmimp.MustMarshalString(s.Steps), - }, &reply) + } +} + +// DtmGrpcCall make a convenient call to dtm +func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { + reply := emptypb.Empty{} + return MustGetGrpcConn(s.Dtm, false).Invoke(s.Context, "/dtmgimp.Dtm/"+operation, GetDtmRequest(s), &reply) } const dtmpre string = "dtm-" diff --git a/dtmgrpc/dtmgpb/dtmgimp.pb.go b/dtmgrpc/dtmgpb/dtmgimp.pb.go index 671c1c6..c81aec0 100644 --- a/dtmgrpc/dtmgpb/dtmgimp.pb.go +++ b/dtmgrpc/dtmgpb/dtmgimp.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 -// protoc v3.19.4 +// protoc-gen-go v1.28.0 +// protoc v3.17.3 // source: dtmgrpc/dtmgpb/dtmgimp.proto package dtmgpb @@ -114,13 +114,14 @@ type DtmRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"` - TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"` - TransOptions *DtmTransOptions `protobuf:"bytes,3,opt,name=TransOptions,proto3" json:"TransOptions,omitempty"` - CustomedData string `protobuf:"bytes,4,opt,name=CustomedData,proto3" json:"CustomedData,omitempty"` - BinPayloads [][]byte `protobuf:"bytes,5,rep,name=BinPayloads,proto3" json:"BinPayloads,omitempty"` // for MSG/SAGA branch payloads - QueryPrepared string `protobuf:"bytes,6,opt,name=QueryPrepared,proto3" json:"QueryPrepared,omitempty"` // for MSG - Steps string `protobuf:"bytes,7,opt,name=Steps,proto3" json:"Steps,omitempty"` + Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"` + TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"` + TransOptions *DtmTransOptions `protobuf:"bytes,3,opt,name=TransOptions,proto3" json:"TransOptions,omitempty"` + CustomedData string `protobuf:"bytes,4,opt,name=CustomedData,proto3" json:"CustomedData,omitempty"` + BinPayloads [][]byte `protobuf:"bytes,5,rep,name=BinPayloads,proto3" json:"BinPayloads,omitempty"` // for Msg/Saga/Workflow branch payloads + QueryPrepared string `protobuf:"bytes,6,opt,name=QueryPrepared,proto3" json:"QueryPrepared,omitempty"` // for Msg + Steps string `protobuf:"bytes,7,opt,name=Steps,proto3" json:"Steps,omitempty"` + ReqExtra map[string]string `protobuf:"bytes,8,rep,name=ReqExtra,proto3" json:"ReqExtra,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *DtmRequest) Reset() { @@ -204,6 +205,13 @@ func (x *DtmRequest) GetSteps() string { return "" } +func (x *DtmRequest) GetReqExtra() map[string]string { + if x != nil { + return x.ReqExtra + } + return nil +} + type DtmGidReply struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -338,6 +346,124 @@ func (x *DtmBranchRequest) GetBusiPayload() []byte { return nil } +type DtmProgressesReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Progresses []*DtmProgress `protobuf:"bytes,1,rep,name=Progresses,proto3" json:"Progresses,omitempty"` +} + +func (x *DtmProgressesReply) Reset() { + *x = DtmProgressesReply{} + if protoimpl.UnsafeEnabled { + mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DtmProgressesReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DtmProgressesReply) ProtoMessage() {} + +func (x *DtmProgressesReply) ProtoReflect() protoreflect.Message { + mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DtmProgressesReply.ProtoReflect.Descriptor instead. +func (*DtmProgressesReply) Descriptor() ([]byte, []int) { + return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP(), []int{4} +} + +func (x *DtmProgressesReply) GetProgresses() []*DtmProgress { + if x != nil { + return x.Progresses + } + return nil +} + +type DtmProgress struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"Status,omitempty"` + BinData []byte `protobuf:"bytes,2,opt,name=BinData,proto3" json:"BinData,omitempty"` + BranchID string `protobuf:"bytes,3,opt,name=BranchID,proto3" json:"BranchID,omitempty"` + Op string `protobuf:"bytes,4,opt,name=Op,proto3" json:"Op,omitempty"` +} + +func (x *DtmProgress) Reset() { + *x = DtmProgress{} + if protoimpl.UnsafeEnabled { + mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DtmProgress) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DtmProgress) ProtoMessage() {} + +func (x *DtmProgress) ProtoReflect() protoreflect.Message { + mi := &file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DtmProgress.ProtoReflect.Descriptor instead. +func (*DtmProgress) Descriptor() ([]byte, []int) { + return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP(), []int{5} +} + +func (x *DtmProgress) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +func (x *DtmProgress) GetBinData() []byte { + if x != nil { + return x.BinData + } + return nil +} + +func (x *DtmProgress) GetBranchID() string { + if x != nil { + return x.BranchID + } + return "" +} + +func (x *DtmProgress) GetOp() string { + if x != nil { + return x.Op + } + return "" +} + var File_dtmgrpc_dtmgpb_dtmgimp_proto protoreflect.FileDescriptor var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{ @@ -368,7 +494,7 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{ 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x01, 0x22, 0xf8, 0x02, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, @@ -384,45 +510,69 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{ 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, - 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, - 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, - 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, - 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, - 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, - 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, - 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, - 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, - 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, - 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, - 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, - 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, - 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, - 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, - 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, - 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, - 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, - 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x08, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x1a, + 0x3b, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1f, 0x0a, 0x0b, + 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x22, 0x82, 0x02, + 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, + 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, + 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, + 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, + 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, + 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x22, 0x4a, 0x0a, 0x12, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, + 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x34, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, + 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, + 0x73, 0x73, 0x52, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x6b, + 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, + 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, + 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, + 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x32, 0xf3, 0x02, 0x0a, 0x03, + 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, + 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, + 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, + 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, + 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, + 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, + 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, + 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, + 0x40, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x13, 0x2e, + 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, + 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, + 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -437,35 +587,42 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP() []byte { return file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescData } -var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{ - (*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions - (*DtmRequest)(nil), // 1: dtmgimp.DtmRequest - (*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply - (*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest - nil, // 4: dtmgimp.DtmTransOptions.BranchHeadersEntry - nil, // 5: dtmgimp.DtmBranchRequest.DataEntry - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions + (*DtmRequest)(nil), // 1: dtmgimp.DtmRequest + (*DtmGidReply)(nil), // 2: dtmgimp.DtmGidReply + (*DtmBranchRequest)(nil), // 3: dtmgimp.DtmBranchRequest + (*DtmProgressesReply)(nil), // 4: dtmgimp.DtmProgressesReply + (*DtmProgress)(nil), // 5: dtmgimp.DtmProgress + nil, // 6: dtmgimp.DtmTransOptions.BranchHeadersEntry + nil, // 7: dtmgimp.DtmRequest.ReqExtraEntry + nil, // 8: dtmgimp.DtmBranchRequest.DataEntry + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_dtmgrpc_dtmgpb_dtmgimp_proto_depIdxs = []int32{ - 4, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry - 0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions - 5, // 2: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry - 6, // 3: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty - 1, // 4: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest - 1, // 5: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest - 1, // 6: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest - 3, // 7: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest - 2, // 8: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply - 6, // 9: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty - 6, // 10: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty - 6, // 11: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty - 6, // 12: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty - 8, // [8:13] is the sub-list for method output_type - 3, // [3:8] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 6, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry + 0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions + 7, // 2: dtmgimp.DtmRequest.ReqExtra:type_name -> dtmgimp.DtmRequest.ReqExtraEntry + 8, // 3: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry + 5, // 4: dtmgimp.DtmProgressesReply.Progresses:type_name -> dtmgimp.DtmProgress + 9, // 5: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty + 1, // 6: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest + 1, // 7: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest + 1, // 8: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest + 3, // 9: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest + 1, // 10: dtmgimp.Dtm.Progresses:input_type -> dtmgimp.DtmRequest + 2, // 11: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply + 9, // 12: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty + 9, // 13: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty + 9, // 14: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty + 9, // 15: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty + 4, // 16: dtmgimp.Dtm.Progresses:output_type -> dtmgimp.DtmProgressesReply + 11, // [11:17] is the sub-list for method output_type + 5, // [5:11] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_dtmgrpc_dtmgpb_dtmgimp_proto_init() } @@ -522,6 +679,30 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() { return nil } } + file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DtmProgressesReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DtmProgress); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -529,7 +710,7 @@ func file_dtmgrpc_dtmgpb_dtmgimp_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/dtmgrpc/dtmgpb/dtmgimp.proto b/dtmgrpc/dtmgpb/dtmgimp.proto index 0a76d53..1896d11 100644 --- a/dtmgrpc/dtmgpb/dtmgimp.proto +++ b/dtmgrpc/dtmgpb/dtmgimp.proto @@ -12,6 +12,7 @@ service Dtm { rpc Prepare(DtmRequest) returns (google.protobuf.Empty) {} rpc Abort(DtmRequest) returns (google.protobuf.Empty) {} rpc RegisterBranch(DtmBranchRequest) returns (google.protobuf.Empty) {} + rpc Progresses(DtmRequest) returns (DtmProgressesReply) {} } message DtmTransOptions { @@ -29,9 +30,10 @@ message DtmRequest { string TransType = 2; DtmTransOptions TransOptions = 3; string CustomedData = 4; - repeated bytes BinPayloads = 5; // for MSG/SAGA branch payloads - string QueryPrepared = 6; // for MSG + repeated bytes BinPayloads = 5; // for Msg/Saga/Workflow branch payloads + string QueryPrepared = 6; // for Msg string Steps = 7; + map ReqExtra = 8; } message DtmGidReply { @@ -47,3 +49,13 @@ message DtmBranchRequest { bytes BusiPayload = 6; } +message DtmProgressesReply { + repeated DtmProgress Progresses = 1; +} + +message DtmProgress { + string Status = 1; + bytes BinData = 2; + string BranchID = 3; + string Op = 4; +} \ No newline at end of file diff --git a/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go b/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go index db22f19..5045c33 100644 --- a/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go +++ b/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.19.4 +// - protoc v3.17.3 // source: dtmgrpc/dtmgpb/dtmgimp.proto package dtmgpb @@ -28,6 +28,7 @@ type DtmClient interface { Prepare(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) Abort(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) RegisterBranch(ctx context.Context, in *DtmBranchRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Progresses(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmProgressesReply, error) } type dtmClient struct { @@ -83,6 +84,15 @@ func (c *dtmClient) RegisterBranch(ctx context.Context, in *DtmBranchRequest, op return out, nil } +func (c *dtmClient) Progresses(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmProgressesReply, error) { + out := new(DtmProgressesReply) + err := c.cc.Invoke(ctx, "/dtmgimp.Dtm/Progresses", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DtmServer is the server API for Dtm service. // All implementations must embed UnimplementedDtmServer // for forward compatibility @@ -92,6 +102,7 @@ type DtmServer interface { Prepare(context.Context, *DtmRequest) (*emptypb.Empty, error) Abort(context.Context, *DtmRequest) (*emptypb.Empty, error) RegisterBranch(context.Context, *DtmBranchRequest) (*emptypb.Empty, error) + Progresses(context.Context, *DtmRequest) (*DtmProgressesReply, error) mustEmbedUnimplementedDtmServer() } @@ -114,6 +125,9 @@ func (UnimplementedDtmServer) Abort(context.Context, *DtmRequest) (*emptypb.Empt func (UnimplementedDtmServer) RegisterBranch(context.Context, *DtmBranchRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method RegisterBranch not implemented") } +func (UnimplementedDtmServer) Progresses(context.Context, *DtmRequest) (*DtmProgressesReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Progresses not implemented") +} func (UnimplementedDtmServer) mustEmbedUnimplementedDtmServer() {} // UnsafeDtmServer may be embedded to opt out of forward compatibility for this service. @@ -217,6 +231,24 @@ func _Dtm_RegisterBranch_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Dtm_Progresses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DtmRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DtmServer).Progresses(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dtmgimp.Dtm/Progresses", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DtmServer).Progresses(ctx, req.(*DtmRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Dtm_ServiceDesc is the grpc.ServiceDesc for Dtm service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -244,6 +276,10 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{ MethodName: "RegisterBranch", Handler: _Dtm_RegisterBranch_Handler, }, + { + MethodName: "Progresses", + Handler: _Dtm_Progresses_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "dtmgrpc/dtmgpb/dtmgimp.proto", diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index f92d830..e2a536e 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -8,6 +8,7 @@ package dtmgrpc import ( context "context" + "errors" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -22,24 +23,24 @@ import ( // DtmError2GrpcError translate dtm error to grpc error func DtmError2GrpcError(res interface{}) error { e, ok := res.(error) - if ok && e == dtmimp.ErrFailure { - return status.New(codes.Aborted, dtmcli.ResultFailure).Err() - } else if ok && e == dtmimp.ErrOngoing { - return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err() + if ok && errors.Is(e, dtmimp.ErrFailure) { + return status.New(codes.Aborted, e.Error()).Err() + } else if ok && errors.Is(e, dtmimp.ErrOngoing) { + return status.New(codes.FailedPrecondition, e.Error()).Err() } return e } // GrpcError2DtmError translate grpc error to dtm error func GrpcError2DtmError(err error) error { - st, ok := status.FromError(err) - if ok && st.Code() == codes.Aborted { + st, _ := status.FromError(err) + if st != nil && st.Code() == codes.Aborted { // version lower then v1.10, will specify Ongoing in code Aborted if st.Message() == dtmcli.ResultOngoing { return dtmcli.ErrOngoing } return dtmcli.ErrFailure - } else if ok && st.Code() == codes.FailedPrecondition { + } else if st != nil && st.Code() == codes.FailedPrecondition { return dtmcli.ErrOngoing } return err diff --git a/dtmgrpc/workflow/dummyReadCloser.go b/dtmgrpc/workflow/dummyReadCloser.go new file mode 100644 index 0000000..5a38a51 --- /dev/null +++ b/dtmgrpc/workflow/dummyReadCloser.go @@ -0,0 +1,53 @@ +package workflow + +import ( + "bytes" + "io" + "strings" +) + +// NewRespBodyFromString creates an io.ReadCloser from a string that +// is suitable for use as an http response body. +// +// To pass the content of an existing file as body use httpmock.File as in: +// httpmock.NewRespBodyFromString(httpmock.File("body.txt").String()) +func NewRespBodyFromString(body string) io.ReadCloser { + return &dummyReadCloser{orig: body} +} + +// NewRespBodyFromBytes creates an io.ReadCloser from a byte slice +// that is suitable for use as an http response body. +// +// To pass the content of an existing file as body use httpmock.File as in: +// httpmock.NewRespBodyFromBytes(httpmock.File("body.txt").Bytes()) +func NewRespBodyFromBytes(body []byte) io.ReadCloser { + return &dummyReadCloser{orig: body} +} + +type dummyReadCloser struct { + orig interface{} // string or []byte + body io.ReadSeeker // instanciated on demand from orig +} + +// setup ensures d.body is correctly initialized. +func (d *dummyReadCloser) setup() { + if d.body == nil { + switch body := d.orig.(type) { + case string: + d.body = strings.NewReader(body) + case []byte: + d.body = bytes.NewReader(body) + } + } +} + +func (d *dummyReadCloser) Read(p []byte) (n int, err error) { + d.setup() + return d.body.Read(p) +} + +func (d *dummyReadCloser) Close() error { + d.setup() + d.body.Seek(0, io.SeekEnd) // nolint: errcheck + return nil +} diff --git a/dtmgrpc/workflow/factory.go b/dtmgrpc/workflow/factory.go new file mode 100644 index 0000000..348dc19 --- /dev/null +++ b/dtmgrpc/workflow/factory.go @@ -0,0 +1,46 @@ +package workflow + +import ( + "fmt" + "net/url" + + "github.com/dtm-labs/dtm/dtmcli/logger" +) + +type workflowFactory struct { + protocol string + httpDtm string + httpCallback string + grpcDtm string + grpcCallback string + handlers map[string]WfFunc +} + +var defaultFac = workflowFactory{ + handlers: map[string]WfFunc{}, +} + +func (w *workflowFactory) execute(name string, gid string, data []byte) error { + handler := w.handlers[name] + if handler == nil { + return fmt.Errorf("workflow '%s' not registered. please register at startup", name) + } + wf := w.newWorkflow(name, gid, data) + return wf.process(handler, data) +} + +func (w *workflowFactory) executeByQS(qs url.Values, body []byte) error { + name := qs.Get("op") + gid := qs.Get("gid") + return w.execute(name, gid, body) +} + +func (w *workflowFactory) register(name string, handler WfFunc) error { + e := w.handlers[name] + if e != nil { + return fmt.Errorf("a handler already exists for %s", name) + } + logger.Debugf("workflow '%s' registered.", name) + w.handlers[name] = handler + return nil +} diff --git a/dtmgrpc/workflow/imp.go b/dtmgrpc/workflow/imp.go new file mode 100644 index 0000000..bea6fad --- /dev/null +++ b/dtmgrpc/workflow/imp.go @@ -0,0 +1,191 @@ +package workflow + +import ( + "context" + "errors" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc" + "github.com/go-resty/resty/v2" +) + +type workflowImp struct { + restyClient *resty.Client + idGen dtmimp.BranchIDGen + currentBranch string + progresses map[string]*stepResult + currentOp string + succeededOps []workflowPhase2Item + failedOps []workflowPhase2Item +} + +type workflowPhase2Item struct { + branchID, op string + fn WfPhase2Func +} + +func (wf *Workflow) loadProgresses() error { + progresses, err := wf.getProgress() + if err == nil { + wf.progresses = map[string]*stepResult{} + for _, p := range progresses { + wf.progresses[p.BranchID+"-"+p.Op] = &stepResult{ + Status: p.Status, + Data: p.BinData, + } + } + } + return err +} + +type wfMeta struct{} + +func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow { + wf := &Workflow{ + TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""), + Name: name, + workflowImp: workflowImp{ + idGen: dtmimp.BranchIDGen{}, + succeededOps: []workflowPhase2Item{}, + failedOps: []workflowPhase2Item{}, + currentOp: dtmimp.OpAction, + }, + } + wf.Protocol = w.protocol + if w.protocol == dtmimp.ProtocolGRPC { + wf.Dtm = w.grpcDtm + wf.QueryPrepared = w.grpcCallback + } else { + wf.Dtm = w.httpDtm + wf.QueryPrepared = w.httpCallback + } + wf.newBranch() + wf.CustomData = dtmimp.MustMarshalString(map[string]interface{}{ + "name": wf.Name, + "data": data, + }) + wf.Context = context.WithValue(wf.Context, wfMeta{}, wf) + wf.initRestyClient() + return wf +} + +func (wf *Workflow) initRestyClient() { + wf.restyClient = resty.New() + wf.restyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { + r.SetQueryParams(map[string]string{ + "gid": wf.Gid, + "trans_type": wf.TransType, + "branch_id": wf.currentBranch, + "op": dtmimp.OpAction, + }) + err := dtmimp.BeforeRequest(c, r) + return err + }) + old := wf.restyClient.GetClient().Transport + wf.restyClient.GetClient().Transport = NewRoundTripper(old, wf) + wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { + err := dtmimp.AfterResponse(c, r) + if err == nil && !wf.Options.DisalbeAutoError { + err = dtmimp.RespAsErrorCompatible(r) // check for dtm error + } + return err + }) +} + +func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { + err = wf.prepare() + if err == nil { + err = wf.loadProgresses() + } + if err == nil { + err = handler(wf, data) + err = dtmgrpc.GrpcError2DtmError(err) + if err != nil && !errors.Is(err, dtmcli.ErrFailure) { + return err + } + err = wf.processPhase2(err) + } + if err == nil || errors.Is(err, dtmcli.ErrFailure) { + err1 := wf.submit(wfErrorToStatus(err)) + if err1 != nil { + return err1 + } + } + return err + +} + +func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error { + if sr.Status == "" { + return sr.Error + } + return wf.registerBranch(sr.Data, branchID, op, sr.Status) +} + +func (wf *Workflow) processPhase2(err error) error { + ops := wf.succeededOps + if err == nil { + wf.currentOp = dtmimp.OpCommit + } else { + wf.currentOp = dtmimp.OpRollback + ops = wf.failedOps + } + for i := len(ops) - 1; i >= 0; i-- { + op := ops[i] + + err1 := wf.callPhase2(op.branchID, op.op, op.fn) + if err1 != nil { + return err1 + } + } + return err +} + +func (wf *Workflow) callPhase2(branchID string, op string, fn WfPhase2Func) error { + wf.currentBranch = branchID + r := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { + err := fn(bb) + if errors.Is(err, dtmcli.ErrFailure) { + panic("should not return ErrFail in phase2") + } + return stepResultFromLocal(nil, err) + }) + _, err := stepResultToLocal(r) + return err +} + +func (wf *Workflow) recordedDo(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult { + branchID := wf.currentBranch + r := wf.getStepResult() + if wf.currentOp == dtmimp.OpAction { // for action steps, an action will start a new branch + wf.newBranch() + } + if r != nil { + logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data) + return r + } + bb := &dtmcli.BranchBarrier{ + TransType: wf.TransType, + Gid: wf.Gid, + BranchID: branchID, + Op: wf.currentOp, + } + r = fn(bb) + err := wf.saveResult(branchID, wf.currentOp, r) + if err != nil { + r = stepResultFromLocal(nil, err) + } + return r +} + +func (wf *Workflow) newBranch() { + wf.idGen.NewSubBranchID() + wf.currentBranch = wf.idGen.CurrentSubBranchID() +} + +func (wf *Workflow) getStepResult() *stepResult { + logger.Debugf("getStepResult: %s %v", wf.currentBranch+"-"+wf.currentOp, wf.progresses[wf.currentBranch+"-"+wf.currentOp]) + return wf.progresses[wf.currentBranch+"-"+wf.currentOp] +} diff --git a/dtmgrpc/workflow/rpc.go b/dtmgrpc/workflow/rpc.go new file mode 100644 index 0000000..f76076f --- /dev/null +++ b/dtmgrpc/workflow/rpc.go @@ -0,0 +1,78 @@ +package workflow + +import ( + "context" + "strings" + + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" + "google.golang.org/protobuf/types/known/emptypb" +) + +func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) { + if wf.Protocol == dtmimp.ProtocolGRPC { + var reply dtmgpb.DtmProgressesReply + err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/Progresses", + dtmgimp.GetDtmRequest(wf.TransBase), &reply) + if err == nil { + return reply.Progresses, nil + } + return nil, err + } + resp, err := dtmimp.RestyClient.R().SetQueryParam("gid", wf.Gid).Get(wf.Dtm + "/progresses") + var progresses []*dtmgpb.DtmProgress + if err == nil { + dtmimp.MustUnmarshal(resp.Body(), &progresses) + } + return progresses, err +} + +func (wf *Workflow) submit(status string) error { + if wf.Protocol == dtmimp.ProtocolHTTP { + m := map[string]interface{}{ + "gid": wf.Gid, + "trans_type": wf.TransType, + "req_extra": map[string]string{ + "status": status, + }, + } + _, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit") + return err + } + req := dtmgimp.GetDtmRequest(wf.TransBase) + req.ReqExtra = map[string]string{ + "status": status, + } + reply := emptypb.Empty{} + return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply) +} + +func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error { + logger.Errorf("registerBranch: %s %s %s", branchID, op, status) + if wf.Protocol == dtmimp.ProtocolHTTP { + return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{ + "data": string(res), + "branch_id": branchID, + "op": op, + "status": status, + }, "registerBranch") + } + _, err := dtmgimp.MustGetDtmClient(wf.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{ + Gid: wf.Gid, + TransType: wf.TransType, + BranchID: branchID, + BusiPayload: res, + Data: map[string]string{"status": status, "op": op}, + }) + return err +} + +func (wf *Workflow) prepare() error { + operation := "prepare" + if wf.Protocol == dtmimp.ProtocolGRPC { + return dtmgimp.DtmGrpcCall(wf.TransBase, strings.Title(operation)) + } + return dtmimp.TransCallDtm(wf.TransBase, operation) +} diff --git a/dtmgrpc/workflow/server.go b/dtmgrpc/workflow/server.go new file mode 100644 index 0000000..ab2ebd6 --- /dev/null +++ b/dtmgrpc/workflow/server.go @@ -0,0 +1,26 @@ +package workflow + +import ( + "context" + + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +type workflowServer struct { + wfpb.UnimplementedWorkflowServer +} + +func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*emptypb.Empty, error) { + if defaultFac.protocol != dtmimp.ProtocolGRPC { + return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first") + } + tb := dtmgimp.TransBaseFromGrpc(ctx) + err := defaultFac.execute(tb.Op, tb.Gid, wd.Data) + return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err) +} diff --git a/dtmgrpc/workflow/utils.go b/dtmgrpc/workflow/utils.go new file mode 100644 index 0000000..ae0a0d3 --- /dev/null +++ b/dtmgrpc/workflow/utils.go @@ -0,0 +1,138 @@ +package workflow + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" + "strconv" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/reflect/protoreflect" +) + +const HBranchID = "dtm-branch-id" +const HBranchOp = "dtm-branch-op" + +func statusToCode(status string) int { + if status == "succeed" { + return 200 + } + return 409 +} + +func wfErrorToStatus(err error) string { + if err == nil { + return dtmcli.StatusSucceed + } else if errors.Is(err, dtmcli.ErrFailure) { + return dtmcli.StatusFailed + } + return "" +} + +type stepResult struct { + Error error // if Error != nil || Status == "", result will not be saved + Status string // succeed | failed | "" + // if status == succeed, data is the result. + // if status == failed, data is the error message + Data []byte +} + +type roundTripper struct { + old http.RoundTripper + wf *Workflow +} + +func newJSONResponse(status int, result []byte) *http.Response { + return &http.Response{ + Status: strconv.Itoa(status), + StatusCode: status, + Body: NewRespBodyFromBytes(result), + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + ContentLength: -1, + } +} + +func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + wf := r.wf + if wf.currentOp != dtmimp.OpAction { // in phase 2, do not save, because it is saved outer + return r.old.RoundTrip(req) + } + sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { + resp, err := r.old.RoundTrip(req) + return stepResultFromHttp(resp, err) + }) + return stepResultToHttp(sr) +} + +func NewRoundTripper(old http.RoundTripper, wf *Workflow) http.RoundTripper { + return &roundTripper{old: old, wf: wf} +} + +func stepResultFromLocal(data []byte, err error) *stepResult { + return &stepResult{ + Error: err, + Status: wfErrorToStatus(err), + Data: data, + } +} + +func stepResultToLocal(s *stepResult) ([]byte, error) { + if s.Error != nil { + return nil, s.Error + } else if s.Status == dtmcli.StatusFailed { + return nil, fmt.Errorf("%s. %w", string(s.Data), dtmcli.ErrFailure) + } + return s.Data, nil +} + +func stepResultFromGrpc(reply interface{}, err error) *stepResult { + sr := &stepResult{} + st, ok := status.FromError(err) + if err == nil { + sr.Status = dtmcli.StatusSucceed + sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage)) + } else if ok && st.Code() == codes.Aborted { + sr.Status = dtmcli.StatusFailed + sr.Data = []byte(st.Message()) + } else { + sr.Error = err + } + return sr +} + +func stepResultToGrpc(s *stepResult, reply interface{}) error { + if s.Error != nil { + return s.Error + } else if s.Status == dtmcli.StatusSucceed { + dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage)) + return nil + } + return status.New(codes.Aborted, string(s.Data)).Err() +} + +func stepResultFromHttp(resp *http.Response, err error) *stepResult { + sr := &stepResult{Error: err} + if err == nil { + sr.Data, sr.Error = ioutil.ReadAll(resp.Body) + if resp.StatusCode == http.StatusOK { + sr.Status = dtmcli.StatusSucceed + } else if resp.StatusCode == http.StatusConflict { + sr.Status = dtmcli.StatusFailed + } + } + return sr +} + +func stepResultToHttp(s *stepResult) (*http.Response, error) { + if s.Error != nil { + return nil, s.Error + } + return newJSONResponse(statusToCode(s.Status), s.Data), nil +} diff --git a/dtmgrpc/workflow/wfpb/wf.pb.go b/dtmgrpc/workflow/wfpb/wf.pb.go new file mode 100644 index 0000000..fa8a278 --- /dev/null +++ b/dtmgrpc/workflow/wfpb/wf.pb.go @@ -0,0 +1,153 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.17.3 +// source: dtmgrpc/workflow/wfpb/wf.proto + +package wfpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WorkflowData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *WorkflowData) Reset() { + *x = WorkflowData{} + if protoimpl.UnsafeEnabled { + mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkflowData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkflowData) ProtoMessage() {} + +func (x *WorkflowData) ProtoReflect() protoreflect.Message { + mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkflowData.ProtoReflect.Descriptor instead. +func (*WorkflowData) Descriptor() ([]byte, []int) { + return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP(), []int{0} +} + +func (x *WorkflowData) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_dtmgrpc_workflow_wfpb_wf_proto protoreflect.FileDescriptor + +var file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = []byte{ + 0x0a, 0x1e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, + 0x6f, 0x77, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x2f, 0x77, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x22, 0x0a, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x32, 0x47, 0x0a, 0x08, 0x57, + 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x3b, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, + 0x74, 0x65, 0x12, 0x16, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x57, 0x6f, + 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce sync.Once + file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc +) + +func file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP() []byte { + file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce.Do(func() { + file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = protoimpl.X.CompressGZIP(file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData) + }) + return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData +} + +var file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = []interface{}{ + (*WorkflowData)(nil), // 0: workflow.WorkflowData + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty +} +var file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = []int32{ + 0, // 0: workflow.Workflow.Execute:input_type -> workflow.WorkflowData + 1, // 1: workflow.Workflow.Execute:output_type -> google.protobuf.Empty + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_dtmgrpc_workflow_wfpb_wf_proto_init() } +func file_dtmgrpc_workflow_wfpb_wf_proto_init() { + if File_dtmgrpc_workflow_wfpb_wf_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WorkflowData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_dtmgrpc_workflow_wfpb_wf_proto_goTypes, + DependencyIndexes: file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs, + MessageInfos: file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes, + }.Build() + File_dtmgrpc_workflow_wfpb_wf_proto = out.File + file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = nil + file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = nil + file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = nil +} diff --git a/dtmgrpc/workflow/wfpb/wf.proto b/dtmgrpc/workflow/wfpb/wf.proto new file mode 100644 index 0000000..10cfeb8 --- /dev/null +++ b/dtmgrpc/workflow/wfpb/wf.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option go_package = "./wfpb"; +import "google/protobuf/empty.proto"; + +package workflow; + +// The Workflow service definition. +service Workflow { + rpc Execute(WorkflowData) returns (google.protobuf.Empty) {} +} + +message WorkflowData { + bytes Data = 1; +} diff --git a/dtmgrpc/workflow/wfpb/wf_grpc.pb.go b/dtmgrpc/workflow/wfpb/wf_grpc.pb.go new file mode 100644 index 0000000..deb3abf --- /dev/null +++ b/dtmgrpc/workflow/wfpb/wf_grpc.pb.go @@ -0,0 +1,106 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.17.3 +// source: dtmgrpc/workflow/wfpb/wf.proto + +package wfpb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// WorkflowClient is the client API for Workflow service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WorkflowClient interface { + Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type workflowClient struct { + cc grpc.ClientConnInterface +} + +func NewWorkflowClient(cc grpc.ClientConnInterface) WorkflowClient { + return &workflowClient{cc} +} + +func (c *workflowClient) Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/workflow.Workflow/Execute", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// WorkflowServer is the server API for Workflow service. +// All implementations must embed UnimplementedWorkflowServer +// for forward compatibility +type WorkflowServer interface { + Execute(context.Context, *WorkflowData) (*emptypb.Empty, error) + mustEmbedUnimplementedWorkflowServer() +} + +// UnimplementedWorkflowServer must be embedded to have forward compatible implementations. +type UnimplementedWorkflowServer struct { +} + +func (UnimplementedWorkflowServer) Execute(context.Context, *WorkflowData) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Execute not implemented") +} +func (UnimplementedWorkflowServer) mustEmbedUnimplementedWorkflowServer() {} + +// UnsafeWorkflowServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WorkflowServer will +// result in compilation errors. +type UnsafeWorkflowServer interface { + mustEmbedUnimplementedWorkflowServer() +} + +func RegisterWorkflowServer(s grpc.ServiceRegistrar, srv WorkflowServer) { + s.RegisterService(&Workflow_ServiceDesc, srv) +} + +func _Workflow_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WorkflowData) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WorkflowServer).Execute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/workflow.Workflow/Execute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WorkflowServer).Execute(ctx, req.(*WorkflowData)) + } + return interceptor(ctx, in, info, handler) +} + +// Workflow_ServiceDesc is the grpc.ServiceDesc for Workflow service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Workflow_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "workflow.Workflow", + HandlerType: (*WorkflowServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Execute", + Handler: _Workflow_Execute_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "dtmgrpc/workflow/wfpb/wf.proto", +} diff --git a/dtmgrpc/workflow/workflow.go b/dtmgrpc/workflow/workflow.go new file mode 100644 index 0000000..98b1081 --- /dev/null +++ b/dtmgrpc/workflow/workflow.go @@ -0,0 +1,190 @@ +package workflow + +import ( + "context" + "database/sql" + "fmt" + "net/url" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" + "github.com/go-resty/resty/v2" + "google.golang.org/grpc" +) + +// InitHttp will init Workflow engine to use http +// param httpDtm specify the dtm address +// param callback specify the url for dtm to callback if a workflow timeout +func InitHttp(httpDtm string, callback string) { + defaultFac.protocol = dtmimp.ProtocolHTTP + defaultFac.httpDtm = httpDtm + defaultFac.httpCallback = callback +} + +// InitHttp will init Workflow engine to use grpc +// param dtm specify the dtm address +// param clientHost specify the client host for dtm to callback if a workflow timeout +// param grpcServer specify the grpc server +func InitGrpc(grpcDtm string, clientHost string, grpcServer *grpc.Server) { + defaultFac.protocol = dtmimp.ProtocolGRPC + defaultFac.grpcDtm = grpcDtm + wfpb.RegisterWorkflowServer(grpcServer, &workflowServer{}) + defaultFac.grpcCallback = clientHost + "/workflow.Workflow/Execute" +} + +// SetProtocolForTest change protocol directly. only used by test +func SetProtocolForTest(protocol string) { + defaultFac.protocol = protocol +} + +// Register will register a workflow with the specified name +func Register(name string, handler WfFunc) error { + return defaultFac.register(name, handler) +} + +// Execute will execute a workflow with the gid and specified params +// if the workflow with the gid does not exist, then create a new workflow and execute it +// if the workflow with the gid exists, resume to execute it +func Execute(name string, gid string, data []byte) error { + return defaultFac.execute(name, gid, data) +} + +// ExecuteByQS is like Execute, but name and gid will be obtained from qs +func ExecuteByQS(qs url.Values, body []byte) error { + return defaultFac.executeByQS(qs, body) +} + +// WorkflowOptions is for specifying workflow options +type WorkflowOptions struct { + // if this flag is set true, then Workflow's restyClient will keep the origin http response + // or else, Workflow's restyClient will convert http reponse to error if status code is not 200 + DisalbeAutoError bool +} + +// Workflow is the type for a workflow +type Workflow struct { + // The name of the workflow + Name string + Options WorkflowOptions + *dtmimp.TransBase + workflowImp +} + +// WfFunc is the type for workflow function +type WfFunc func(wf *Workflow, data []byte) error + +// WfPhase2Func is the type for phase 2 function +// param bb is a BranchBarrier, which is introduced by http://d.dtm.pub/practice/barrier.html +type WfPhase2Func func(bb *dtmcli.BranchBarrier) error + +// NewRequest return a new resty request, whose progress will be recorded +func (wf *Workflow) NewRequest() *resty.Request { + return wf.restyClient.R().SetContext(wf.Context) +} + +// DefineSagaPhase2 will define a saga branch transaction +// param compensate specify a function for the compensation of next workflow action +func (wf *Workflow) DefineSagaPhase2(compensate WfPhase2Func) { + branchID := wf.currentBranch + wf.failedOps = append(wf.failedOps, workflowPhase2Item{ + branchID: branchID, + op: dtmimp.OpRollback, + fn: compensate, + }) +} + +// DefineSagaPhase2 will define a tcc branch transaction +// param confirm, concel specify the confirm and cancel operation of next workflow action +func (wf *Workflow) DefineTccPhase2(confirm, cancel WfPhase2Func) { + branchID := wf.currentBranch + wf.failedOps = append(wf.failedOps, workflowPhase2Item{ + branchID: branchID, + op: dtmimp.OpRollback, + fn: cancel, + }) + wf.succeededOps = append(wf.succeededOps, workflowPhase2Item{ + branchID: branchID, + op: dtmimp.OpCommit, + fn: confirm, + }) +} + +// DoAction will do an action which will be recored +func (wf *Workflow) DoAction(fn func(bb *dtmcli.BranchBarrier) ([]byte, error)) ([]byte, error) { + res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { + r, e := fn(bb) + return stepResultFromLocal(r, e) + }) + return stepResultToLocal(res) +} + +func (wf *Workflow) DoXaAction(dbConf dtmcli.DBConf, fn func(db *sql.DB) ([]byte, error)) ([]byte, error) { + branchID := wf.currentBranch + res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { + sBusi := "business" + k := bb.BranchID + "-" + sBusi + if wf.progresses[k] != nil { + return &stepResult{ + Error: fmt.Errorf("error occur at prepare, not resumable, to rollback. %w", dtmcli.ErrFailure), + } + } + sr := &stepResult{} + wf.TransBase.BranchID = branchID + wf.TransBase.Op = sBusi + err := dtmimp.XaHandleLocalTrans(wf.TransBase, dbConf, func(d *sql.DB) error { + r, e := fn(d) + sr.Data = r + if e == nil { + e = wf.saveResult(branchID, sBusi, &stepResult{Status: dtmcli.StatusSucceed}) + } + return e + }) + sr.Error = err + sr.Status = wfErrorToStatus(err) + return sr + }) + phase2 := func(bb *dtmcli.BranchBarrier) error { + return dtmimp.XaHandlePhase2(bb.Gid, dbConf, bb.BranchID, bb.Op) + } + wf.succeededOps = append(wf.succeededOps, workflowPhase2Item{ + branchID: branchID, + op: dtmimp.OpCommit, + fn: phase2, + }) + wf.failedOps = append(wf.failedOps, workflowPhase2Item{ + branchID: branchID, + op: dtmimp.OpRollback, + fn: phase2, + }) + return res.Data, res.Error +} + +// Interceptor is the middleware for workflow to capture grpc call result +func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req)) + wf := ctx.Value(wfMeta{}).(*Workflow) + + origin := func() error { + ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm) + err := invoker(ctx1, method, req, reply, cc, opts...) + res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v", + cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err) + if err != nil { + logger.Errorf("%s", res) + } else { + logger.Debugf("%s", res) + } + return err + } + if wf.currentOp != dtmimp.OpAction { + return origin() + } + sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { + err := origin() + return stepResultFromGrpc(reply, err) + }) + return stepResultToGrpc(sr, reply) +} diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 831884a..42fd8cd 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -20,6 +20,9 @@ var Version = "" func svcSubmit(t *TransGlobal) interface{} { t.Status = dtmcli.StatusSubmitted + if t.ReqExtra != nil && t.ReqExtra["status"] != "" { + t.Status = t.ReqExtra["status"] + } branches, err := t.saveNew() if err == storage.ErrUniqueConflict { @@ -82,6 +85,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st branches[0].URL = data["url"] branches[1].Op = dtmimp.OpCommit branches[1].URL = data["url"] + } else if transType == "workflow" { + branches = []TransBranch{*branch} + branches[0].Status = data["status"] + branches[0].Op = data["op"] } else { return fmt.Errorf("unknow trans type: %s", transType) } diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index 8bc840a..f62ad91 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -48,3 +48,19 @@ func (s *dtmServer) RegisterBranch(ctx context.Context, in *pb.DtmBranchRequest) }, in.Data) return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r) } + +func (s *dtmServer) Progresses(ctx context.Context, in *pb.DtmRequest) (*pb.DtmProgressesReply, error) { + branches := GetStore().FindBranches(in.Gid) + reply := &pb.DtmProgressesReply{ + Progresses: []*pb.DtmProgress{}, + } + for _, b := range branches { + reply.Progresses = append(reply.Progresses, &pb.DtmProgress{ + Status: b.Status, + BranchID: b.BranchID, + Op: b.Op, + BinData: b.BinData, + }) + } + return reply, nil +} diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 19aa7c6..52e993e 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -31,6 +31,7 @@ func addRoute(engine *gin.Engine) { engine.POST("/api/dtmsvr/registerXaBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk engine.POST("/api/dtmsvr/registerTccBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query)) + engine.GET("/api/dtmsvr/progresses", dtmutil.WrapHandler2(progresses)) engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all)) engine.GET("/api/dtmsvr/resetCronTime", dtmutil.WrapHandler2(resetCronTime)) @@ -85,6 +86,11 @@ func query(c *gin.Context) interface{} { return map[string]interface{}{"transaction": trans, "branches": branches} } +func progresses(c *gin.Context) interface{} { + gid := c.Query("gid") + return GetStore().FindBranches(gid) +} + func all(c *gin.Context) interface{} { position := c.Query("position") sLimit := dtmimp.OrString(c.Query("limit"), "100") diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index a768fa1..685d2cd 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -35,7 +35,7 @@ func CronTransOnce() (gid string) { trans.WaitResult = true branches := GetStore().FindBranches(gid) err := trans.Process(branches) - dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err) + dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure) && !errors.Is(err, dtmcli.ErrOngoing), err) return } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index bccc401..d61a777 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmsvr/storage" @@ -389,11 +388,7 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS err := s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() toDelete := [][]byte{} - for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed { - k, v := cursor.First() - if k == nil || string(k) > min { - return storage.ErrNotFound - } + for k, v := cursor.First(); k != nil && string(k) <= min && (trans == nil || trans.IsFinished()); k, v = cursor.Next() { trans = tGetGlobal(t, string(v)) toDelete = append(toDelete, k) } @@ -401,14 +396,14 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS err := t.Bucket(bucketIndex).Delete(k) dtmimp.E2P(err) } - trans.NextCronTime = &next - tPutGlobal(t, trans) - tPutIndex(t, next.Unix(), trans.Gid) + if trans != nil && !trans.IsFinished() { + trans.NextCronTime = &next + tPutGlobal(t, trans) + // this put should be after delete, because the data may be the same + tPutIndex(t, next.Unix(), trans.Gid) + } return nil }) - if err == storage.ErrNotFound { - return nil - } dtmimp.E2P(err) return trans } diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 04b13c4..2df43be 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -51,12 +51,16 @@ func (g *TransGlobalStore) String() string { return dtmimp.MustMarshalString(g) } +func (g *TransGlobalStore) IsFinished() bool { + return g.Status == dtmcli.StatusFailed || g.Status == dtmcli.StatusSucceed +} + // TransBranchStore branch transaction type TransBranchStore struct { dtmutil.ModelBase - Gid string `json:"gid,omitempty"` - URL string `json:"url,omitempty"` - BinData []byte + Gid string `json:"gid,omitempty"` + URL string `json:"url,omitempty"` + BinData []byte `json:"bin_data,omitempty"` BranchID string `json:"branch_id,omitempty"` Op string `json:"op,omitempty"` Status string `json:"status,omitempty"` diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 1e239c0..2659315 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -16,6 +16,7 @@ import ( // TransGlobal global transaction type TransGlobal struct { storage.TransGlobalStore + ReqExtra map[string]string `json:"req_extra"` Context context.Context lastTouched time.Time // record the start time of process updateBranchSync bool @@ -109,6 +110,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal RequestTimeout: o.RequestTimeout, }, }} + r.ReqExtra = c.ReqExtra if c.Steps != "" { dtmimp.MustUnmarshalString(c.Steps, &r.Steps) } diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index 7a8fe46..5186d84 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -7,6 +7,7 @@ package dtmsvr import ( + "errors" "fmt" "time" @@ -34,7 +35,7 @@ func (t *TransGlobal) process(branches []TransBranch) error { if !t.WaitResult { go func() { err := t.processInner(branches) - if err != nil { + if err != nil && !errors.Is(err, dtmimp.ErrOngoing) { logger.Errorf("processInner err: %v", err) } }() @@ -54,7 +55,7 @@ func (t *TransGlobal) process(branches []TransBranch) error { func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) { defer handlePanic(&rerr) defer func() { - if rerr != nil && rerr != dtmcli.ErrOngoing { + if rerr != nil && !errors.Is(rerr, dtmcli.ErrOngoing) { logger.Errorf("processInner got error: %s", rerr.Error()) } if TransProcessedTestChan != nil { diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go new file mode 100644 index 0000000..d076a81 --- /dev/null +++ b/dtmsvr/trans_type_workflow.go @@ -0,0 +1,43 @@ +package dtmsvr + +import ( + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" +) + +type transWorkflowProcessor struct { + *TransGlobal +} + +func init() { + registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} }) +} + +func (t *transWorkflowProcessor) GenBranches() []TransBranch { + return []TransBranch{} +} + +type cWorkflowCustom struct { + Name string `json:"name"` + Data []byte `json:"data"` +} + +func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error { + if t.Status == dtmcli.StatusSubmitted { // client workflow finished + t.changeStatus(dtmcli.StatusSucceed) + return nil + } else if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { + return nil + } + + cmc := cWorkflowCustom{} + dtmimp.MustUnmarshalString(t.CustomData, &cmc) + data := cmc.Data + if t.Protocol == dtmimp.ProtocolGRPC { + wd := wfpb.WorkflowData{Data: cmc.Data} + data = dtmgimp.MustProtoMarshal(&wd) + } + return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data) +} diff --git a/helper/test-cover.sh b/helper/test-cover.sh index e490032..22de5ca 100755 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -1,13 +1,15 @@ set -x -echo "" > coverage.txt -for store in redis mysql boltdb postgres; do +echo "mode: count" coverage.txt +for store in redis boltdb mysql postgres; do for d in $(go list ./... | grep -v vendor); do - TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 + TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/workflow,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 if [ -f profile.out ]; then - cat profile.out >> coverage.txt + cat profile.out | grep -v 'mode:' >> coverage.txt echo > profile.out fi done done +# go tool cover -html=coverage.txt + curl -s https://codecov.io/bash | bash diff --git a/sqls/dtmsvr.storage.mysql.sql b/sqls/dtmsvr.storage.mysql.sql index 283bb75..e2489d8 100644 --- a/sqls/dtmsvr.storage.mysql.sql +++ b/sqls/dtmsvr.storage.mysql.sql @@ -7,14 +7,14 @@ CREATE TABLE if not EXISTS dtm.trans_global ( `gid` varchar(128) NOT NULL COMMENT 'global transaction id', `trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg', `status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked', - `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for 2-phase message', + `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow', `protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout', - `custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction', + `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction', `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job', `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job', `owner` varchar(128) not null default '' comment 'who is locking this trans', diff --git a/sqls/dtmsvr.storage.postgres.sql b/sqls/dtmsvr.storage.postgres.sql index 2a1cf67..83a5ed3 100644 --- a/sqls/dtmsvr.storage.postgres.sql +++ b/sqls/dtmsvr.storage.postgres.sql @@ -13,7 +13,7 @@ CREATE TABLE if not EXISTS trans_global ( finish_time timestamp(0) with time zone DEFAULT NULL, rollback_time timestamp(0) with time zone DEFAULT NULL, options varchar(1024) DEFAULT '', - custom_data varchar(256) DEFAULT '', + custom_data varchar(1024) DEFAULT '', next_cron_interval int default null, next_cron_time timestamp(0) with time zone default null, owner varchar(128) not null default '', diff --git a/sqls/dtmsvr.storage.tdsql.sql b/sqls/dtmsvr.storage.tdsql.sql index ded809f..de124c5 100644 --- a/sqls/dtmsvr.storage.tdsql.sql +++ b/sqls/dtmsvr.storage.tdsql.sql @@ -14,7 +14,7 @@ CREATE TABLE if not EXISTS dtm.trans_global ( `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout', - `custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction', + `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction', `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job', `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job', `owner` varchar(128) not null default '' comment 'who is locking this trans', diff --git a/test/busi/base_grpc.go b/test/busi/base_grpc.go index ff7daf1..d52d907 100644 --- a/test/busi/base_grpc.go +++ b/test/busi/base_grpc.go @@ -21,6 +21,7 @@ import ( "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" grpc "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -32,22 +33,31 @@ var BusiGrpc = fmt.Sprintf("localhost:%d", BusiGrpcPort) // DtmClient grpc client for dtm var DtmClient dtmgpb.DtmClient +var BusiCli BusiClient + // GrpcStartup for grpc -func GrpcStartup() { +func GrpcStartup() *grpc.Server { conn, err := grpc.Dial(dtmutil.DefaultGrpcServer, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog)) logger.FatalIfError(err) DtmClient = dtmgpb.NewDtmClient(conn) logger.Debugf("dtm client inited") - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort)) + conn1, err := grpc.Dial(BusiGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(workflow.Interceptor)) logger.FatalIfError(err) + BusiCli = NewBusiClient(conn1) + s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog)) RegisterBusiServer(s, &busiServer{}) - go func() { - logger.Debugf("busi grpc listening at %v", lis.Addr()) - err := s.Serve(lis) - logger.FatalIfError(err) - }() + return s +} + +// GrpcServe start to serve grpc +func GrpcServe(server *grpc.Server) { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort)) + logger.FatalIfError(err) + logger.Debugf("busi grpc listening at %v", lis.Addr()) + err = server.Serve(lis) + logger.FatalIfError(err) } // busiServer is used to implement busi.BusiServer. diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 11bc525..3f1cf11 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -10,10 +10,12 @@ import ( "database/sql" "errors" "fmt" + "io/ioutil" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" "gorm.io/driver/mysql" @@ -77,6 +79,11 @@ func BaseAppStartup() *gin.Engine { // BaseAddRoute add base route handler func BaseAddRoute(app *gin.Engine) { + app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} { + data, err := ioutil.ReadAll(ctx.Request.Body) + logger.FatalIfError(err) + return workflow.ExecuteByQS(ctx.Request.URL.Query(), data) + })) app.POST(BusiAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} { return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn") })) diff --git a/test/busi/base_workflow.go b/test/busi/base_workflow.go new file mode 100644 index 0000000..bb44a8a --- /dev/null +++ b/test/busi/base_workflow.go @@ -0,0 +1,12 @@ +package busi + +import ( + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/dtmutil" + "google.golang.org/grpc" +) + +func WorkflowStarup(server *grpc.Server) { + workflow.InitHttp(dtmServer, Busi+"/workflow/resume") + workflow.InitGrpc(dtmutil.DefaultGrpcServer, BusiGrpc, server) +} diff --git a/test/busi/busi.go b/test/busi/data.go similarity index 85% rename from test/busi/busi.go rename to test/busi/data.go index 7d24985..de72f0d 100644 --- a/test/busi/busi.go +++ b/test/busi/data.go @@ -9,6 +9,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -16,6 +17,21 @@ import ( status "google.golang.org/grpc/status" ) +// PopulateDB populate example mysql data +func PopulateDB(skipDrop bool) { + resetXaData() + file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) + dtmutil.RunSQLScript(BusiConf, file, skipDrop) + file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) + dtmutil.RunSQLScript(BusiConf, file, skipDrop) + file = fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) + dtmutil.RunSQLScript(BusiConf, file, skipDrop) + _, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear + dtmimp.E2P(err) + SetRedisBothAccount(10000, 10000) + SetupMongoBarrierAndBusi() +} + // TransOutUID 1 const TransOutUID = 1 diff --git a/test/busi/startup.go b/test/busi/startup.go index 08de468..a658061 100644 --- a/test/busi/startup.go +++ b/test/busi/startup.go @@ -1,31 +1,14 @@ package busi import ( - "context" - "fmt" - - "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" ) // Startup startup the busi's grpc and http service func Startup() *gin.Engine { - GrpcStartup() - return BaseAppStartup() -} - -// PopulateDB populate example mysql data -func PopulateDB(skipDrop bool) { - resetXaData() - file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) - dtmutil.RunSQLScript(BusiConf, file, skipDrop) - file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) - dtmutil.RunSQLScript(BusiConf, file, skipDrop) - file = fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) - dtmutil.RunSQLScript(BusiConf, file, skipDrop) - _, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear - dtmimp.E2P(err) - SetRedisBothAccount(10000, 10000) - SetupMongoBarrierAndBusi() + svr := GrpcStartup() + app := BaseAppStartup() + WorkflowStarup(svr) + go GrpcServe(svr) + return app } diff --git a/test/busi/utils.go b/test/busi/utils.go index 21b54a9..db07113 100644 --- a/test/busi/utils.go +++ b/test/busi/utils.go @@ -84,7 +84,7 @@ func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply i // SetHTTPHeaderForHeadersYes interceptor to set head for HeadersYes func SetHTTPHeaderForHeadersYes(c *resty.Client, r *resty.Request) error { - if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") { + if b, ok := r.Body.(*dtmimp.TransBase); ok && strings.HasSuffix(b.Gid, "HeadersYes") { logger.Debugf("set test_header for url: %s", r.URL) r.SetHeader("test_header", "yes") } diff --git a/test/main_test.go b/test/main_test.go index a9f48d8..3e9bae5 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/dtmsvr" @@ -39,7 +40,7 @@ func TestMain(m *testing.M) { dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes) dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil }) - tenv := os.Getenv("TEST_STORE") + tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis) conf.Store.Host = "localhost" conf.Store.Driver = tenv if tenv == "boltdb" { diff --git a/test/msg_jrpc_test.go b/test/msg_jrpc_test.go index 0ee0c23..5562783 100644 --- a/test/msg_jrpc_test.go +++ b/test/msg_jrpc_test.go @@ -130,7 +130,7 @@ func TestMsgJprcAbnormal(t *testing.T) { func TestMsgJprcAbnormal2(t *testing.T) { tb := dtmimp.NewTransBase(dtmimp.GetFuncName(), "msg", dtmutil.DefaultJrpcServer, "01") tb.Protocol = "json-rpc" - err := dtmimp.TransCallDtm(tb, "", "newGid") + _, err := dtmimp.TransCallDtmExt(tb, "", "newGid") assert.Nil(t, err) } diff --git a/test/workflow_test.go b/test/workflow_test.go new file mode 100644 index 0000000..6a1d9ef --- /dev/null +++ b/test/workflow_test.go @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2021 yedf. All rights reserved. + * Use of this source code is governed by a BSD-style + * license that can be found in the LICENSE file. + */ + +package test + +import ( + "database/sql" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" + "github.com/dtm-labs/dtm/dtmgrpc/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowNormal(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := busi.GenTransReq(30, false, false) + gid := dtmimp.GetFuncName() + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.TransReq + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewRequest().SetBody(req).Post(Busi + "/TransOut") + if err != nil { + return err + } + _, err = wf.NewRequest().SetBody(req).Post(Busi + "/TransIn") + if err != nil { + return err + } + return nil + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} + +func TestWorkflowRollback(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + + req := busi.GenTransReq(30, false, true) + gid := dtmimp.GetFuncName() + + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.TransReq + dtmimp.MustUnmarshal(data, &req) + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + _, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom") + return err + }) + _, err := wf.DoAction(func(bb *dtmcli.BranchBarrier) ([]byte, error) { + return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "") + }) + }) + if err != nil { + return err + } + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "") + }) + }) + _, err = wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn") + if err != nil { + return err + } + return nil + }) + + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) + assert.Error(t, err, dtmcli.ErrFailure) + assert.Equal(t, StatusFailed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +func TestWorkflowGrpcNormal(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) + return err + }) + _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) + if err != nil { + return err + } + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) + return err + }) + _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) + return err + }) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err, dtmcli.ErrFailure) + assert.Equal(t, StatusFailed, getTransStatus(gid)) + waitTransProcessed(gid) +} + +var ongoingStep = 0 + +func fetchOngoingStep(dest int) bool { + c := ongoingStep + logger.Debugf("ongoing step is: %d", c) + if c == dest { + ongoingStep++ + return true + } + return false +} + +func TestWorkflowGrpcRollbackResume(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + ongoingStep = 0 + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + var req busi.BusiReq + dtmgimp.MustProtoUnmarshal(data, &req) + if fetchOngoingStep(0) { + return dtmcli.ErrOngoing + } + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + if fetchOngoingStep(4) { + return dtmcli.ErrOngoing + } + _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) + return err + }) + _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) + if fetchOngoingStep(1) { + return dtmcli.ErrOngoing + } + if err != nil { + return err + } + wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { + if fetchOngoingStep(3) { + return dtmcli.ErrOngoing + } + _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) + return err + }) + _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) + if fetchOngoingStep(2) { + return dtmcli.ErrOngoing + } + return err + }) + req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"} + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Error(t, err, dtmcli.ErrOngoing) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan + go waitTransProcessed(gid) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusFailed, getTransStatus(gid)) +} + +func TestWorkflowXaAction(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + }) + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Nil(t, err) + waitTransProcessed(gid) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +} + +func TestWorkflowXaRollback(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + logger.FatalIfError(e) + return nil, dtmcli.ErrFailure + }) + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Equal(t, dtmcli.ErrFailure, err) + waitTransProcessed(gid) + assert.Equal(t, StatusFailed, getTransStatus(gid)) +} + +func TestWorkflowXaResume(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) + ongoingStep = 0 + gid := dtmimp.GetFuncName() + workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + if fetchOngoingStep(0) { + return nil, dtmcli.ErrOngoing + } + return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + _, err = wf.DoXaAction(busi.BusiConf, func(db *sql.DB) ([]byte, error) { + if fetchOngoingStep(1) { + return nil, dtmcli.ErrOngoing + } + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) + }) + if err != nil { + return err + } + if fetchOngoingStep(2) { + return dtmcli.ErrOngoing + } + + return err + }) + err := workflow.Execute(gid, gid, nil) + assert.Equal(t, dtmcli.ErrOngoing, err) + + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusPrepared, getTransStatus(gid)) + // next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan + go waitTransProcessed(gid) + cronTransOnceForwardNow(t, gid, 1000) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +}