From 2d7f17aee32933153e27a33065d694c6afe3b0d8 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 11 Aug 2022 15:49:00 +0800 Subject: [PATCH] workflow support return values --- client/dtmgrpc/dtmgpb/dtmgimp.pb.go | 72 ++++++++++++++++------------- client/dtmgrpc/dtmgpb/dtmgimp.proto | 1 + client/workflow/factory.go | 13 ++---- client/workflow/imp.go | 61 +++++++++++++----------- client/workflow/rpc.go | 27 +++++------ client/workflow/server.go | 2 +- client/workflow/workflow.go | 23 ++++++++- client/workflow/workflow_test.go | 4 +- dtmsvr/api.go | 2 +- dtmsvr/storage/trans.go | 1 + dtmsvr/trans_status.go | 12 +++++ sqls/dtmsvr.storage.mysql.sql | 3 +- sqls/dtmsvr.storage.postgres.sql | 1 + test/workflow_http_ret_test.go | 34 ++++++++++++++ 14 files changed, 166 insertions(+), 90 deletions(-) create mode 100644 test/workflow_http_ret_test.go diff --git a/client/dtmgrpc/dtmgpb/dtmgimp.pb.go b/client/dtmgrpc/dtmgpb/dtmgimp.pb.go index 6b7a547..b93c882 100644 --- a/client/dtmgrpc/dtmgpb/dtmgimp.pb.go +++ b/client/dtmgrpc/dtmgpb/dtmgimp.pb.go @@ -417,6 +417,7 @@ type DtmTransaction struct { Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"` Status string `protobuf:"bytes,2,opt,name=Status,proto3" json:"Status,omitempty"` RollbackReason string `protobuf:"bytes,3,opt,name=RollbackReason,proto3" json:"RollbackReason,omitempty"` + Result string `protobuf:"bytes,4,opt,name=Result,proto3" json:"Result,omitempty"` } func (x *DtmTransaction) Reset() { @@ -472,6 +473,13 @@ func (x *DtmTransaction) GetRollbackReason() string { return "" } +func (x *DtmTransaction) GetResult() string { + if x != nil { + return x.Result + } + return "" +} + type DtmProgress struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -626,45 +634,47 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x12, 0x34, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x0a, 0x50, - 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x62, 0x0a, 0x0e, 0x44, 0x74, 0x6d, + 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x7a, 0x0a, 0x0e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x52, 0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x52, - 0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x6b, 0x0a, - 0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a, - 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x32, 0xf8, 0x02, 0x0a, 0x03, 0x44, - 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, - 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, - 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, - 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, + 0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x16, 0x0a, + 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x6b, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, + 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42, + 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x4f, 0x70, 0x32, 0xf8, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, + 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, + 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, + 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, + 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, + 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, - 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, - 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, - 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, - 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, - 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, - 0x77, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, - 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, - 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, + 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, + 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, + 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, + 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, + 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/client/dtmgrpc/dtmgpb/dtmgimp.proto b/client/dtmgrpc/dtmgpb/dtmgimp.proto index 5e3459a..225064a 100644 --- a/client/dtmgrpc/dtmgpb/dtmgimp.proto +++ b/client/dtmgrpc/dtmgpb/dtmgimp.proto @@ -59,6 +59,7 @@ message DtmTransaction { string Gid = 1; string Status = 2; string RollbackReason = 3; + string Result = 4; } message DtmProgress { diff --git a/client/workflow/factory.go b/client/workflow/factory.go index dc3c379..51e9f12 100644 --- a/client/workflow/factory.go +++ b/client/workflow/factory.go @@ -2,7 +2,6 @@ package workflow import ( "fmt" - "net/url" "github.com/dtm-labs/logger" ) @@ -20,10 +19,10 @@ var defaultFac = workflowFactory{ handlers: map[string]*wfItem{}, } -func (w *workflowFactory) execute(name string, gid string, data []byte) error { +func (w *workflowFactory) execute(name string, gid string, data []byte) ([]byte, error) { handler := w.handlers[name] if handler == nil { - return fmt.Errorf("workflow '%s' not registered. please register at startup", name) + return nil, fmt.Errorf("workflow '%s' not registered. please register at startup", name) } wf := w.newWorkflow(name, gid, data) for _, fn := range handler.custom { @@ -32,13 +31,7 @@ func (w *workflowFactory) execute(name string, gid string, data []byte) error { return wf.process(handler.fn, data) } -func (w *workflowFactory) executeByQS(qs url.Values, body []byte) error { - name := qs.Get("op") - gid := qs.Get("gid") - return w.execute(name, gid, body) -} - -func (w *workflowFactory) register(name string, handler WfFunc, custom ...func(wf *Workflow)) error { +func (w *workflowFactory) register(name string, handler WfFunc2, custom ...func(wf *Workflow)) error { e := w.handlers[name] if e != nil { return fmt.Errorf("a handler already exists for %s", name) diff --git a/client/workflow/imp.go b/client/workflow/imp.go index 38e5b9c..7b9d003 100644 --- a/client/workflow/imp.go +++ b/client/workflow/imp.go @@ -2,11 +2,13 @@ package workflow import ( "context" + "encoding/base64" "errors" "fmt" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb" "github.com/dtm-labs/logger" "github.com/go-resty/resty/v2" ) @@ -29,22 +31,18 @@ type workflowPhase2Item struct { fn WfPhase2Func } -func (wf *Workflow) loadProgresses() error { - progresses, err := wf.getProgress() - if err == nil { - wf.progresses = map[string]*stepResult{} - for _, p := range progresses { - sr := &stepResult{ - Status: p.Status, - Data: p.BinData, - } - if sr.Status == dtmcli.StatusFailed { - sr.Error = dtmcli.ErrorMessage2Error(string(p.BinData), dtmcli.ErrFailure) - } - wf.progresses[p.BranchID+"-"+p.Op] = sr +func (wf *Workflow) initProgress(progresses []*dtmgpb.DtmProgress) { + wf.progresses = map[string]*stepResult{} + for _, p := range progresses { + sr := &stepResult{ + Status: p.Status, + Data: p.BinData, + } + if sr.Status == dtmcli.StatusFailed { + sr.Error = dtmcli.ErrorMessage2Error(string(p.BinData), dtmcli.ErrFailure) } + wf.progresses[p.BranchID+"-"+p.Op] = sr } - return err } type wfMeta struct{} @@ -95,24 +93,33 @@ func (wf *Workflow) initRestyClient() { wf.restyClient.GetClient().Transport = newRoundTripper(old, wf) } -func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { - err = wf.loadProgresses() - if err == nil { - err = handler(wf, data) - err = wf.Options.GRPCError2DtmError(err) - if err != nil && !errors.Is(err, dtmcli.ErrFailure) { - return err - } - err = wf.processPhase2(err) +func (wf *Workflow) process(handler WfFunc2, data []byte) (res []byte, err error) { + reply, err2 := wf.getProgress() + if err2 != nil { + return nil, err2 } + + status := reply.Transaction.Status + if status == dtmcli.StatusSucceed { + return base64.StdEncoding.DecodeString(reply.Transaction.Result) + } else if status == dtmcli.StatusFailed { + return nil, dtmcli.ErrorMessage2Error(reply.Transaction.RollbackReason, dtmcli.ErrFailure) + } + wf.initProgress(reply.Progresses) + res, err = handler(wf, data) + err = wf.Options.GRPCError2DtmError(err) + if err != nil && !errors.Is(err, dtmcli.ErrFailure) { + return + } + err = wf.processPhase2(err) + if err == nil || errors.Is(err, dtmcli.ErrFailure) { - err1 := wf.submit(err) + err1 := wf.submit(res, err) if err1 != nil { - return err1 + return nil, err1 } } - return err - + return } func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error { diff --git a/client/workflow/rpc.go b/client/workflow/rpc.go index a6c63e0..bfcdd49 100644 --- a/client/workflow/rpc.go +++ b/client/workflow/rpc.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "encoding/base64" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -10,47 +11,43 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) { +func (wf *Workflow) getProgress() (*dtmgpb.DtmProgressesReply, error) { if wf.Protocol == dtmimp.ProtocolGRPC { var reply dtmgpb.DtmProgressesReply err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/PrepareWorkflow", dtmgimp.GetDtmRequest(wf.TransBase), &reply) - if err == nil { - return reply.Progresses, nil - } - return nil, err + return &reply, err } resp, err := dtmcli.GetRestyClient().R().SetBody(wf.TransBase).Post(wf.Dtm + "/prepareWorkflow") var reply dtmgpb.DtmProgressesReply if err == nil { dtmimp.MustUnmarshal(resp.Body(), &reply) } - return reply.Progresses, err + return &reply, err } -func (wf *Workflow) submit(err error) error { +func (wf *Workflow) submit(result []byte, err error) error { status := wfErrorToStatus(err) reason := "" if err != nil { reason = err.Error() } + extra := map[string]string{ + "status": status, + "rollback_reason": reason, + "result": base64.StdEncoding.EncodeToString(result), + } if wf.Protocol == dtmimp.ProtocolHTTP { m := map[string]interface{}{ "gid": wf.Gid, "trans_type": wf.TransType, - "req_extra": map[string]string{ - "status": status, - "rollback_reason": reason, - }, + "req_extra": extra, } _, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit") return err } req := dtmgimp.GetDtmRequest(wf.TransBase) - req.ReqExtra = map[string]string{ - "status": status, - "rollback_reason": reason, - } + req.ReqExtra = extra reply := emptypb.Empty{} return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply) } diff --git a/client/workflow/server.go b/client/workflow/server.go index 1cd51fe..441d2d0 100644 --- a/client/workflow/server.go +++ b/client/workflow/server.go @@ -21,6 +21,6 @@ func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*e return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first") } tb := dtmgimp.TransBaseFromGrpc(ctx) - err := defaultFac.execute(tb.Op, tb.Gid, wd.Data) + _, err := defaultFac.execute(tb.Op, tb.Gid, wd.Data) return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err) } diff --git a/client/workflow/workflow.go b/client/workflow/workflow.go index f16117f..b6a822a 100644 --- a/client/workflow/workflow.go +++ b/client/workflow/workflow.go @@ -43,6 +43,13 @@ func SetProtocolForTest(protocol string) { // Register will register a workflow with the specified name func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error { + return defaultFac.register(name, func(wf *Workflow, data []byte) ([]byte, error) { + return nil, handler(wf, data) + }, custom...) +} + +// Register2 is the same as Register, but workflow func can return result +func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error { return defaultFac.register(name, handler, custom...) } @@ -50,12 +57,21 @@ func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error { // if the workflow with the gid does not exist, then create a new workflow and execute it // if the workflow with the gid exists, resume to execute it func Execute(name string, gid string, data []byte) error { + _, err := defaultFac.execute(name, gid, data) + return err +} + +// Execute2 is the same as Execute, but workflow func can return result +func Execute2(name string, gid string, data []byte) ([]byte, error) { return defaultFac.execute(name, gid, data) } // ExecuteByQS is like Execute, but name and gid will be obtained from qs func ExecuteByQS(qs url.Values, body []byte) error { - return defaultFac.executeByQS(qs, body) + name := qs.Get("op") + gid := qs.Get("gid") + _, err := defaultFac.execute(name, gid, body) + return err } // Options is for specifying workflow options @@ -83,13 +99,16 @@ type Workflow struct { } type wfItem struct { - fn WfFunc + fn WfFunc2 custom []func(*Workflow) } // WfFunc is the type for workflow function type WfFunc func(wf *Workflow, data []byte) error +// WfFunc2 is the type for workflow function with return value +type WfFunc2 func(wf *Workflow, data []byte) ([]byte, error) + // WfPhase2Func is the type for phase 2 function // param bb is a BranchBarrier, which is introduced by http://d.dtm.pub/practice/barrier.html type WfPhase2Func func(bb *dtmcli.BranchBarrier) error diff --git a/client/workflow/workflow_test.go b/client/workflow/workflow_test.go index 422df2c..93cc7b0 100644 --- a/client/workflow/workflow_test.go +++ b/client/workflow/workflow_test.go @@ -10,10 +10,10 @@ import ( func TestAbnormal(t *testing.T) { fname := dtmimp.GetFuncName() - err := defaultFac.execute(fname, fname, nil) + _, err := defaultFac.execute(fname, fname, nil) assert.Error(t, err) - err = defaultFac.register(fname, func(wf *Workflow, data []byte) error { return nil }) + err = defaultFac.register(fname, func(wf *Workflow, data []byte) ([]byte, error) { return nil, nil }) assert.Nil(t, err) err = defaultFac.register(fname, nil) assert.Error(t, err) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 557b03b..25c24c9 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -22,7 +22,7 @@ var Version = "" func svcSubmit(t *TransGlobal) interface{} { if t.TransType == "workflow" { t.Status = dtmcli.StatusPrepared - t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"])) + t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"]), withResult(t.ReqExtra["result"])) return nil } t.Status = dtmcli.StatusSubmitted diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 152b01f..28fe2b6 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -32,6 +32,7 @@ type TransGlobalStore struct { Protocol string `json:"protocol,omitempty"` FinishTime *time.Time `json:"finish_time,omitempty"` RollbackTime *time.Time `json:"rollback_time,omitempty"` + Result string `json:"result,omitempty"` RollbackReason string `json:"rollback_reason,omitempty"` Options string `json:"options,omitempty"` CustomData string `json:"custom_data,omitempty"` diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index fb89e7f..e17746f 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -38,7 +38,9 @@ func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) { type changeStatusParams struct { rollbackReason string + result string } + type changeStatusOption func(c *changeStatusParams) func withRollbackReason(rollbackReason string) changeStatusOption { @@ -47,6 +49,12 @@ func withRollbackReason(rollbackReason string) changeStatusOption { } } +func withResult(result string) changeStatusOption { + return func(c *changeStatusParams) { + c.result = result + } +} + func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) { statusParams := &changeStatusParams{} for _, opt := range opts { @@ -65,6 +73,10 @@ func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) { t.RollbackReason = statusParams.rollbackReason updates = append(updates, "rollback_reason") } + if statusParams.result != "" { + t.Result = statusParams.result + updates = append(updates, "result") + } t.UpdateTime = &now GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed) logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String()) diff --git a/sqls/dtmsvr.storage.mysql.sql b/sqls/dtmsvr.storage.mysql.sql index 13cc0cf..ba31ec4 100644 --- a/sqls/dtmsvr.storage.mysql.sql +++ b/sqls/dtmsvr.storage.mysql.sql @@ -18,7 +18,8 @@ CREATE TABLE if not EXISTS dtm.trans_global ( `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job', `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job', `owner` varchar(128) not null default '' comment 'who is locking this trans', - `ext_data` TEXT comment 'extended data for this trans', + `ext_data` TEXT comment 'result for this trans. currently used in workflow pattern', + `result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction', `rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction', PRIMARY KEY (`id`), UNIQUE KEY `gid` (`gid`), diff --git a/sqls/dtmsvr.storage.postgres.sql b/sqls/dtmsvr.storage.postgres.sql index ed68549..750dced 100644 --- a/sqls/dtmsvr.storage.postgres.sql +++ b/sqls/dtmsvr.storage.postgres.sql @@ -18,6 +18,7 @@ CREATE TABLE if not EXISTS trans_global ( next_cron_time timestamp(0) with time zone default null, owner varchar(128) not null default '', ext_data text, + result varchar(1024) DEFAULT '', rollback_reason varchar(1024) DEFAULT '', PRIMARY KEY (id), CONSTRAINT gid UNIQUE (gid) diff --git a/test/workflow_http_ret_test.go b/test/workflow_http_ret_test.go new file mode 100644 index 0000000..434653a --- /dev/null +++ b/test/workflow_http_ret_test.go @@ -0,0 +1,34 @@ +package test + +import ( + "testing" + + "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/client/workflow" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowRet(t *testing.T) { + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + req := busi.GenReqHTTP(30, false, false) + gid := dtmimp.GetFuncName() + + workflow.Register2(gid, func(wf *workflow.Workflow, data []byte) ([]byte, error) { + var req busi.ReqHTTP + dtmimp.MustUnmarshal(data, &req) + _, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") + return []byte("result of workflow"), err + }) + + ret, err := workflow.Execute2(gid, gid, dtmimp.MustMarshal(req)) + assert.Nil(t, err) + assert.Equal(t, "result of workflow", string(ret)) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) + + // the second execute will return result directly + ret, err = workflow.Execute2(gid, gid, dtmimp.MustMarshal(req)) + assert.Nil(t, err) + assert.Equal(t, "result of workflow", string(ret)) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) +}