From 8f08358c7db3d33f8a896d6afd0131845adf0512 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 5 Jul 2022 17:07:30 +0800 Subject: [PATCH] refactored to use Options.2DtmError --- dtmgrpc/workflow/imp.go | 29 +++++++----- dtmgrpc/workflow/utils.go | 86 ++++++++++++++++++------------------ dtmgrpc/workflow/workflow.go | 23 ++++++---- 3 files changed, 75 insertions(+), 63 deletions(-) diff --git a/dtmgrpc/workflow/imp.go b/dtmgrpc/workflow/imp.go index d58a320..08ecdb1 100644 --- a/dtmgrpc/workflow/imp.go +++ b/dtmgrpc/workflow/imp.go @@ -36,10 +36,14 @@ func (wf *Workflow) loadProgresses() error { if err == nil { wf.progresses = map[string]*stepResult{} for _, p := range progresses { - wf.progresses[p.BranchID+"-"+p.Op] = &stepResult{ + sr := &stepResult{ Status: p.Status, Data: p.BinData, } + if sr.Status == dtmcli.StatusFailed { + sr.Error = fmt.Errorf("%s. %w", string(p.BinData), dtmcli.ErrFailure) + } + wf.progresses[p.BranchID+"-"+p.Op] = sr } } return err @@ -71,6 +75,8 @@ func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Wor "data": data, }) wf.Context = context.WithValue(wf.Context, wfMeta{}, wf) + wf.Options.HTTPResp2DtmError = HTTPResp2DtmError + wf.Options.GRPCError2DtmError = dtmgrpc.GrpcError2DtmError wf.initRestyClient() return wf } @@ -90,11 +96,7 @@ func (wf *Workflow) initRestyClient() { old := wf.restyClient.GetClient().Transport wf.restyClient.GetClient().Transport = newRoundTripper(old, wf) wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { - err := dtmimp.AfterResponse(c, r) - if err == nil && !wf.Options.DisalbeAutoError { - err = dtmimp.RespAsErrorCompatible(r) // check for dtm error - } - return err + return dtmimp.AfterResponse(c, r) }) } @@ -119,10 +121,13 @@ func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { } func (wf *Workflow) saveResult(branchID string, op string, sr *stepResult) error { - if sr.Status == "" { - return sr.Error + if sr.Status != "" { + err := wf.registerBranch(sr.Data, branchID, op, sr.Status) + if err != nil { + return err + } } - return wf.registerBranch(sr.Data, branchID, op, sr.Status) + return sr.Error } func (wf *Workflow) processPhase2(err error) error { @@ -151,9 +156,9 @@ func (wf *Workflow) callPhase2(branchID string, fn WfPhase2Func) error { if errors.Is(err, dtmcli.ErrFailure) { panic("should not return ErrFail in phase2") } - return stepResultFromLocal(nil, err) + return wf.stepResultFromLocal(nil, err) }) - _, err := stepResultToLocal(r) + _, err := wf.stepResultToLocal(r) return err } @@ -187,7 +192,7 @@ func (wf *Workflow) recordedDoInner(fn func(bb *dtmcli.BranchBarrier) *stepResul r = fn(bb) err := wf.saveResult(branchID, wf.currentOp, r) if err != nil { - r = stepResultFromLocal(nil, err) + r = wf.stepResultFromLocal(nil, err) } return r } diff --git a/dtmgrpc/workflow/utils.go b/dtmgrpc/workflow/utils.go index e5abc38..da4f967 100644 --- a/dtmgrpc/workflow/utils.go +++ b/dtmgrpc/workflow/utils.go @@ -1,6 +1,7 @@ package workflow import ( + "bytes" "errors" "fmt" "io/ioutil" @@ -10,8 +11,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -58,21 +57,39 @@ func newJSONResponse(status int, result []byte) *http.Response { func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { wf := r.wf + origin := func(bb *dtmcli.BranchBarrier) *stepResult { + resp, err := r.old.RoundTrip(req) + return wf.stepResultFromHTTP(resp, err) + } + var sr *stepResult if wf.currentOp != dtmimp.OpAction { // in phase 2, do not save, because it is saved outer - return r.old.RoundTrip(req) + sr = origin(nil) + } else { + sr = wf.recordedDo(origin) } - sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { - resp, err := r.old.RoundTrip(req) - return stepResultFromHTTP(resp, err) - }) - return stepResultToHTTP(sr) + return wf.stepResultToHTTP(sr) } func newRoundTripper(old http.RoundTripper, wf *Workflow) http.RoundTripper { return &roundTripper{old: old, wf: wf} } -func stepResultFromLocal(data []byte, err error) *stepResult { +// HTTPResp2DtmError check for dtm error and return it +func HTTPResp2DtmError(resp *http.Response) ([]byte, error) { + code := resp.StatusCode + data, err := ioutil.ReadAll(resp.Body) + resp.Body = ioutil.NopCloser(bytes.NewBuffer(data)) + if code == http.StatusTooEarly { + return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrOngoing) + } else if code == http.StatusConflict { + return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrFailure) + } else if err == nil && code != http.StatusOK { + return data, errors.New(string(data)) + } + return data, err +} + +func (wf *Workflow) stepResultFromLocal(data []byte, err error) *stepResult { return &stepResult{ Error: err, Status: wfErrorToStatus(err), @@ -80,56 +97,41 @@ func stepResultFromLocal(data []byte, err error) *stepResult { } } -func stepResultToLocal(s *stepResult) ([]byte, error) { - if s.Error != nil { - return nil, s.Error - } else if s.Status == dtmcli.StatusFailed { - return nil, fmt.Errorf("%s. %w", string(s.Data), dtmcli.ErrFailure) - } - return s.Data, nil +func (wf *Workflow) stepResultToLocal(sr *stepResult) ([]byte, error) { + return sr.Data, sr.Error } -func stepResultFromGrpc(reply interface{}, err error) *stepResult { - sr := &stepResult{} - st, ok := status.FromError(err) +func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult { + sr := &stepResult{Error: err} if err == nil { - sr.Status = dtmcli.StatusSucceed - sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage)) - } else if ok && st.Code() == codes.Aborted { - sr.Status = dtmcli.StatusFailed - sr.Data = []byte(st.Message()) - } else { - sr.Error = err + sr.Error = wf.Options.GRPCError2DtmError(err) + sr.Status = wfErrorToStatus(sr.Error) + if sr.Error == nil { + sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage)) + } else if sr.Status == dtmcli.StatusFailed { + sr.Data = []byte(sr.Error.Error()) + } } return sr } -func stepResultToGrpc(s *stepResult, reply interface{}) error { - if s.Error != nil { - return s.Error - } else if s.Status == dtmcli.StatusSucceed { +func (wf *Workflow) stepResultToGrpc(s *stepResult, reply interface{}) error { + if s.Error == nil && s.Status == dtmcli.StatusSucceed { dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage)) - return nil } - return status.New(codes.Aborted, string(s.Data)).Err() + return s.Error } -func stepResultFromHTTP(resp *http.Response, err error) *stepResult { +func (wf *Workflow) stepResultFromHTTP(resp *http.Response, err error) *stepResult { sr := &stepResult{Error: err} if err == nil { - sr.Data, sr.Error = ioutil.ReadAll(resp.Body) - if resp.StatusCode == http.StatusOK { - sr.Status = dtmcli.StatusSucceed - } else if resp.StatusCode == http.StatusConflict { - sr.Status = dtmcli.StatusFailed - } else { - sr.Error = errors.New(string(sr.Data)) - } + sr.Data, sr.Error = wf.Options.HTTPResp2DtmError(resp) + sr.Status = wfErrorToStatus(sr.Error) } return sr } -func stepResultToHTTP(s *stepResult) (*http.Response, error) { +func (wf *Workflow) stepResultToHTTP(s *stepResult) (*http.Response, error) { if s.Error != nil { return nil, s.Error } diff --git a/dtmgrpc/workflow/workflow.go b/dtmgrpc/workflow/workflow.go index 6fdd131..29116aa 100644 --- a/dtmgrpc/workflow/workflow.go +++ b/dtmgrpc/workflow/workflow.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "net/http" "net/url" "github.com/dtm-labs/dtm/dtmcli" @@ -59,12 +60,16 @@ func ExecuteByQS(qs url.Values, body []byte) error { // Options is for specifying workflow options type Options struct { - // default false: Workflow's restyClient will convert http response to error if status code is not 200 - // if this flag is set true, then Workflow's restyClient will keep the origin http response - DisalbeAutoError bool - // default false: fn registered by OnBranchRollback will not be called for FAILURE branch - // if this flag is set true, then fn will be called. the user should handle null rollback and dangling + // Default: Code 409 => ErrFailure; Code 425 => ErrOngoing + HTTPResp2DtmError func(*http.Response) ([]byte, error) + + // Default: Code Aborted => ErrFailure; Code FailedPrecondition => ErrOngoing + GRPCError2DtmError func(error) error + + // This Option specify whether a branch returning ErrFailure should be compensated on rollback. + // for most idempotent branches, no compensation is needed. + // But for a timeout request, the caller cannot know where the request is successful, so the compensation should be called CompensateErrorBranch bool } @@ -147,9 +152,9 @@ func (wf *Workflow) OnBranchCommit(fn WfPhase2Func) *Workflow { func (wf *Workflow) Do(fn func(bb *dtmcli.BranchBarrier) ([]byte, error)) ([]byte, error) { res := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { r, e := fn(bb) - return stepResultFromLocal(r, e) + return wf.stepResultFromLocal(r, e) }) - return stepResultToLocal(res) + return wf.stepResultToLocal(res) } // DoXa will begin a local xa transaction @@ -217,7 +222,7 @@ func Interceptor(ctx context.Context, method string, req, reply interface{}, cc } sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult { err := origin() - return stepResultFromGrpc(reply, err) + return wf.stepResultFromGrpc(reply, err) }) - return stepResultToGrpc(sr, reply) + return wf.stepResultToGrpc(sr, reply) }