mirror of https://github.com/dtm-labs/dtm.git
committed by
GitHub
78 changed files with 2324 additions and 368 deletions
@ -0,0 +1,62 @@ |
|||
package dtmcli |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"net/http" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/go-resty/resty/v2" |
|||
) |
|||
|
|||
// MustGenGid generate a new gid
|
|||
func MustGenGid(server string) string { |
|||
res := map[string]string{} |
|||
resp, err := dtmimp.RestyClient.R().SetResult(&res).Get(server + "/newGid") |
|||
if err != nil || res["gid"] == "" { |
|||
panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp)) |
|||
} |
|||
return res["gid"] |
|||
} |
|||
|
|||
// String2DtmError translate string to dtm error
|
|||
func String2DtmError(str string) error { |
|||
return map[string]error{ |
|||
ResultFailure: ErrFailure, |
|||
ResultOngoing: ErrOngoing, |
|||
ResultSuccess: nil, |
|||
"": nil, |
|||
}[str] |
|||
} |
|||
|
|||
// Result2HttpJSON return the http code and json result
|
|||
// if result is error, the return proper code, else return StatusOK
|
|||
func Result2HttpJSON(result interface{}) (code int, res interface{}) { |
|||
err, _ := result.(error) |
|||
if err == nil { |
|||
code = http.StatusOK |
|||
res = result |
|||
} else { |
|||
res = map[string]string{ |
|||
"error": err.Error(), |
|||
} |
|||
if errors.Is(err, ErrFailure) { |
|||
code = http.StatusConflict |
|||
} else if errors.Is(err, ErrOngoing) { |
|||
code = http.StatusTooEarly |
|||
} else if err != nil { |
|||
code = http.StatusInternalServerError |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
// IsRollback returns whether the result is indicating rollback
|
|||
func IsRollback(resp *resty.Response, err error) bool { |
|||
return err == ErrFailure || dtmimp.RespAsErrorCompatible(resp) == ErrFailure |
|||
} |
|||
|
|||
// IsOngoing returns whether the result is indicating ongoing
|
|||
func IsOngoing(resp *resty.Response, err error) bool { |
|||
return err == ErrOngoing || dtmimp.RespAsErrorCompatible(resp) == ErrOngoing |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"bytes" |
|||
"io" |
|||
) |
|||
|
|||
// NewRespBodyFromBytes creates an io.ReadCloser from a byte slice
|
|||
// that is suitable for use as an http response body.
|
|||
func NewRespBodyFromBytes(body []byte) io.ReadCloser { |
|||
return &dummyReadCloser{body: bytes.NewReader(body)} |
|||
} |
|||
|
|||
type dummyReadCloser struct { |
|||
body io.ReadSeeker |
|||
} |
|||
|
|||
func (d *dummyReadCloser) Read(p []byte) (n int, err error) { |
|||
return d.body.Read(p) |
|||
} |
|||
|
|||
func (d *dummyReadCloser) Close() error { |
|||
_, _ = d.body.Seek(0, io.SeekEnd) |
|||
return nil |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/url" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/logger" |
|||
) |
|||
|
|||
type workflowFactory struct { |
|||
protocol string |
|||
httpDtm string |
|||
httpCallback string |
|||
grpcDtm string |
|||
grpcCallback string |
|||
handlers map[string]*wfItem |
|||
} |
|||
|
|||
var defaultFac = workflowFactory{ |
|||
handlers: map[string]*wfItem{}, |
|||
} |
|||
|
|||
func (w *workflowFactory) execute(name string, gid string, data []byte) error { |
|||
handler := w.handlers[name] |
|||
if handler == nil { |
|||
return fmt.Errorf("workflow '%s' not registered. please register at startup", name) |
|||
} |
|||
wf := w.newWorkflow(name, gid, data) |
|||
for _, fn := range handler.custom { |
|||
fn(wf) |
|||
} |
|||
return wf.process(handler.fn, data) |
|||
} |
|||
|
|||
func (w *workflowFactory) executeByQS(qs url.Values, body []byte) error { |
|||
name := qs.Get("op") |
|||
gid := qs.Get("gid") |
|||
return w.execute(name, gid, body) |
|||
} |
|||
|
|||
func (w *workflowFactory) register(name string, handler WfFunc, custom ...func(wf *Workflow)) error { |
|||
e := w.handlers[name] |
|||
if e != nil { |
|||
return fmt.Errorf("a handler already exists for %s", name) |
|||
} |
|||
logger.Debugf("workflow '%s' registered.", name) |
|||
w.handlers[name] = &wfItem{ |
|||
fn: handler, |
|||
custom: custom, |
|||
} |
|||
return nil |
|||
} |
|||
@ -0,0 +1,200 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"fmt" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmcli/logger" |
|||
"github.com/dtm-labs/dtm/dtmgrpc" |
|||
"github.com/go-resty/resty/v2" |
|||
) |
|||
|
|||
type workflowImp struct { |
|||
restyClient *resty.Client //nolint
|
|||
idGen dtmimp.BranchIDGen |
|||
currentBranch string //nolint
|
|||
currentActionAdded bool //nolint
|
|||
currentCommitAdded bool //nolint
|
|||
currentRollbackAdded bool //nolint
|
|||
currentRollbackItem *workflowPhase2Item // nolint
|
|||
progresses map[string]*stepResult //nolint
|
|||
currentOp string |
|||
succeededOps []workflowPhase2Item |
|||
failedOps []workflowPhase2Item |
|||
} |
|||
|
|||
type workflowPhase2Item struct { |
|||
branchID, op string |
|||
fn WfPhase2Func |
|||
} |
|||
|
|||
func (wf *Workflow) loadProgresses() error { |
|||
progresses, err := wf.getProgress() |
|||
if err == nil { |
|||
wf.progresses = map[string]*stepResult{} |
|||
for _, p := range progresses { |
|||
sr := &stepResult{ |
|||
Status: p.Status, |
|||
Data: p.BinData, |
|||
} |
|||
if sr.Status == dtmcli.StatusFailed { |
|||
sr.Error = fmt.Errorf("%s. %w", string(p.BinData), dtmcli.ErrFailure) |
|||
} |
|||
wf.progresses[p.BranchID+"-"+p.Op] = sr |
|||
} |
|||
} |
|||
return err |
|||
} |
|||
|
|||
type wfMeta struct{} |
|||
|
|||
func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow { |
|||
wf := &Workflow{ |
|||
TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""), |
|||
Name: name, |
|||
workflowImp: workflowImp{ |
|||
idGen: dtmimp.BranchIDGen{}, |
|||
succeededOps: []workflowPhase2Item{}, |
|||
failedOps: []workflowPhase2Item{}, |
|||
currentOp: dtmimp.OpAction, |
|||
}, |
|||
} |
|||
wf.Protocol = w.protocol |
|||
if w.protocol == dtmimp.ProtocolGRPC { |
|||
wf.Dtm = w.grpcDtm |
|||
wf.QueryPrepared = w.grpcCallback |
|||
} else { |
|||
wf.Dtm = w.httpDtm |
|||
wf.QueryPrepared = w.httpCallback |
|||
} |
|||
wf.CustomData = dtmimp.MustMarshalString(map[string]interface{}{ |
|||
"name": wf.Name, |
|||
"data": data, |
|||
}) |
|||
wf.Context = context.WithValue(wf.Context, wfMeta{}, wf) |
|||
wf.Options.HTTPResp2DtmError = HTTPResp2DtmError |
|||
wf.Options.GRPCError2DtmError = dtmgrpc.GrpcError2DtmError |
|||
wf.initRestyClient() |
|||
return wf |
|||
} |
|||
|
|||
func (wf *Workflow) initRestyClient() { |
|||
wf.restyClient = resty.New() |
|||
wf.restyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { |
|||
r.SetQueryParams(map[string]string{ |
|||
"gid": wf.Gid, |
|||
"trans_type": wf.TransType, |
|||
"branch_id": wf.currentBranch, |
|||
"op": dtmimp.OpAction, |
|||
}) |
|||
err := dtmimp.BeforeRequest(c, r) |
|||
return err |
|||
}) |
|||
old := wf.restyClient.GetClient().Transport |
|||
wf.restyClient.GetClient().Transport = newRoundTripper(old, wf) |
|||
wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { |
|||
return dtmimp.AfterResponse(c, r) |
|||
}) |
|||
} |
|||
|
|||
func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { |
|||
err = wf.loadProgresses() |
|||
if err == nil { |
|||
err = handler(wf, data) |
|||
err = dtmgrpc.GrpcError2DtmError(err) |
|||
if err != nil && !errors.Is(err, dtmcli.ErrFailure) { |
|||
return err |
|||
} |
|||
err = wf.processPhase2(err) |
|||
} |
|||
if err == nil || errors.Is(err, dtmcli.ErrFailure) { |
|||
err1 := wf.submit(wfErrorToStatus(err)) |
|||
if err1 != nil { |
|||
return err1 |
|||
} |
|||
} |
|||
return err |
|||
|
|||
} |
|||
|
|||
func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error { |
|||
if sr.Status != "" { |
|||
err := wf.registerBranch(sr.Data, branchID, op, sr.Status) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
return sr.Error |
|||
} |
|||
|
|||
func (wf *Workflow) processPhase2(err error) error { |
|||
ops := wf.succeededOps |
|||
if err == nil { |
|||
wf.currentOp = dtmimp.OpCommit |
|||
} else { |
|||
wf.currentOp = dtmimp.OpRollback |
|||
ops = wf.failedOps |
|||
} |
|||
for i := len(ops) - 1; i >= 0; i-- { |
|||
op := ops[i] |
|||
|
|||
err1 := wf.callPhase2(op.branchID, op.fn) |
|||
if err1 != nil { |
|||
return err1 |
|||
} |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func (wf *Workflow) callPhase2(branchID string, fn WfPhase2Func) error { |
|||
wf.currentBranch = branchID |
|||
r := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { |
|||
err := fn(bb) |
|||
dtmimp.PanicIf(errors.Is(err, dtmcli.ErrFailure), errors.New("should not return ErrFail in phase2")) |
|||
return wf.stepResultFromLocal(nil, err) |
|||
}) |
|||
_, err := wf.stepResultToLocal(r) |
|||
return err |
|||
} |
|||
|
|||
func (wf *Workflow) recordedDo(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult { |
|||
sr := wf.recordedDoInner(fn) |
|||
if wf.currentRollbackItem != nil && (sr.Status == dtmcli.StatusSucceed || sr.Status == dtmcli.StatusFailed && wf.Options.CompensateErrorBranch) { |
|||
wf.failedOps = append(wf.failedOps, *wf.currentRollbackItem) |
|||
} |
|||
wf.currentRollbackItem = nil |
|||
return sr |
|||
} |
|||
|
|||
func (wf *Workflow) recordedDoInner(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult { |
|||
branchID := wf.currentBranch |
|||
if wf.currentOp == dtmimp.OpAction { |
|||
dtmimp.PanicIf(wf.currentActionAdded, fmt.Errorf("one branch can have only on action")) |
|||
wf.currentActionAdded = true |
|||
} |
|||
r := wf.getStepResult() |
|||
if r != nil { |
|||
logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data) |
|||
return r |
|||
} |
|||
bb := &dtmcli.BranchBarrier{ |
|||
TransType: wf.TransType, |
|||
Gid: wf.Gid, |
|||
BranchID: branchID, |
|||
Op: wf.currentOp, |
|||
} |
|||
r = fn(bb) |
|||
err := wf.saveResult(branchID, wf.currentOp, r) |
|||
if err != nil { |
|||
r = wf.stepResultFromLocal(nil, err) |
|||
} |
|||
return r |
|||
} |
|||
|
|||
func (wf *Workflow) getStepResult() *stepResult { |
|||
logger.Debugf("getStepResult: %s %v", wf.currentBranch+"-"+wf.currentOp, wf.progresses[wf.currentBranch+"-"+wf.currentOp]) |
|||
return wf.progresses[wf.currentBranch+"-"+wf.currentOp] |
|||
} |
|||
@ -0,0 +1,67 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"context" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" |
|||
"google.golang.org/protobuf/types/known/emptypb" |
|||
) |
|||
|
|||
func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) { |
|||
if wf.Protocol == dtmimp.ProtocolGRPC { |
|||
var reply dtmgpb.DtmProgressesReply |
|||
err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/PrepareWorkflow", |
|||
dtmgimp.GetDtmRequest(wf.TransBase), &reply) |
|||
if err == nil { |
|||
return reply.Progresses, nil |
|||
} |
|||
return nil, err |
|||
} |
|||
resp, err := dtmimp.RestyClient.R().SetBody(wf.TransBase).Post(wf.Dtm + "/prepareWorkflow") |
|||
var progresses []*dtmgpb.DtmProgress |
|||
if err == nil { |
|||
dtmimp.MustUnmarshal(resp.Body(), &progresses) |
|||
} |
|||
return progresses, err |
|||
} |
|||
|
|||
func (wf *Workflow) submit(status string) error { |
|||
if wf.Protocol == dtmimp.ProtocolHTTP { |
|||
m := map[string]interface{}{ |
|||
"gid": wf.Gid, |
|||
"trans_type": wf.TransType, |
|||
"req_extra": map[string]string{ |
|||
"status": status, |
|||
}, |
|||
} |
|||
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit") |
|||
return err |
|||
} |
|||
req := dtmgimp.GetDtmRequest(wf.TransBase) |
|||
req.ReqExtra = map[string]string{ |
|||
"status": status, |
|||
} |
|||
reply := emptypb.Empty{} |
|||
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply) |
|||
} |
|||
|
|||
func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error { |
|||
if wf.Protocol == dtmimp.ProtocolHTTP { |
|||
return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{ |
|||
"data": string(res), |
|||
"branch_id": branchID, |
|||
"op": op, |
|||
"status": status, |
|||
}, "registerBranch") |
|||
} |
|||
_, err := dtmgimp.MustGetDtmClient(wf.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{ |
|||
Gid: wf.Gid, |
|||
TransType: wf.TransType, |
|||
BranchID: branchID, |
|||
BusiPayload: res, |
|||
Data: map[string]string{"status": status, "op": op}, |
|||
}) |
|||
return err |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"context" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" |
|||
"google.golang.org/grpc/codes" |
|||
"google.golang.org/grpc/status" |
|||
"google.golang.org/protobuf/types/known/emptypb" |
|||
) |
|||
|
|||
type workflowServer struct { |
|||
wfpb.UnimplementedWorkflowServer |
|||
} |
|||
|
|||
func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*emptypb.Empty, error) { |
|||
if defaultFac.protocol != dtmimp.ProtocolGRPC { |
|||
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first") |
|||
} |
|||
tb := dtmgimp.TransBaseFromGrpc(ctx) |
|||
err := defaultFac.execute(tb.Op, tb.Gid, wd.Data) |
|||
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err) |
|||
} |
|||
@ -0,0 +1,129 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"bytes" |
|||
"errors" |
|||
"fmt" |
|||
"io/ioutil" |
|||
"net/http" |
|||
"strconv" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"google.golang.org/protobuf/reflect/protoreflect" |
|||
) |
|||
|
|||
func wfErrorToStatus(err error) string { |
|||
if err == nil { |
|||
return dtmcli.StatusSucceed |
|||
} else if errors.Is(err, dtmcli.ErrFailure) { |
|||
return dtmcli.StatusFailed |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
type stepResult struct { |
|||
Error error // if Error != nil || Status == "", result will not be saved
|
|||
Status string // succeed | failed | ""
|
|||
// if status == succeed, data is the result.
|
|||
// if status == failed, data is the error message
|
|||
Data []byte |
|||
} |
|||
|
|||
type roundTripper struct { |
|||
old http.RoundTripper |
|||
wf *Workflow |
|||
} |
|||
|
|||
func newJSONResponse(status int, result []byte) *http.Response { |
|||
return &http.Response{ |
|||
Status: strconv.Itoa(status), |
|||
StatusCode: status, |
|||
Body: NewRespBodyFromBytes(result), |
|||
Header: http.Header{ |
|||
"Content-Type": []string{"application/json"}, |
|||
}, |
|||
ContentLength: -1, |
|||
} |
|||
} |
|||
|
|||
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { |
|||
wf := r.wf |
|||
origin := func(bb *dtmcli.BranchBarrier) *stepResult { |
|||
resp, err := r.old.RoundTrip(req) |
|||
return wf.stepResultFromHTTP(resp, err) |
|||
} |
|||
var sr *stepResult |
|||
if wf.currentOp != dtmimp.OpAction { // in phase 2, do not save, because it is saved outer
|
|||
sr = origin(nil) |
|||
} else { |
|||
sr = wf.recordedDo(origin) |
|||
} |
|||
return wf.stepResultToHTTP(sr) |
|||
} |
|||
|
|||
func newRoundTripper(old http.RoundTripper, wf *Workflow) http.RoundTripper { |
|||
return &roundTripper{old: old, wf: wf} |
|||
} |
|||
|
|||
// HTTPResp2DtmError check for dtm error and return it
|
|||
func HTTPResp2DtmError(resp *http.Response) ([]byte, error) { |
|||
code := resp.StatusCode |
|||
data, err := ioutil.ReadAll(resp.Body) |
|||
resp.Body = ioutil.NopCloser(bytes.NewBuffer(data)) |
|||
if code == http.StatusTooEarly { |
|||
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrOngoing) |
|||
} else if code == http.StatusConflict { |
|||
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrFailure) |
|||
} else if err == nil && code != http.StatusOK { |
|||
return data, errors.New(string(data)) |
|||
} |
|||
return data, err |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultFromLocal(data []byte, err error) *stepResult { |
|||
return &stepResult{ |
|||
Error: err, |
|||
Status: wfErrorToStatus(err), |
|||
Data: data, |
|||
} |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultToLocal(sr *stepResult) ([]byte, error) { |
|||
return sr.Data, sr.Error |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult { |
|||
sr := &stepResult{Error: wf.Options.GRPCError2DtmError(err)} |
|||
sr.Status = wfErrorToStatus(sr.Error) |
|||
if sr.Error == nil { |
|||
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage)) |
|||
} else if sr.Status == dtmcli.StatusFailed { |
|||
sr.Data = []byte(sr.Error.Error()) |
|||
} |
|||
return sr |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultToGrpc(s *stepResult, reply interface{}) error { |
|||
if s.Error == nil && s.Status == dtmcli.StatusSucceed { |
|||
dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage)) |
|||
} |
|||
return s.Error |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultFromHTTP(resp *http.Response, err error) *stepResult { |
|||
sr := &stepResult{Error: err} |
|||
if err == nil { |
|||
sr.Data, sr.Error = wf.Options.HTTPResp2DtmError(resp) |
|||
sr.Status = wfErrorToStatus(sr.Error) |
|||
} |
|||
return sr |
|||
} |
|||
|
|||
func (wf *Workflow) stepResultToHTTP(s *stepResult) (*http.Response, error) { |
|||
if s.Error != nil { |
|||
return nil, s.Error |
|||
} |
|||
return newJSONResponse(200, s.Data), nil |
|||
} |
|||
@ -0,0 +1,153 @@ |
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
|||
// versions:
|
|||
// protoc-gen-go v1.28.0
|
|||
// protoc v3.17.3
|
|||
// source: dtmgrpc/workflow/wfpb/wf.proto
|
|||
|
|||
package wfpb |
|||
|
|||
import ( |
|||
protoreflect "google.golang.org/protobuf/reflect/protoreflect" |
|||
protoimpl "google.golang.org/protobuf/runtime/protoimpl" |
|||
emptypb "google.golang.org/protobuf/types/known/emptypb" |
|||
reflect "reflect" |
|||
sync "sync" |
|||
) |
|||
|
|||
const ( |
|||
// Verify that this generated code is sufficiently up-to-date.
|
|||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) |
|||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
|||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) |
|||
) |
|||
|
|||
type WorkflowData struct { |
|||
state protoimpl.MessageState |
|||
sizeCache protoimpl.SizeCache |
|||
unknownFields protoimpl.UnknownFields |
|||
|
|||
Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` |
|||
} |
|||
|
|||
func (x *WorkflowData) Reset() { |
|||
*x = WorkflowData{} |
|||
if protoimpl.UnsafeEnabled { |
|||
mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0] |
|||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
|||
ms.StoreMessageInfo(mi) |
|||
} |
|||
} |
|||
|
|||
func (x *WorkflowData) String() string { |
|||
return protoimpl.X.MessageStringOf(x) |
|||
} |
|||
|
|||
func (*WorkflowData) ProtoMessage() {} |
|||
|
|||
func (x *WorkflowData) ProtoReflect() protoreflect.Message { |
|||
mi := &file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0] |
|||
if protoimpl.UnsafeEnabled && x != nil { |
|||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
|||
if ms.LoadMessageInfo() == nil { |
|||
ms.StoreMessageInfo(mi) |
|||
} |
|||
return ms |
|||
} |
|||
return mi.MessageOf(x) |
|||
} |
|||
|
|||
// Deprecated: Use WorkflowData.ProtoReflect.Descriptor instead.
|
|||
func (*WorkflowData) Descriptor() ([]byte, []int) { |
|||
return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP(), []int{0} |
|||
} |
|||
|
|||
func (x *WorkflowData) GetData() []byte { |
|||
if x != nil { |
|||
return x.Data |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
var File_dtmgrpc_workflow_wfpb_wf_proto protoreflect.FileDescriptor |
|||
|
|||
var file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = []byte{ |
|||
0x0a, 0x1e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, |
|||
0x6f, 0x77, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x2f, 0x77, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, |
|||
0x12, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, |
|||
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, |
|||
0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x22, 0x0a, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x66, |
|||
0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, |
|||
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x32, 0x47, 0x0a, 0x08, 0x57, |
|||
0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x3b, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, |
|||
0x74, 0x65, 0x12, 0x16, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x57, 0x6f, |
|||
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, |
|||
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, |
|||
0x74, 0x79, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x77, 0x66, 0x70, 0x62, 0x62, 0x06, |
|||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, |
|||
} |
|||
|
|||
var ( |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce sync.Once |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc |
|||
) |
|||
|
|||
func file_dtmgrpc_workflow_wfpb_wf_proto_rawDescGZIP() []byte { |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescOnce.Do(func() { |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData = protoimpl.X.CompressGZIP(file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData) |
|||
}) |
|||
return file_dtmgrpc_workflow_wfpb_wf_proto_rawDescData |
|||
} |
|||
|
|||
var file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes = make([]protoimpl.MessageInfo, 1) |
|||
var file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = []interface{}{ |
|||
(*WorkflowData)(nil), // 0: workflow.WorkflowData
|
|||
(*emptypb.Empty)(nil), // 1: google.protobuf.Empty
|
|||
} |
|||
var file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = []int32{ |
|||
0, // 0: workflow.Workflow.Execute:input_type -> workflow.WorkflowData
|
|||
1, // 1: workflow.Workflow.Execute:output_type -> google.protobuf.Empty
|
|||
1, // [1:2] is the sub-list for method output_type
|
|||
0, // [0:1] is the sub-list for method input_type
|
|||
0, // [0:0] is the sub-list for extension type_name
|
|||
0, // [0:0] is the sub-list for extension extendee
|
|||
0, // [0:0] is the sub-list for field type_name
|
|||
} |
|||
|
|||
func init() { file_dtmgrpc_workflow_wfpb_wf_proto_init() } |
|||
func file_dtmgrpc_workflow_wfpb_wf_proto_init() { |
|||
if File_dtmgrpc_workflow_wfpb_wf_proto != nil { |
|||
return |
|||
} |
|||
if !protoimpl.UnsafeEnabled { |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { |
|||
switch v := v.(*WorkflowData); i { |
|||
case 0: |
|||
return &v.state |
|||
case 1: |
|||
return &v.sizeCache |
|||
case 2: |
|||
return &v.unknownFields |
|||
default: |
|||
return nil |
|||
} |
|||
} |
|||
} |
|||
type x struct{} |
|||
out := protoimpl.TypeBuilder{ |
|||
File: protoimpl.DescBuilder{ |
|||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), |
|||
RawDescriptor: file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc, |
|||
NumEnums: 0, |
|||
NumMessages: 1, |
|||
NumExtensions: 0, |
|||
NumServices: 1, |
|||
}, |
|||
GoTypes: file_dtmgrpc_workflow_wfpb_wf_proto_goTypes, |
|||
DependencyIndexes: file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs, |
|||
MessageInfos: file_dtmgrpc_workflow_wfpb_wf_proto_msgTypes, |
|||
}.Build() |
|||
File_dtmgrpc_workflow_wfpb_wf_proto = out.File |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_rawDesc = nil |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_goTypes = nil |
|||
file_dtmgrpc_workflow_wfpb_wf_proto_depIdxs = nil |
|||
} |
|||
@ -0,0 +1,15 @@ |
|||
syntax = "proto3"; |
|||
|
|||
option go_package = "./wfpb"; |
|||
import "google/protobuf/empty.proto"; |
|||
|
|||
package workflow; |
|||
|
|||
// The Workflow service definition. |
|||
service Workflow { |
|||
rpc Execute(WorkflowData) returns (google.protobuf.Empty) {} |
|||
} |
|||
|
|||
message WorkflowData { |
|||
bytes Data = 1; |
|||
} |
|||
@ -0,0 +1,106 @@ |
|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
|||
// versions:
|
|||
// - protoc-gen-go-grpc v1.2.0
|
|||
// - protoc v3.17.3
|
|||
// source: dtmgrpc/workflow/wfpb/wf.proto
|
|||
|
|||
package wfpb |
|||
|
|||
import ( |
|||
context "context" |
|||
grpc "google.golang.org/grpc" |
|||
codes "google.golang.org/grpc/codes" |
|||
status "google.golang.org/grpc/status" |
|||
emptypb "google.golang.org/protobuf/types/known/emptypb" |
|||
) |
|||
|
|||
// This is a compile-time assertion to ensure that this generated file
|
|||
// is compatible with the grpc package it is being compiled against.
|
|||
// Requires gRPC-Go v1.32.0 or later.
|
|||
const _ = grpc.SupportPackageIsVersion7 |
|||
|
|||
// WorkflowClient is the client API for Workflow service.
|
|||
//
|
|||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
|||
type WorkflowClient interface { |
|||
Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error) |
|||
} |
|||
|
|||
type workflowClient struct { |
|||
cc grpc.ClientConnInterface |
|||
} |
|||
|
|||
func NewWorkflowClient(cc grpc.ClientConnInterface) WorkflowClient { |
|||
return &workflowClient{cc} |
|||
} |
|||
|
|||
func (c *workflowClient) Execute(ctx context.Context, in *WorkflowData, opts ...grpc.CallOption) (*emptypb.Empty, error) { |
|||
out := new(emptypb.Empty) |
|||
err := c.cc.Invoke(ctx, "/workflow.Workflow/Execute", in, out, opts...) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return out, nil |
|||
} |
|||
|
|||
// WorkflowServer is the server API for Workflow service.
|
|||
// All implementations must embed UnimplementedWorkflowServer
|
|||
// for forward compatibility
|
|||
type WorkflowServer interface { |
|||
Execute(context.Context, *WorkflowData) (*emptypb.Empty, error) |
|||
mustEmbedUnimplementedWorkflowServer() |
|||
} |
|||
|
|||
// UnimplementedWorkflowServer must be embedded to have forward compatible implementations.
|
|||
type UnimplementedWorkflowServer struct { |
|||
} |
|||
|
|||
func (UnimplementedWorkflowServer) Execute(context.Context, *WorkflowData) (*emptypb.Empty, error) { |
|||
return nil, status.Errorf(codes.Unimplemented, "method Execute not implemented") |
|||
} |
|||
func (UnimplementedWorkflowServer) mustEmbedUnimplementedWorkflowServer() {} |
|||
|
|||
// UnsafeWorkflowServer may be embedded to opt out of forward compatibility for this service.
|
|||
// Use of this interface is not recommended, as added methods to WorkflowServer will
|
|||
// result in compilation errors.
|
|||
type UnsafeWorkflowServer interface { |
|||
mustEmbedUnimplementedWorkflowServer() |
|||
} |
|||
|
|||
func RegisterWorkflowServer(s grpc.ServiceRegistrar, srv WorkflowServer) { |
|||
s.RegisterService(&Workflow_ServiceDesc, srv) |
|||
} |
|||
|
|||
func _Workflow_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
|||
in := new(WorkflowData) |
|||
if err := dec(in); err != nil { |
|||
return nil, err |
|||
} |
|||
if interceptor == nil { |
|||
return srv.(WorkflowServer).Execute(ctx, in) |
|||
} |
|||
info := &grpc.UnaryServerInfo{ |
|||
Server: srv, |
|||
FullMethod: "/workflow.Workflow/Execute", |
|||
} |
|||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
|||
return srv.(WorkflowServer).Execute(ctx, req.(*WorkflowData)) |
|||
} |
|||
return interceptor(ctx, in, info, handler) |
|||
} |
|||
|
|||
// Workflow_ServiceDesc is the grpc.ServiceDesc for Workflow service.
|
|||
// It's only intended for direct use with grpc.RegisterService,
|
|||
// and not to be introspected or modified (even as a copy)
|
|||
var Workflow_ServiceDesc = grpc.ServiceDesc{ |
|||
ServiceName: "workflow.Workflow", |
|||
HandlerType: (*WorkflowServer)(nil), |
|||
Methods: []grpc.MethodDesc{ |
|||
{ |
|||
MethodName: "Execute", |
|||
Handler: _Workflow_Execute_Handler, |
|||
}, |
|||
}, |
|||
Streams: []grpc.StreamDesc{}, |
|||
Metadata: "dtmgrpc/workflow/wfpb/wf.proto", |
|||
} |
|||
@ -0,0 +1,225 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"context" |
|||
"database/sql" |
|||
"fmt" |
|||
"net/http" |
|||
"net/url" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmcli/logger" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" |
|||
"github.com/go-resty/resty/v2" |
|||
"google.golang.org/grpc" |
|||
) |
|||
|
|||
// InitHTTP will init Workflow engine to use http
|
|||
// param httpDtm specify the dtm address
|
|||
// param callback specify the url for dtm to callback if a workflow timeout
|
|||
func InitHTTP(httpDtm string, callback string) { |
|||
defaultFac.protocol = dtmimp.ProtocolHTTP |
|||
defaultFac.httpDtm = httpDtm |
|||
defaultFac.httpCallback = callback |
|||
} |
|||
|
|||
// InitGrpc will init Workflow engine to use grpc
|
|||
// param dtm specify the dtm address
|
|||
// param clientHost specify the client host for dtm to callback if a workflow timeout
|
|||
// param grpcServer specify the grpc server
|
|||
func InitGrpc(grpcDtm string, clientHost string, grpcServer *grpc.Server) { |
|||
defaultFac.protocol = dtmimp.ProtocolGRPC |
|||
defaultFac.grpcDtm = grpcDtm |
|||
wfpb.RegisterWorkflowServer(grpcServer, &workflowServer{}) |
|||
defaultFac.grpcCallback = clientHost + "/workflow.Workflow/Execute" |
|||
} |
|||
|
|||
// SetProtocolForTest change protocol directly. only used by test
|
|||
func SetProtocolForTest(protocol string) { |
|||
defaultFac.protocol = protocol |
|||
} |
|||
|
|||
// Register will register a workflow with the specified name
|
|||
func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error { |
|||
return defaultFac.register(name, handler, custom...) |
|||
} |
|||
|
|||
// Execute will execute a workflow with the gid and specified params
|
|||
// if the workflow with the gid does not exist, then create a new workflow and execute it
|
|||
// if the workflow with the gid exists, resume to execute it
|
|||
func Execute(name string, gid string, data []byte) error { |
|||
return defaultFac.execute(name, gid, data) |
|||
} |
|||
|
|||
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
|
|||
func ExecuteByQS(qs url.Values, body []byte) error { |
|||
return defaultFac.executeByQS(qs, body) |
|||
} |
|||
|
|||
// Options is for specifying workflow options
|
|||
type Options struct { |
|||
|
|||
// Default: Code 409 => ErrFailure; Code 425 => ErrOngoing
|
|||
HTTPResp2DtmError func(*http.Response) ([]byte, error) |
|||
|
|||
// Default: Code Aborted => ErrFailure; Code FailedPrecondition => ErrOngoing
|
|||
GRPCError2DtmError func(error) error |
|||
|
|||
// This Option specify whether a branch returning ErrFailure should be compensated on rollback.
|
|||
// for most idempotent branches, no compensation is needed.
|
|||
// But for a timeout request, the caller cannot know where the request is successful, so the compensation should be called
|
|||
CompensateErrorBranch bool |
|||
} |
|||
|
|||
// Workflow is the type for a workflow
|
|||
type Workflow struct { |
|||
// The name of the workflow
|
|||
Name string |
|||
Options Options |
|||
*dtmimp.TransBase |
|||
workflowImp |
|||
} |
|||
|
|||
type wfItem struct { |
|||
fn WfFunc |
|||
custom []func(*Workflow) |
|||
} |
|||
|
|||
// WfFunc is the type for workflow function
|
|||
type WfFunc func(wf *Workflow, data []byte) error |
|||
|
|||
// WfPhase2Func is the type for phase 2 function
|
|||
// param bb is a BranchBarrier, which is introduced by http://d.dtm.pub/practice/barrier.html
|
|||
type WfPhase2Func func(bb *dtmcli.BranchBarrier) error |
|||
|
|||
// NewRequest return a new resty request, whose progress will be recorded
|
|||
func (wf *Workflow) NewRequest() *resty.Request { |
|||
return wf.restyClient.R().SetContext(wf.Context) |
|||
} |
|||
|
|||
// NewBranch will start a new branch transaction
|
|||
func (wf *Workflow) NewBranch() *Workflow { |
|||
dtmimp.PanicIf(wf.currentOp != dtmimp.OpAction, fmt.Errorf("should not call NewBranch() in Branch callbacks")) |
|||
wf.idGen.NewSubBranchID() |
|||
wf.currentBranch = wf.idGen.CurrentSubBranchID() |
|||
wf.currentActionAdded = false |
|||
wf.currentCommitAdded = false |
|||
wf.currentRollbackAdded = false |
|||
return wf |
|||
} |
|||
|
|||
// NewBranchCtx will call NewBranch and return a workflow context
|
|||
func (wf *Workflow) NewBranchCtx() context.Context { |
|||
return wf.NewBranch().Context |
|||
} |
|||
|
|||
// OnRollback will set the callback for current branch when rollback happen.
|
|||
// If you are writing a saga transaction, then you should write the compensation here
|
|||
// If you are writing a tcc transaction, then you should write the cancel operation here
|
|||
func (wf *Workflow) OnRollback(compensate WfPhase2Func) *Workflow { |
|||
branchID := wf.currentBranch |
|||
dtmimp.PanicIf(wf.currentRollbackAdded, fmt.Errorf("one branch can only add one rollback callback")) |
|||
wf.currentRollbackAdded = true |
|||
item := workflowPhase2Item{ |
|||
branchID: branchID, |
|||
op: dtmimp.OpRollback, |
|||
fn: compensate, |
|||
} |
|||
wf.currentRollbackItem = &item |
|||
return wf |
|||
} |
|||
|
|||
// OnCommit will will set the callback for current branch when commit happen.
|
|||
// If you are writing a tcc transaction, then you should write the confirm operation here
|
|||
func (wf *Workflow) OnCommit(fn WfPhase2Func) *Workflow { |
|||
branchID := wf.currentBranch |
|||
dtmimp.PanicIf(wf.currentCommitAdded, fmt.Errorf("one branch can only add one commit callback")) |
|||
wf.currentCommitAdded = true |
|||
wf.failedOps = append(wf.succeededOps, workflowPhase2Item{ |
|||
branchID: branchID, |
|||
op: dtmimp.OpCommit, |
|||
fn: fn, |
|||
}) |
|||
return wf |
|||
} |
|||
|
|||
// Do will do an action which will be recored
|
|||
func (wf *Workflow) Do(fn func(bb *dtmcli.BranchBarrier) ([]byte, error)) ([]byte, error) { |
|||
res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { |
|||
r, e := fn(bb) |
|||
return wf.stepResultFromLocal(r, e) |
|||
}) |
|||
return wf.stepResultToLocal(res) |
|||
} |
|||
|
|||
// DoXa will begin a local xa transaction
|
|||
// after the return of workflow function, xa commit/rollback will be called
|
|||
func (wf *Workflow) DoXa(dbConf dtmcli.DBConf, fn func(db *sql.DB) ([]byte, error)) ([]byte, error) { |
|||
branchID := wf.currentBranch |
|||
res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { |
|||
sBusi := "business" |
|||
k := bb.BranchID + "-" + sBusi |
|||
if wf.progresses[k] != nil { |
|||
return &stepResult{ |
|||
Error: fmt.Errorf("error occur at prepare, not resumable, to rollback. %w", dtmcli.ErrFailure), |
|||
} |
|||
} |
|||
sr := &stepResult{} |
|||
wf.TransBase.BranchID = branchID |
|||
wf.TransBase.Op = sBusi |
|||
err := dtmimp.XaHandleLocalTrans(wf.TransBase, dbConf, func(d *sql.DB) error { |
|||
r, e := fn(d) |
|||
sr.Data = r |
|||
if e == nil { |
|||
e = wf.saveResult(branchID, sBusi, &stepResult{Status: dtmcli.StatusSucceed}) |
|||
} |
|||
return e |
|||
}) |
|||
sr.Error = err |
|||
sr.Status = wfErrorToStatus(err) |
|||
return sr |
|||
}) |
|||
phase2 := func(bb *dtmcli.BranchBarrier) error { |
|||
return dtmimp.XaHandlePhase2(bb.Gid, dbConf, bb.BranchID, bb.Op) |
|||
} |
|||
wf.succeededOps = append(wf.succeededOps, workflowPhase2Item{ |
|||
branchID: branchID, |
|||
op: dtmimp.OpCommit, |
|||
fn: phase2, |
|||
}) |
|||
wf.failedOps = append(wf.failedOps, workflowPhase2Item{ |
|||
branchID: branchID, |
|||
op: dtmimp.OpRollback, |
|||
fn: phase2, |
|||
}) |
|||
return res.Data, res.Error |
|||
} |
|||
|
|||
// Interceptor is the middleware for workflow to capture grpc call result
|
|||
func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
|||
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req)) |
|||
wf := ctx.Value(wfMeta{}).(*Workflow) |
|||
|
|||
origin := func() error { |
|||
ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm) |
|||
err := invoker(ctx1, method, req, reply, cc, opts...) |
|||
res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v", |
|||
cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err) |
|||
if err != nil { |
|||
logger.Errorf("%s", res) |
|||
} else { |
|||
logger.Debugf("%s", res) |
|||
} |
|||
return err |
|||
} |
|||
if wf.currentOp != dtmimp.OpAction { |
|||
return origin() |
|||
} |
|||
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { |
|||
err := origin() |
|||
return wf.stepResultFromGrpc(reply, err) |
|||
}) |
|||
return wf.stepResultToGrpc(sr, reply) |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
package workflow |
|||
|
|||
import ( |
|||
"context" |
|||
"testing" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestAbnormal(t *testing.T) { |
|||
fname := dtmimp.GetFuncName() |
|||
err := defaultFac.execute(fname, fname, nil) |
|||
assert.Error(t, err) |
|||
|
|||
err = defaultFac.register(fname, func(wf *Workflow, data []byte) error { return nil }) |
|||
assert.Nil(t, err) |
|||
err = defaultFac.register(fname, nil) |
|||
assert.Error(t, err) |
|||
|
|||
ws := &workflowServer{} |
|||
_, err = ws.Execute(context.Background(), nil) |
|||
assert.Contains(t, err.Error(), "call workflow.InitGrpc first") |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
package dtmsvr |
|||
|
|||
import ( |
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow/wfpb" |
|||
) |
|||
|
|||
type transWorkflowProcessor struct { |
|||
*TransGlobal |
|||
} |
|||
|
|||
func init() { |
|||
registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} }) |
|||
} |
|||
|
|||
func (t *transWorkflowProcessor) GenBranches() []TransBranch { |
|||
return []TransBranch{} |
|||
} |
|||
|
|||
type cWorkflowCustom struct { |
|||
Name string `json:"name"` |
|||
Data []byte `json:"data"` |
|||
} |
|||
|
|||
func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error { |
|||
if t.Status == dtmcli.StatusSubmitted { // client workflow finished
|
|||
t.changeStatus(dtmcli.StatusSucceed) |
|||
return nil |
|||
} else if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { |
|||
return nil |
|||
} |
|||
|
|||
cmc := cWorkflowCustom{} |
|||
dtmimp.MustUnmarshalString(t.CustomData, &cmc) |
|||
data := cmc.Data |
|||
if t.Protocol == dtmimp.ProtocolGRPC { |
|||
wd := wfpb.WorkflowData{Data: cmc.Data} |
|||
data = dtmgimp.MustProtoMarshal(&wd) |
|||
} |
|||
return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data) |
|||
} |
|||
@ -1,13 +1,15 @@ |
|||
set -x |
|||
echo "" > coverage.txt |
|||
for store in redis mysql boltdb postgres; do |
|||
echo "mode: count" > coverage.txt |
|||
for store in redis boltdb mysql postgres; do |
|||
for d in $(go list ./... | grep -v vendor); do |
|||
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 |
|||
TEST_STORE=$store go test -failfast -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/workflow,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 |
|||
if [ -f profile.out ]; then |
|||
cat profile.out >> coverage.txt |
|||
cat profile.out | grep -v 'mode:' >> coverage.txt |
|||
echo > profile.out |
|||
fi |
|||
done |
|||
done |
|||
|
|||
# go tool cover -html=coverage.txt |
|||
|
|||
curl -s https://codecov.io/bash | bash |
|||
|
|||
@ -0,0 +1,13 @@ |
|||
package busi |
|||
|
|||
import ( |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow" |
|||
"github.com/dtm-labs/dtm/dtmutil" |
|||
"google.golang.org/grpc" |
|||
) |
|||
|
|||
// WorkflowStarup 1
|
|||
func WorkflowStarup(server *grpc.Server) { |
|||
workflow.InitHTTP(dtmServer, Busi+"/workflow/resume") |
|||
workflow.InitGrpc(dtmutil.DefaultGrpcServer, BusiGrpc, server) |
|||
} |
|||
@ -1,31 +1,14 @@ |
|||
package busi |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmutil" |
|||
"github.com/gin-gonic/gin" |
|||
) |
|||
|
|||
// Startup startup the busi's grpc and http service
|
|||
func Startup() *gin.Engine { |
|||
GrpcStartup() |
|||
return BaseAppStartup() |
|||
} |
|||
|
|||
// PopulateDB populate example mysql data
|
|||
func PopulateDB(skipDrop bool) { |
|||
resetXaData() |
|||
file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) |
|||
dtmutil.RunSQLScript(BusiConf, file, skipDrop) |
|||
file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) |
|||
dtmutil.RunSQLScript(BusiConf, file, skipDrop) |
|||
file = fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver) |
|||
dtmutil.RunSQLScript(BusiConf, file, skipDrop) |
|||
_, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear
|
|||
dtmimp.E2P(err) |
|||
SetRedisBothAccount(10000, 10000) |
|||
SetupMongoBarrierAndBusi() |
|||
svr := GrpcStartup() |
|||
app := BaseAppStartup() |
|||
WorkflowStarup(svr) |
|||
go GrpcServe(svr) |
|||
return app |
|||
} |
|||
|
|||
@ -0,0 +1,43 @@ |
|||
/* |
|||
* Copyright (c) 2021 yedf. All rights reserved. |
|||
* Use of this source code is governed by a BSD-style |
|||
* license that can be found in the LICENSE file. |
|||
*/ |
|||
|
|||
package test |
|||
|
|||
import ( |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmsvr" |
|||
"github.com/dtm-labs/dtm/dtmsvr/storage" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestWorkflowBranchConflict(t *testing.T) { |
|||
gid := dtmimp.GetFuncName() |
|||
store := dtmsvr.GetStore() |
|||
now := time.Now() |
|||
g := &storage.TransGlobalStore{ |
|||
Gid: gid, |
|||
Status: dtmcli.StatusPrepared, |
|||
NextCronTime: &now, |
|||
} |
|||
err := store.MaySaveNewTrans(g, []storage.TransBranchStore{ |
|||
{ |
|||
BranchID: "00", |
|||
Op: dtmimp.OpAction, |
|||
}, |
|||
}) |
|||
assert.Nil(t, err) |
|||
err = dtmimp.CatchP(func() { |
|||
store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{ |
|||
{BranchID: "00", Op: dtmimp.OpAction}, |
|||
}, -1) |
|||
}) |
|||
assert.Error(t, err) |
|||
store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true) |
|||
} |
|||
@ -0,0 +1,131 @@ |
|||
/* |
|||
* Copyright (c) 2021 yedf. All rights reserved. |
|||
* Use of this source code is governed by a BSD-style |
|||
* license that can be found in the LICENSE file. |
|||
*/ |
|||
|
|||
package test |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"testing" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow" |
|||
"github.com/dtm-labs/dtm/test/busi" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestWorkflowGrpcSimple(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.BusiReq |
|||
dtmgimp.MustProtoUnmarshal(data, &req) |
|||
_, err := busi.BusiCli.TransOutBSaga(wf.NewBranchCtx(), &req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) |
|||
assert.Error(t, err, dtmcli.ErrFailure) |
|||
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|||
waitTransProcessed(gid) |
|||
} |
|||
|
|||
func TestWorkflowGrpcNormal(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.BusiReq |
|||
dtmgimp.MustProtoUnmarshal(data, &req) |
|||
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) |
|||
assert.Error(t, err, dtmcli.ErrFailure) |
|||
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|||
waitTransProcessed(gid) |
|||
} |
|||
|
|||
func TestWorkflowMixed(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := &busi.BusiReq{Amount: 30} |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.BusiReq |
|||
dtmgimp.MustProtoUnmarshal(data, &req) |
|||
|
|||
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = wf.NewBranch().OnCommit(func(bb *dtmcli.BranchBarrier) error { |
|||
_, err := busi.BusiCli.TransInConfirm(wf.Context, &req) |
|||
return err |
|||
}).OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
req2 := &busi.ReqHTTP{Amount: 30} |
|||
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert") |
|||
return err |
|||
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { |
|||
err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "") |
|||
return nil, err |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess) |
|||
}) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) |
|||
assert.Nil(t, err) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
waitTransProcessed(gid) |
|||
} |
|||
|
|||
func TestWorkflowGrpcError(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
req := &busi.BusiReq{Amount: 30} |
|||
gid := dtmimp.GetFuncName() |
|||
busi.MainSwitch.TransOutResult.SetOnce("ERROR") |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.BusiReq |
|||
dtmgimp.MustProtoUnmarshal(data, &req) |
|||
_, err := busi.BusiCli.TransOut(wf.NewBranchCtx(), &req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) |
|||
assert.Error(t, err) |
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardCron(t, gid, 1000) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
@ -0,0 +1,120 @@ |
|||
/* |
|||
* Copyright (c) 2021 yedf. All rights reserved. |
|||
* Use of this source code is governed by a BSD-style |
|||
* license that can be found in the LICENSE file. |
|||
*/ |
|||
|
|||
package test |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"testing" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow" |
|||
"github.com/dtm-labs/dtm/test/busi" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestWorkflowNormal(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := busi.GenReqHTTP(30, false, false) |
|||
gid := dtmimp.GetFuncName() |
|||
|
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.ReqHTTP |
|||
dtmimp.MustUnmarshal(data, &req) |
|||
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
}) |
|||
|
|||
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|||
assert.Nil(t, err) |
|||
waitTransProcessed(gid) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
|
|||
func TestWorkflowRollback(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
|
|||
req := &busi.ReqHTTP{Amount: 30, TransInResult: dtmimp.ResultFailure} |
|||
gid := dtmimp.GetFuncName() |
|||
|
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.ReqHTTP |
|||
dtmimp.MustUnmarshal(data, &req) |
|||
_, err := wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom") |
|||
return err |
|||
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { |
|||
return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { |
|||
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "") |
|||
}) |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { |
|||
return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "") |
|||
}) |
|||
}).NewRequest().SetBody(req).Post(Busi + "/SagaBTransIn") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
}) |
|||
|
|||
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|||
assert.Error(t, err, dtmcli.ErrFailure) |
|||
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|||
waitTransProcessed(gid) |
|||
} |
|||
|
|||
func TestWorkflowError(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := busi.GenReqHTTP(30, false, false) |
|||
gid := dtmimp.GetFuncName() |
|||
busi.MainSwitch.TransOutResult.SetOnce("ERROR") |
|||
|
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.ReqHTTP |
|||
dtmimp.MustUnmarshal(data, &req) |
|||
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") |
|||
return err |
|||
}) |
|||
|
|||
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|||
assert.Error(t, err) |
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardCron(t, gid, 1000) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
|
|||
func TestWorkflowOngoing(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := busi.GenReqHTTP(30, false, false) |
|||
gid := dtmimp.GetFuncName() |
|||
busi.MainSwitch.TransOutResult.SetOnce("ONGOING") |
|||
|
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.ReqHTTP |
|||
dtmimp.MustUnmarshal(data, &req) |
|||
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") |
|||
return err |
|||
}) |
|||
|
|||
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|||
assert.Error(t, err) |
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardCron(t, gid, 1000) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
@ -0,0 +1,154 @@ |
|||
/* |
|||
* Copyright (c) 2021 yedf. All rights reserved. |
|||
* Use of this source code is governed by a BSD-style |
|||
* license that can be found in the LICENSE file. |
|||
*/ |
|||
|
|||
package test |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"testing" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmcli/logger" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow" |
|||
"github.com/dtm-labs/dtm/test/busi" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
var ongoingStep = 0 |
|||
|
|||
func fetchOngoingStep(dest int) bool { |
|||
c := ongoingStep |
|||
logger.Debugf("ongoing step is: %d", c) |
|||
if c == dest { |
|||
ongoingStep++ |
|||
return true |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func TestWorkflowSimpleResume(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|||
req := busi.GenReqHTTP(30, false, false) |
|||
gid := dtmimp.GetFuncName() |
|||
ongoingStep = 0 |
|||
|
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
if fetchOngoingStep(0) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
var req busi.ReqHTTP |
|||
dtmimp.MustUnmarshal(data, &req) |
|||
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut") |
|||
return err |
|||
}) |
|||
|
|||
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|||
assert.Error(t, err) |
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
|
|||
func TestWorkflowGrpcRollbackResume(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
gid := dtmimp.GetFuncName() |
|||
ongoingStep = 0 |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
var req busi.BusiReq |
|||
dtmgimp.MustProtoUnmarshal(data, &req) |
|||
if fetchOngoingStep(0) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
if fetchOngoingStep(4) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) |
|||
if fetchOngoingStep(1) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|||
if fetchOngoingStep(3) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) |
|||
return err |
|||
}) |
|||
_, err = busi.BusiCli.TransInBSaga(wf.Context, &req) |
|||
if fetchOngoingStep(2) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
return err |
|||
}, func(wf *workflow.Workflow) { |
|||
wf.Options.CompensateErrorBranch = true |
|||
}) |
|||
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} |
|||
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) |
|||
assert.Error(t, err, dtmcli.ErrOngoing) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
|
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|||
} |
|||
|
|||
func TestWorkflowXaResume(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
ongoingStep = 0 |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
if fetchOngoingStep(0) { |
|||
return nil, dtmcli.ErrOngoing |
|||
} |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
if fetchOngoingStep(1) { |
|||
return nil, dtmcli.ErrOngoing |
|||
} |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if fetchOngoingStep(2) { |
|||
return dtmcli.ErrOngoing |
|||
} |
|||
|
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, nil) |
|||
assert.Equal(t, dtmcli.ErrOngoing, err) |
|||
|
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusPrepared, getTransStatus(gid)) |
|||
// next cron will make a workflow submit, and do an additional write to chan, so make an additional read chan
|
|||
go waitTransProcessed(gid) |
|||
cronTransOnceForwardNow(t, gid, 1000) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
@ -0,0 +1,63 @@ |
|||
/* |
|||
* Copyright (c) 2021 yedf. All rights reserved. |
|||
* Use of this source code is governed by a BSD-style |
|||
* license that can be found in the LICENSE file. |
|||
*/ |
|||
|
|||
package test |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"testing" |
|||
|
|||
"github.com/dtm-labs/dtm/dtmcli" |
|||
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|||
"github.com/dtm-labs/dtm/dtmcli/logger" |
|||
"github.com/dtm-labs/dtm/dtmgrpc/workflow" |
|||
"github.com/dtm-labs/dtm/test/busi" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestWorkflowXaAction(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) |
|||
}) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, nil) |
|||
assert.Nil(t, err) |
|||
waitTransProcessed(gid) |
|||
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|||
} |
|||
|
|||
func TestWorkflowXaRollback(t *testing.T) { |
|||
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) |
|||
gid := dtmimp.GetFuncName() |
|||
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|||
_, err := wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
return nil, busi.SagaAdjustBalance(db, busi.TransOutUID, -30, dtmcli.ResultSuccess) |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { |
|||
e := busi.SagaAdjustBalance(db, busi.TransInUID, 30, dtmcli.ResultSuccess) |
|||
logger.FatalIfError(e) |
|||
return nil, dtmcli.ErrFailure |
|||
}) |
|||
return err |
|||
}) |
|||
err := workflow.Execute(gid, gid, nil) |
|||
assert.Equal(t, dtmcli.ErrFailure, err) |
|||
waitTransProcessed(gid) |
|||
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|||
} |
|||
Loading…
Reference in new issue