Browse Source

Merge pull request #313 from xyctruth/main

Support rollback_reason for JSONRPC
pull/316/head
yedf2 4 years ago
committed by GitHub
parent
commit
c4002a5aa0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dtmcli/dtmimp/utils.go
  2. 103
      dtmsvr/trans_status.go

17
dtmcli/dtmimp/utils.go

@ -233,6 +233,23 @@ func RespAsErrorCompatible(resp *resty.Response) error {
return nil
}
// RespAsErrorByJSONRPC translate json rpc resty response to error
func RespAsErrorByJSONRPC(resp *resty.Response) error {
str := resp.String()
var result map[string]interface{}
MustUnmarshalString(str, &result)
if result["error"] != nil {
rerr := result["error"].(map[string]interface{})
if rerr["code"] == JrpcCodeFailure {
return fmt.Errorf("%s. %w", str, ErrFailure)
} else if rerr["code"] == JrpcCodeOngoing {
return ErrOngoing
}
return errors.New(resp.String())
}
return nil
}
// DeferDo a common defer do used in dtmcli/dtmgrpc
func DeferDo(rerr *error, success func() error, fail func() error) {
if x := recover(); x != nil {

103
dtmsvr/trans_status.go

@ -113,59 +113,60 @@ func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayloa
dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second)
}
if t.Protocol == "json-rpc" && strings.Contains(uri, "method") {
var params map[string]interface{}
dtmimp.MustUnmarshal(branchPayload, &params)
u, err := url.Parse(uri)
dtmimp.E2P(err)
params["gid"] = t.Gid
params["trans_type"] = t.TransType
params["branch_id"] = branchID
params["op"] = op
resp, err := dtmimp.RestyClient.R().SetBody(map[string]interface{}{
"params": params,
"jsonrpc": "2.0",
"method": u.Query().Get("method"),
"id": shortuuid.New(),
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Post(uri)
if err == nil {
err = dtmimp.RespAsErrorCompatible(resp)
}
var result map[string]interface{}
if err == nil {
dtmimp.MustUnmarshalString(resp.String(), &result)
if result["error"] != nil {
rerr := result["error"].(map[string]interface{})
if rerr["code"] == dtmimp.JrpcCodeFailure {
return dtmcli.ErrFailure
} else if rerr["code"] == dtmimp.JrpcCodeOngoing {
return dtmcli.ErrOngoing
}
return errors.New(resp.String())
}
}
return err
return t.getJSONRPCResult(uri, branchID, op, branchPayload)
}
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"op": op,
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), uri)
if err != nil {
return err
}
return dtmimp.RespAsErrorCompatible(resp)
return t.getHTTPResult(uri, branchID, op, branchPayload)
}
return t.getGrpcResult(uri, branchID, op, branchPayload)
}
func (t *TransGlobal) getHTTPResult(uri string, branchID, op string, branchPayload []byte) error {
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"op": op,
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), uri)
if err != nil {
return err
}
dtmimp.PanicIf(t.Protocol == dtmimp.ProtocolHTTP, fmt.Errorf("bad url for http: %s", uri))
return dtmimp.RespAsErrorCompatible(resp)
}
func (t *TransGlobal) getJSONRPCResult(uri string, branchID, op string, branchPayload []byte) error {
var params map[string]interface{}
dtmimp.MustUnmarshal(branchPayload, &params)
u, err := url.Parse(uri)
dtmimp.E2P(err)
params["gid"] = t.Gid
params["trans_type"] = t.TransType
params["branch_id"] = branchID
params["op"] = op
resp, err := dtmimp.RestyClient.R().SetBody(map[string]interface{}{
"params": params,
"jsonrpc": "2.0",
"method": u.Query().Get("method"),
"id": shortuuid.New(),
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Post(uri)
if err == nil {
err = dtmimp.RespAsErrorCompatible(resp)
}
if err == nil {
err = dtmimp.RespAsErrorByJSONRPC(resp)
}
return err
}
func (t *TransGlobal) getGrpcResult(uri string, branchID, op string, branchPayload []byte) error {
// grpc handler
server, method, err := dtmdriver.GetDriver().ParseServerMethod(uri)
if err != nil {

Loading…
Cancel
Save