Browse Source

workflow support return values

pull/340/head
yedf2 4 years ago
parent
commit
2d7f17aee3
  1. 72
      client/dtmgrpc/dtmgpb/dtmgimp.pb.go
  2. 1
      client/dtmgrpc/dtmgpb/dtmgimp.proto
  3. 13
      client/workflow/factory.go
  4. 61
      client/workflow/imp.go
  5. 27
      client/workflow/rpc.go
  6. 2
      client/workflow/server.go
  7. 23
      client/workflow/workflow.go
  8. 4
      client/workflow/workflow_test.go
  9. 2
      dtmsvr/api.go
  10. 1
      dtmsvr/storage/trans.go
  11. 12
      dtmsvr/trans_status.go
  12. 3
      sqls/dtmsvr.storage.mysql.sql
  13. 1
      sqls/dtmsvr.storage.postgres.sql
  14. 34
      test/workflow_http_ret_test.go

72
client/dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -417,6 +417,7 @@ type DtmTransaction struct {
Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"`
Status string `protobuf:"bytes,2,opt,name=Status,proto3" json:"Status,omitempty"`
RollbackReason string `protobuf:"bytes,3,opt,name=RollbackReason,proto3" json:"RollbackReason,omitempty"`
Result string `protobuf:"bytes,4,opt,name=Result,proto3" json:"Result,omitempty"`
}
func (x *DtmTransaction) Reset() {
@ -472,6 +473,13 @@ func (x *DtmTransaction) GetRollbackReason() string {
return ""
}
func (x *DtmTransaction) GetResult() string {
if x != nil {
return x.Result
}
return ""
}
type DtmProgress struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -626,45 +634,47 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x69, 0x6f, 0x6e, 0x12, 0x34, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d,
0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x0a, 0x50,
0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x62, 0x0a, 0x0e, 0x44, 0x74, 0x6d,
0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x7a, 0x0a, 0x0e, 0x44, 0x74, 0x6d,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x47,
0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x16, 0x0a,
0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x52, 0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63,
0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x52,
0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x6b, 0x0a,
0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06,
0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a,
0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70,
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x32, 0xf8, 0x02, 0x0a, 0x03, 0x44,
0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06,
0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70,
0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65,
0x6f, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x16, 0x0a,
0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x52,
0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x6b, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67,
0x72, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07,
0x42, 0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x42,
0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x4f, 0x70, 0x32, 0xf8, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65,
0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a,
0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74,
0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73,
0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67,
0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45,
0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f,
0x77, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70,
0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65,
0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70,
0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63,
0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42,
0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72,
0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67,
0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67,
0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a,
0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
}
var (

1
client/dtmgrpc/dtmgpb/dtmgimp.proto

@ -59,6 +59,7 @@ message DtmTransaction {
string Gid = 1;
string Status = 2;
string RollbackReason = 3;
string Result = 4;
}
message DtmProgress {

13
client/workflow/factory.go

@ -2,7 +2,6 @@ package workflow
import (
"fmt"
"net/url"
"github.com/dtm-labs/logger"
)
@ -20,10 +19,10 @@ var defaultFac = workflowFactory{
handlers: map[string]*wfItem{},
}
func (w *workflowFactory) execute(name string, gid string, data []byte) error {
func (w *workflowFactory) execute(name string, gid string, data []byte) ([]byte, error) {
handler := w.handlers[name]
if handler == nil {
return fmt.Errorf("workflow '%s' not registered. please register at startup", name)
return nil, fmt.Errorf("workflow '%s' not registered. please register at startup", name)
}
wf := w.newWorkflow(name, gid, data)
for _, fn := range handler.custom {
@ -32,13 +31,7 @@ func (w *workflowFactory) execute(name string, gid string, data []byte) error {
return wf.process(handler.fn, data)
}
func (w *workflowFactory) executeByQS(qs url.Values, body []byte) error {
name := qs.Get("op")
gid := qs.Get("gid")
return w.execute(name, gid, body)
}
func (w *workflowFactory) register(name string, handler WfFunc, custom ...func(wf *Workflow)) error {
func (w *workflowFactory) register(name string, handler WfFunc2, custom ...func(wf *Workflow)) error {
e := w.handlers[name]
if e != nil {
return fmt.Errorf("a handler already exists for %s", name)

61
client/workflow/imp.go

@ -2,11 +2,13 @@ package workflow
import (
"context"
"encoding/base64"
"errors"
"fmt"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
"github.com/dtm-labs/logger"
"github.com/go-resty/resty/v2"
)
@ -29,22 +31,18 @@ type workflowPhase2Item struct {
fn WfPhase2Func
}
func (wf *Workflow) loadProgresses() error {
progresses, err := wf.getProgress()
if err == nil {
wf.progresses = map[string]*stepResult{}
for _, p := range progresses {
sr := &stepResult{
Status: p.Status,
Data: p.BinData,
}
if sr.Status == dtmcli.StatusFailed {
sr.Error = dtmcli.ErrorMessage2Error(string(p.BinData), dtmcli.ErrFailure)
}
wf.progresses[p.BranchID+"-"+p.Op] = sr
func (wf *Workflow) initProgress(progresses []*dtmgpb.DtmProgress) {
wf.progresses = map[string]*stepResult{}
for _, p := range progresses {
sr := &stepResult{
Status: p.Status,
Data: p.BinData,
}
if sr.Status == dtmcli.StatusFailed {
sr.Error = dtmcli.ErrorMessage2Error(string(p.BinData), dtmcli.ErrFailure)
}
wf.progresses[p.BranchID+"-"+p.Op] = sr
}
return err
}
type wfMeta struct{}
@ -95,24 +93,33 @@ func (wf *Workflow) initRestyClient() {
wf.restyClient.GetClient().Transport = newRoundTripper(old, wf)
}
func (wf *Workflow) process(handler WfFunc, data []byte) (err error) {
err = wf.loadProgresses()
if err == nil {
err = handler(wf, data)
err = wf.Options.GRPCError2DtmError(err)
if err != nil && !errors.Is(err, dtmcli.ErrFailure) {
return err
}
err = wf.processPhase2(err)
func (wf *Workflow) process(handler WfFunc2, data []byte) (res []byte, err error) {
reply, err2 := wf.getProgress()
if err2 != nil {
return nil, err2
}
status := reply.Transaction.Status
if status == dtmcli.StatusSucceed {
return base64.StdEncoding.DecodeString(reply.Transaction.Result)
} else if status == dtmcli.StatusFailed {
return nil, dtmcli.ErrorMessage2Error(reply.Transaction.RollbackReason, dtmcli.ErrFailure)
}
wf.initProgress(reply.Progresses)
res, err = handler(wf, data)
err = wf.Options.GRPCError2DtmError(err)
if err != nil && !errors.Is(err, dtmcli.ErrFailure) {
return
}
err = wf.processPhase2(err)
if err == nil || errors.Is(err, dtmcli.ErrFailure) {
err1 := wf.submit(err)
err1 := wf.submit(res, err)
if err1 != nil {
return err1
return nil, err1
}
}
return err
return
}
func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error {

27
client/workflow/rpc.go

@ -2,6 +2,7 @@ package workflow
import (
"context"
"encoding/base64"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -10,47 +11,43 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)
func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) {
func (wf *Workflow) getProgress() (*dtmgpb.DtmProgressesReply, error) {
if wf.Protocol == dtmimp.ProtocolGRPC {
var reply dtmgpb.DtmProgressesReply
err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/PrepareWorkflow",
dtmgimp.GetDtmRequest(wf.TransBase), &reply)
if err == nil {
return reply.Progresses, nil
}
return nil, err
return &reply, err
}
resp, err := dtmcli.GetRestyClient().R().SetBody(wf.TransBase).Post(wf.Dtm + "/prepareWorkflow")
var reply dtmgpb.DtmProgressesReply
if err == nil {
dtmimp.MustUnmarshal(resp.Body(), &reply)
}
return reply.Progresses, err
return &reply, err
}
func (wf *Workflow) submit(err error) error {
func (wf *Workflow) submit(result []byte, err error) error {
status := wfErrorToStatus(err)
reason := ""
if err != nil {
reason = err.Error()
}
extra := map[string]string{
"status": status,
"rollback_reason": reason,
"result": base64.StdEncoding.EncodeToString(result),
}
if wf.Protocol == dtmimp.ProtocolHTTP {
m := map[string]interface{}{
"gid": wf.Gid,
"trans_type": wf.TransType,
"req_extra": map[string]string{
"status": status,
"rollback_reason": reason,
},
"req_extra": extra,
}
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit")
return err
}
req := dtmgimp.GetDtmRequest(wf.TransBase)
req.ReqExtra = map[string]string{
"status": status,
"rollback_reason": reason,
}
req.ReqExtra = extra
reply := emptypb.Empty{}
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply)
}

2
client/workflow/server.go

@ -21,6 +21,6 @@ func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*e
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first")
}
tb := dtmgimp.TransBaseFromGrpc(ctx)
err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
_, err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
}

23
client/workflow/workflow.go

@ -43,6 +43,13 @@ func SetProtocolForTest(protocol string) {
// Register will register a workflow with the specified name
func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error {
return defaultFac.register(name, func(wf *Workflow, data []byte) ([]byte, error) {
return nil, handler(wf, data)
}, custom...)
}
// Register2 is the same as Register, but workflow func can return result
func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error {
return defaultFac.register(name, handler, custom...)
}
@ -50,12 +57,21 @@ func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error {
// if the workflow with the gid does not exist, then create a new workflow and execute it
// if the workflow with the gid exists, resume to execute it
func Execute(name string, gid string, data []byte) error {
_, err := defaultFac.execute(name, gid, data)
return err
}
// Execute2 is the same as Execute, but workflow func can return result
func Execute2(name string, gid string, data []byte) ([]byte, error) {
return defaultFac.execute(name, gid, data)
}
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
func ExecuteByQS(qs url.Values, body []byte) error {
return defaultFac.executeByQS(qs, body)
name := qs.Get("op")
gid := qs.Get("gid")
_, err := defaultFac.execute(name, gid, body)
return err
}
// Options is for specifying workflow options
@ -83,13 +99,16 @@ type Workflow struct {
}
type wfItem struct {
fn WfFunc
fn WfFunc2
custom []func(*Workflow)
}
// WfFunc is the type for workflow function
type WfFunc func(wf *Workflow, data []byte) error
// WfFunc2 is the type for workflow function with return value
type WfFunc2 func(wf *Workflow, data []byte) ([]byte, error)
// WfPhase2Func is the type for phase 2 function
// param bb is a BranchBarrier, which is introduced by http://d.dtm.pub/practice/barrier.html
type WfPhase2Func func(bb *dtmcli.BranchBarrier) error

4
client/workflow/workflow_test.go

@ -10,10 +10,10 @@ import (
func TestAbnormal(t *testing.T) {
fname := dtmimp.GetFuncName()
err := defaultFac.execute(fname, fname, nil)
_, err := defaultFac.execute(fname, fname, nil)
assert.Error(t, err)
err = defaultFac.register(fname, func(wf *Workflow, data []byte) error { return nil })
err = defaultFac.register(fname, func(wf *Workflow, data []byte) ([]byte, error) { return nil, nil })
assert.Nil(t, err)
err = defaultFac.register(fname, nil)
assert.Error(t, err)

2
dtmsvr/api.go

@ -22,7 +22,7 @@ var Version = ""
func svcSubmit(t *TransGlobal) interface{} {
if t.TransType == "workflow" {
t.Status = dtmcli.StatusPrepared
t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"]))
t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"]), withResult(t.ReqExtra["result"]))
return nil
}
t.Status = dtmcli.StatusSubmitted

1
dtmsvr/storage/trans.go

@ -32,6 +32,7 @@ type TransGlobalStore struct {
Protocol string `json:"protocol,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Result string `json:"result,omitempty"`
RollbackReason string `json:"rollback_reason,omitempty"`
Options string `json:"options,omitempty"`
CustomData string `json:"custom_data,omitempty"`

12
dtmsvr/trans_status.go

@ -38,7 +38,9 @@ func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
type changeStatusParams struct {
rollbackReason string
result string
}
type changeStatusOption func(c *changeStatusParams)
func withRollbackReason(rollbackReason string) changeStatusOption {
@ -47,6 +49,12 @@ func withRollbackReason(rollbackReason string) changeStatusOption {
}
}
func withResult(result string) changeStatusOption {
return func(c *changeStatusParams) {
c.result = result
}
}
func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
statusParams := &changeStatusParams{}
for _, opt := range opts {
@ -65,6 +73,10 @@ func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
t.RollbackReason = statusParams.rollbackReason
updates = append(updates, "rollback_reason")
}
if statusParams.result != "" {
t.Result = statusParams.result
updates = append(updates, "result")
}
t.UpdateTime = &now
GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)
logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String())

3
sqls/dtmsvr.storage.mysql.sql

@ -18,7 +18,8 @@ CREATE TABLE if not EXISTS dtm.trans_global (
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',
`ext_data` TEXT comment 'extended data for this trans',
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),

1
sqls/dtmsvr.storage.postgres.sql

@ -18,6 +18,7 @@ CREATE TABLE if not EXISTS trans_global (
next_cron_time timestamp(0) with time zone default null,
owner varchar(128) not null default '',
ext_data text,
result varchar(1024) DEFAULT '',
rollback_reason varchar(1024) DEFAULT '',
PRIMARY KEY (id),
CONSTRAINT gid UNIQUE (gid)

34
test/workflow_http_ret_test.go

@ -0,0 +1,34 @@
package test
import (
"testing"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/workflow"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestWorkflowRet(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
workflow.Register2(gid, func(wf *workflow.Workflow, data []byte) ([]byte, error) {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
return []byte("result of workflow"), err
})
ret, err := workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, "result of workflow", string(ret))
assert.Equal(t, StatusSucceed, getTransStatus(gid))
// the second execute will return result directly
ret, err = workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, "result of workflow", string(ret))
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
Loading…
Cancel
Save