diff --git a/client/dtmcli/dtmimp/vars.go b/client/dtmcli/dtmimp/vars.go index 3728b6c..a34d29f 100644 --- a/client/dtmcli/dtmimp/vars.go +++ b/client/dtmcli/dtmimp/vars.go @@ -24,9 +24,6 @@ var ErrOngoing = errors.New("ONGOING") // if QueryPrepared executed before call. then DoAndSubmit return this error var ErrDuplicated = errors.New("DUPLICATED") -// XaSQLTimeoutMs milliseconds for Xa sql to timeout -var XaSQLTimeoutMs = 15000 - // MapSuccess HTTP result of SUCCESS var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess} diff --git a/client/dtmcli/types.go b/client/dtmcli/types.go index 1263a17..be7fda1 100644 --- a/client/dtmcli/types.go +++ b/client/dtmcli/types.go @@ -30,16 +30,6 @@ func GetCurrentDBType() string { return dtmimp.GetCurrentDBType() } -// SetXaSQLTimeoutMs set XaSQLTimeoutMs -func SetXaSQLTimeoutMs(ms int) { - dtmimp.XaSQLTimeoutMs = ms -} - -// GetXaSQLTimeoutMs get XaSQLTimeoutMs -func GetXaSQLTimeoutMs() int { - return dtmimp.XaSQLTimeoutMs -} - // SetBarrierTableName sets barrier table name func SetBarrierTableName(tablename string) { dtmimp.BarrierTableName = tablename diff --git a/client/dtmcli/types_test.go b/client/dtmcli/types_test.go index b3cfbaf..f7c27b3 100644 --- a/client/dtmcli/types_test.go +++ b/client/dtmcli/types_test.go @@ -26,7 +26,5 @@ func TestTypes(t *testing.T) { } func TestXaSqlTimeout(t *testing.T) { - old := GetXaSQLTimeoutMs() - SetXaSQLTimeoutMs(old) SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func } diff --git a/client/dtmcli/utils.go b/client/dtmcli/utils.go index a67c275..3483719 100644 --- a/client/dtmcli/utils.go +++ b/client/dtmcli/utils.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/http" + "strings" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/go-resty/resty/v2" @@ -29,6 +30,14 @@ func String2DtmError(str string) error { }[str] } +// ErrorMessage2Error return an error fmt.Errorf("%s. %w", errMsg, err) but trim out duplicate wrap +// eg. ErrorMessage2Error("an error. FAILURE", ErrFailure) return an error with message: "an error. FAILURE", +// no additional ". FAILURE" added +func ErrorMessage2Error(errMsg string, err error) error { + errMsg = strings.TrimSuffix(errMsg, ". "+err.Error()) + return fmt.Errorf("%s. %w", errMsg, err) +} + // Result2HttpJSON return the http code and json result // if result is error, the return proper code, else return StatusOK func Result2HttpJSON(result interface{}) (code int, res interface{}) { diff --git a/client/dtmgrpc/type.go b/client/dtmgrpc/type.go index 795164c..883504e 100644 --- a/client/dtmgrpc/type.go +++ b/client/dtmgrpc/type.go @@ -9,7 +9,6 @@ package dtmgrpc import ( context "context" "errors" - "fmt" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -40,9 +39,9 @@ func GrpcError2DtmError(err error) error { if st.Message() == dtmcli.ResultOngoing { return dtmcli.ErrOngoing } - return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure) + return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure) } else if st != nil && st.Code() == codes.FailedPrecondition { - return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing) + return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing) } return err } diff --git a/client/workflow/imp.go b/client/workflow/imp.go index a987807..55d08bb 100644 --- a/client/workflow/imp.go +++ b/client/workflow/imp.go @@ -109,7 +109,7 @@ func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { err = wf.processPhase2(err) } if err == nil || errors.Is(err, dtmcli.ErrFailure) { - err1 := wf.submit(wfErrorToStatus(err)) + err1 := wf.submit(err) if err1 != nil { return err1 } @@ -178,7 +178,7 @@ func (wf *Workflow) recordedDoInner(fn func(bb *dtmcli.BranchBarrier) *stepResul } r := wf.getStepResult() if r != nil { - logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data) + logger.Debugf("progress restored: '%s' '%s' '%v' '%s' '%s'", branchID, wf.currentOp, r.Error, r.Status, r.Data) return r } bb := &dtmcli.BranchBarrier{ diff --git a/client/workflow/rpc.go b/client/workflow/rpc.go index 98b7e9e..29dc96e 100644 --- a/client/workflow/rpc.go +++ b/client/workflow/rpc.go @@ -27,13 +27,19 @@ func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) { return reply.Progresses, err } -func (wf *Workflow) submit(status string) error { +func (wf *Workflow) submit(err error) error { + status := wfErrorToStatus(err) + reason := "" + if err != nil { + reason = err.Error() + } if wf.Protocol == dtmimp.ProtocolHTTP { m := map[string]interface{}{ "gid": wf.Gid, "trans_type": wf.TransType, "req_extra": map[string]string{ - "status": status, + "status": status, + "rollback_reason": reason, }, } _, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit") @@ -41,7 +47,8 @@ func (wf *Workflow) submit(status string) error { } req := dtmgimp.GetDtmRequest(wf.TransBase) req.ReqExtra = map[string]string{ - "status": status, + "status": status, + "rollback_reason": reason, } reply := emptypb.Empty{} return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply) diff --git a/client/workflow/utils.go b/client/workflow/utils.go index f2e3ca9..b8d6739 100644 --- a/client/workflow/utils.go +++ b/client/workflow/utils.go @@ -3,7 +3,6 @@ package workflow import ( "bytes" "errors" - "fmt" "io/ioutil" "net/http" "strconv" @@ -75,9 +74,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) { data, err := ioutil.ReadAll(resp.Body) resp.Body = ioutil.NopCloser(bytes.NewBuffer(data)) if code == http.StatusTooEarly { - return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrOngoing) + return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrOngoing) } else if code == http.StatusConflict { - return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrFailure) + return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrFailure) } else if err == nil && code != http.StatusOK { return data, errors.New(string(data)) } @@ -88,9 +87,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) { func GrpcError2DtmError(err error) error { st, _ := status.FromError(err) if st != nil && st.Code() == codes.Aborted { - return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure) + return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure) } else if st != nil && st.Code() == codes.FailedPrecondition { - return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing) + return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing) } return err } @@ -113,7 +112,7 @@ func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult if sr.Error == nil { sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage)) } else if sr.Status == dtmcli.StatusFailed { - sr.Data = []byte(sr.Error.Error()) + sr.Data = []byte(err.Error()) } return sr } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 13d5e20..694e806 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -22,7 +22,7 @@ var Version = "" func svcSubmit(t *TransGlobal) interface{} { if t.TransType == "workflow" { t.Status = dtmcli.StatusPrepared - t.changeStatus(t.ReqExtra["status"]) + t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"])) return nil } t.Status = dtmcli.StatusSubmitted