Browse Source

optimize error message

pull/330/head
yedf2 4 years ago
parent
commit
1e6e8c2b2e
  1. 3
      client/dtmcli/dtmimp/vars.go
  2. 10
      client/dtmcli/types.go
  3. 2
      client/dtmcli/types_test.go
  4. 9
      client/dtmcli/utils.go
  5. 5
      client/dtmgrpc/type.go
  6. 4
      client/workflow/imp.go
  7. 13
      client/workflow/rpc.go
  8. 11
      client/workflow/utils.go
  9. 2
      dtmsvr/api.go

3
client/dtmcli/dtmimp/vars.go

@ -24,9 +24,6 @@ var ErrOngoing = errors.New("ONGOING")
// if QueryPrepared executed before call. then DoAndSubmit return this error
var ErrDuplicated = errors.New("DUPLICATED")
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
var XaSQLTimeoutMs = 15000
// MapSuccess HTTP result of SUCCESS
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}

10
client/dtmcli/types.go

@ -30,16 +30,6 @@ func GetCurrentDBType() string {
return dtmimp.GetCurrentDBType()
}
// SetXaSQLTimeoutMs set XaSQLTimeoutMs
func SetXaSQLTimeoutMs(ms int) {
dtmimp.XaSQLTimeoutMs = ms
}
// GetXaSQLTimeoutMs get XaSQLTimeoutMs
func GetXaSQLTimeoutMs() int {
return dtmimp.XaSQLTimeoutMs
}
// SetBarrierTableName sets barrier table name
func SetBarrierTableName(tablename string) {
dtmimp.BarrierTableName = tablename

2
client/dtmcli/types_test.go

@ -26,7 +26,5 @@ func TestTypes(t *testing.T) {
}
func TestXaSqlTimeout(t *testing.T) {
old := GetXaSQLTimeoutMs()
SetXaSQLTimeoutMs(old)
SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func
}

9
client/dtmcli/utils.go

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/go-resty/resty/v2"
@ -29,6 +30,14 @@ func String2DtmError(str string) error {
}[str]
}
// ErrorMessage2Error return an error fmt.Errorf("%s. %w", errMsg, err) but trim out duplicate wrap
// eg. ErrorMessage2Error("an error. FAILURE", ErrFailure) return an error with message: "an error. FAILURE",
// no additional ". FAILURE" added
func ErrorMessage2Error(errMsg string, err error) error {
errMsg = strings.TrimSuffix(errMsg, ". "+err.Error())
return fmt.Errorf("%s. %w", errMsg, err)
}
// Result2HttpJSON return the http code and json result
// if result is error, the return proper code, else return StatusOK
func Result2HttpJSON(result interface{}) (code int, res interface{}) {

5
client/dtmgrpc/type.go

@ -9,7 +9,6 @@ package dtmgrpc
import (
context "context"
"errors"
"fmt"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -40,9 +39,9 @@ func GrpcError2DtmError(err error) error {
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ErrOngoing
}
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure)
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
} else if st != nil && st.Code() == codes.FailedPrecondition {
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing)
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
}
return err
}

4
client/workflow/imp.go

@ -109,7 +109,7 @@ func (wf *Workflow) process(handler WfFunc, data []byte) (err error) {
err = wf.processPhase2(err)
}
if err == nil || errors.Is(err, dtmcli.ErrFailure) {
err1 := wf.submit(wfErrorToStatus(err))
err1 := wf.submit(err)
if err1 != nil {
return err1
}
@ -178,7 +178,7 @@ func (wf *Workflow) recordedDoInner(fn func(bb *dtmcli.BranchBarrier) *stepResul
}
r := wf.getStepResult()
if r != nil {
logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data)
logger.Debugf("progress restored: '%s' '%s' '%v' '%s' '%s'", branchID, wf.currentOp, r.Error, r.Status, r.Data)
return r
}
bb := &dtmcli.BranchBarrier{

13
client/workflow/rpc.go

@ -27,13 +27,19 @@ func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) {
return reply.Progresses, err
}
func (wf *Workflow) submit(status string) error {
func (wf *Workflow) submit(err error) error {
status := wfErrorToStatus(err)
reason := ""
if err != nil {
reason = err.Error()
}
if wf.Protocol == dtmimp.ProtocolHTTP {
m := map[string]interface{}{
"gid": wf.Gid,
"trans_type": wf.TransType,
"req_extra": map[string]string{
"status": status,
"status": status,
"rollback_reason": reason,
},
}
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit")
@ -41,7 +47,8 @@ func (wf *Workflow) submit(status string) error {
}
req := dtmgimp.GetDtmRequest(wf.TransBase)
req.ReqExtra = map[string]string{
"status": status,
"status": status,
"rollback_reason": reason,
}
reply := emptypb.Empty{}
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply)

11
client/workflow/utils.go

@ -3,7 +3,6 @@ package workflow
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
@ -75,9 +74,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) {
data, err := ioutil.ReadAll(resp.Body)
resp.Body = ioutil.NopCloser(bytes.NewBuffer(data))
if code == http.StatusTooEarly {
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrOngoing)
return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrOngoing)
} else if code == http.StatusConflict {
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrFailure)
return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrFailure)
} else if err == nil && code != http.StatusOK {
return data, errors.New(string(data))
}
@ -88,9 +87,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) {
func GrpcError2DtmError(err error) error {
st, _ := status.FromError(err)
if st != nil && st.Code() == codes.Aborted {
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure)
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
} else if st != nil && st.Code() == codes.FailedPrecondition {
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing)
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
}
return err
}
@ -113,7 +112,7 @@ func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult
if sr.Error == nil {
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage))
} else if sr.Status == dtmcli.StatusFailed {
sr.Data = []byte(sr.Error.Error())
sr.Data = []byte(err.Error())
}
return sr
}

2
dtmsvr/api.go

@ -22,7 +22,7 @@ var Version = ""
func svcSubmit(t *TransGlobal) interface{} {
if t.TransType == "workflow" {
t.Status = dtmcli.StatusPrepared
t.changeStatus(t.ReqExtra["status"])
t.changeStatus(t.ReqExtra["status"], withRollbackReason(t.ReqExtra["rollback_reason"]))
return nil
}
t.Status = dtmcli.StatusSubmitted

Loading…
Cancel
Save