Browse Source

add json-rpc callback judge

pull/237/head
yedf2 4 years ago
parent
commit
9e0e3de159
  1. 5
      dtmcli/dtmimp/consts.go
  2. 7
      dtmsvr/api_json_rpc.go
  3. 51
      dtmsvr/trans_status.go
  4. 1
      test/busi/base_http.go
  5. 37
      test/busi/base_jrpc.go
  6. 4
      test/busi/base_types.go
  7. 21
      test/msg_jrpc_test.go

5
dtmcli/dtmimp/consts.go

@ -21,4 +21,9 @@ const (
DBTypeRedis = "redis"
// Jrpc const for json-rpc
Jrpc = "json-rpc"
// JrpcCodeFailure const for json-rpc failure
JrpcCodeFailure = -32901
// JrpcCodeOngoing const for json-rpc ongoing
JrpcCodeOngoing = -32902
)

7
dtmsvr/api_json_rpc.go

@ -12,9 +12,6 @@ import (
"github.com/gin-gonic/gin"
)
const jrpcCodeFailure = -32901
const jrpcCodeOngoing = -32902
type jrpcReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
@ -68,7 +65,7 @@ func addJrpcRouter(engine *gin.Engine) {
if err != nil {
if errors.Is(err, dtmcli.ErrFailure) {
jerr = map[string]interface{}{
"code": jrpcCodeFailure,
"code": dtmimp.JrpcCodeFailure,
"message": err.Error(),
}
//// following is commented for server
@ -93,7 +90,7 @@ func addJrpcRouter(engine *gin.Engine) {
}
b, _ := json.Marshal(result)
cont := string(b)
if jerr == nil || jerr["code"] == jrpcCodeOngoing {
if jerr == nil || jerr["code"] == dtmimp.JrpcCodeOngoing {
logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
} else {
logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)

51
dtmsvr/trans_status.go

@ -9,6 +9,7 @@ package dtmsvr
import (
"errors"
"fmt"
"net/url"
"strings"
"time"
@ -19,6 +20,7 @@ import (
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtmdriver"
"github.com/lithammer/shortuuid/v3"
"google.golang.org/grpc/metadata"
)
@ -89,14 +91,51 @@ func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) error {
if url == "" { // empty url is success
func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayload []byte) error {
if uri == "" { // empty url is success
return nil
}
if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
if strings.HasPrefix(uri, "http://") || strings.HasPrefix(uri, "https://") {
if t.RequestTimeout != 0 {
dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second)
}
if t.Protocol == "json-rpc" {
var params map[string]interface{}
dtmimp.MustUnmarshal(branchPayload, &params)
u, err := url.Parse(uri)
dtmimp.E2P(err)
params["gid"] = t.Gid
params["trans_type"] = t.TransType
params["branch_id"] = branchID
params["op"] = op
resp, err := dtmimp.RestyClient.R().SetBody(map[string]interface{}{
"params": params,
"jsonrpc": "2.0",
"method": u.Query().Get("method"),
"id": shortuuid.New(),
}).
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Post(uri)
if err == nil {
err = dtmimp.RespAsErrorCompatible(resp)
}
var result map[string]interface{}
if err == nil {
dtmimp.MustUnmarshalString(resp.String(), &result)
if result["error"] != nil {
rerr := result["error"].(map[string]interface{})
if rerr["code"] == dtmimp.JrpcCodeFailure {
return dtmcli.ErrFailure
} else if rerr["code"] == dtmimp.JrpcCodeOngoing {
return dtmcli.ErrOngoing
}
return errors.New(resp.String())
}
}
return err
}
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
@ -107,15 +146,15 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
SetHeader("Content-type", "application/json").
SetHeaders(t.Ext.Headers).
SetHeaders(t.TransOptions.BranchHeaders).
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url)
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), uri)
if err != nil {
return err
}
return dtmimp.RespAsErrorCompatible(resp)
}
dtmimp.PanicIf(t.Protocol == "http", fmt.Errorf("bad url for http: %s", url))
dtmimp.PanicIf(t.Protocol == "http", fmt.Errorf("bad url for http: %s", uri))
// grpc handler
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)
server, method, err := dtmdriver.GetDriver().ParseServerMethod(uri)
if err != nil {
return err
}

1
test/busi/base_http.go

@ -72,6 +72,7 @@ func BaseAppStartup() *gin.Engine {
logger.FatalIfError(err)
BaseAddRoute(app)
addJrpcRoute(app)
for k, v := range setupFuncs {
logger.Debugf("initing %s", k)
v(app)

37
test/busi/base_jrpc.go

@ -0,0 +1,37 @@
package busi
import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
)
var BusiJrpcUrl = fmt.Sprintf("http://localhost:%d/api/json-rpc?method=", BusiPort)
func addJrpcRoute(app *gin.Engine) {
app.POST("/api/json-rpc", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
var data map[string]interface{}
err := c.BindJSON(&data)
dtmimp.E2P(err)
logger.Debugf("method is: %s", data["method"])
var rerr map[string]interface{}
r := MainSwitch.JrpcResult.Fetch()
if r != "" {
rerr = map[string]interface{}{
"code": map[string]int{
"FAILURE": dtmimp.JrpcCodeFailure,
"ONGOING": dtmimp.JrpcCodeOngoing,
"OTHER": -23977,
},
}
}
return map[string]interface{}{
"jsonrpc": "2.0",
"error": rerr,
"id": data["id"],
}
}))
}

4
test/busi/base_types.go

@ -126,6 +126,9 @@ func (s *AutoEmptyString) SetOnce(v string) {
func (s *AutoEmptyString) Fetch() string {
v := s.value
s.value = ""
if v != "" {
logger.Debugf("fetch obtain not empty value: %s", v)
}
return v
}
@ -138,6 +141,7 @@ type mainSwitchType struct {
TransOutRevertResult AutoEmptyString
QueryPreparedResult AutoEmptyString
NextResult AutoEmptyString
JrpcResult AutoEmptyString
}
// MainSwitch controls busi success or fail

21
test/msg_jrpc_test.go

@ -27,6 +27,25 @@ func TestMsgJrpcNormal(t *testing.T) {
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgJrpcResults(t *testing.T) {
msg := genJrpcMsg(dtmimp.GetFuncName())
busi.MainSwitch.JrpcResult.SetOnce("OTHER")
err := msg.Submit()
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
busi.MainSwitch.JrpcResult.SetOnce("ONGOING")
cronTransOnceForwardNow(t, msg.Gid, 180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
busi.MainSwitch.JrpcResult.SetOnce("FAILURE")
cronTransOnceForwardNow(t, msg.Gid, 180)
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
cronTransOnceForwardNow(t, msg.Gid, 180)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgJrpcDoAndSubmit(t *testing.T) {
before := getBeforeBalances("mysql")
gid := dtmimp.GetFuncName()
@ -119,7 +138,7 @@ func genJrpcMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
Add(busi.BusiJrpcUrl+"TransIn", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
msg.Protocol = dtmimp.Jrpc
return msg

Loading…
Cancel
Save