From 9e0e3de159acb1f3eae87e1b09cc211fb412e4f7 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sun, 27 Feb 2022 11:11:51 +0800 Subject: [PATCH] add json-rpc callback judge --- dtmcli/dtmimp/consts.go | 5 ++++ dtmsvr/api_json_rpc.go | 7 ++---- dtmsvr/trans_status.go | 51 ++++++++++++++++++++++++++++++++++++----- test/busi/base_http.go | 1 + test/busi/base_jrpc.go | 37 ++++++++++++++++++++++++++++++ test/busi/base_types.go | 4 ++++ test/msg_jrpc_test.go | 21 ++++++++++++++++- 7 files changed, 114 insertions(+), 12 deletions(-) create mode 100644 test/busi/base_jrpc.go diff --git a/dtmcli/dtmimp/consts.go b/dtmcli/dtmimp/consts.go index 4248d62..d029552 100644 --- a/dtmcli/dtmimp/consts.go +++ b/dtmcli/dtmimp/consts.go @@ -21,4 +21,9 @@ const ( DBTypeRedis = "redis" // Jrpc const for json-rpc Jrpc = "json-rpc" + // JrpcCodeFailure const for json-rpc failure + JrpcCodeFailure = -32901 + + // JrpcCodeOngoing const for json-rpc ongoing + JrpcCodeOngoing = -32902 ) diff --git a/dtmsvr/api_json_rpc.go b/dtmsvr/api_json_rpc.go index e0ae3ea..fc211d2 100644 --- a/dtmsvr/api_json_rpc.go +++ b/dtmsvr/api_json_rpc.go @@ -12,9 +12,6 @@ import ( "github.com/gin-gonic/gin" ) -const jrpcCodeFailure = -32901 -const jrpcCodeOngoing = -32902 - type jrpcReq struct { Method string `json:"method"` Jsonrpc string `json:"jsonrpc"` @@ -68,7 +65,7 @@ func addJrpcRouter(engine *gin.Engine) { if err != nil { if errors.Is(err, dtmcli.ErrFailure) { jerr = map[string]interface{}{ - "code": jrpcCodeFailure, + "code": dtmimp.JrpcCodeFailure, "message": err.Error(), } //// following is commented for server @@ -93,7 +90,7 @@ func addJrpcRouter(engine *gin.Engine) { } b, _ := json.Marshal(result) cont := string(b) - if jerr == nil || jerr["code"] == jrpcCodeOngoing { + if jerr == nil || jerr["code"] == dtmimp.JrpcCodeOngoing { logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) } else { logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 00c0c58..7dc62c9 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -9,6 +9,7 @@ package dtmsvr import ( "errors" "fmt" + "net/url" "strings" "time" @@ -19,6 +20,7 @@ import ( "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtmdriver" + "github.com/lithammer/shortuuid/v3" "google.golang.org/grpc/metadata" ) @@ -89,14 +91,51 @@ func (t *TransGlobal) needProcess() bool { return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout() } -func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) error { - if url == "" { // empty url is success +func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayload []byte) error { + if uri == "" { // empty url is success return nil } - if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { + if strings.HasPrefix(uri, "http://") || strings.HasPrefix(uri, "https://") { if t.RequestTimeout != 0 { dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second) } + if t.Protocol == "json-rpc" { + var params map[string]interface{} + dtmimp.MustUnmarshal(branchPayload, ¶ms) + u, err := url.Parse(uri) + dtmimp.E2P(err) + params["gid"] = t.Gid + params["trans_type"] = t.TransType + params["branch_id"] = branchID + params["op"] = op + resp, err := dtmimp.RestyClient.R().SetBody(map[string]interface{}{ + "params": params, + "jsonrpc": "2.0", + "method": u.Query().Get("method"), + "id": shortuuid.New(), + }). + SetHeader("Content-type", "application/json"). + SetHeaders(t.Ext.Headers). + SetHeaders(t.TransOptions.BranchHeaders). + Post(uri) + if err == nil { + err = dtmimp.RespAsErrorCompatible(resp) + } + var result map[string]interface{} + if err == nil { + dtmimp.MustUnmarshalString(resp.String(), &result) + if result["error"] != nil { + rerr := result["error"].(map[string]interface{}) + if rerr["code"] == dtmimp.JrpcCodeFailure { + return dtmcli.ErrFailure + } else if rerr["code"] == dtmimp.JrpcCodeOngoing { + return dtmcli.ErrOngoing + } + return errors.New(resp.String()) + } + } + return err + } resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). SetQueryParams(map[string]string{ "gid": t.Gid, @@ -107,15 +146,15 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa SetHeader("Content-type", "application/json"). SetHeaders(t.Ext.Headers). SetHeaders(t.TransOptions.BranchHeaders). - Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url) + Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), uri) if err != nil { return err } return dtmimp.RespAsErrorCompatible(resp) } - dtmimp.PanicIf(t.Protocol == "http", fmt.Errorf("bad url for http: %s", url)) + dtmimp.PanicIf(t.Protocol == "http", fmt.Errorf("bad url for http: %s", uri)) // grpc handler - server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) + server, method, err := dtmdriver.GetDriver().ParseServerMethod(uri) if err != nil { return err } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 5725694..44bb262 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -72,6 +72,7 @@ func BaseAppStartup() *gin.Engine { logger.FatalIfError(err) BaseAddRoute(app) + addJrpcRoute(app) for k, v := range setupFuncs { logger.Debugf("initing %s", k) v(app) diff --git a/test/busi/base_jrpc.go b/test/busi/base_jrpc.go new file mode 100644 index 0000000..91c35f4 --- /dev/null +++ b/test/busi/base_jrpc.go @@ -0,0 +1,37 @@ +package busi + +import ( + "fmt" + + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmutil" + "github.com/gin-gonic/gin" +) + +var BusiJrpcUrl = fmt.Sprintf("http://localhost:%d/api/json-rpc?method=", BusiPort) + +func addJrpcRoute(app *gin.Engine) { + app.POST("/api/json-rpc", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + var data map[string]interface{} + err := c.BindJSON(&data) + dtmimp.E2P(err) + logger.Debugf("method is: %s", data["method"]) + var rerr map[string]interface{} + r := MainSwitch.JrpcResult.Fetch() + if r != "" { + rerr = map[string]interface{}{ + "code": map[string]int{ + "FAILURE": dtmimp.JrpcCodeFailure, + "ONGOING": dtmimp.JrpcCodeOngoing, + "OTHER": -23977, + }, + } + } + return map[string]interface{}{ + "jsonrpc": "2.0", + "error": rerr, + "id": data["id"], + } + })) +} diff --git a/test/busi/base_types.go b/test/busi/base_types.go index b2a4a32..c57bd86 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -126,6 +126,9 @@ func (s *AutoEmptyString) SetOnce(v string) { func (s *AutoEmptyString) Fetch() string { v := s.value s.value = "" + if v != "" { + logger.Debugf("fetch obtain not empty value: %s", v) + } return v } @@ -138,6 +141,7 @@ type mainSwitchType struct { TransOutRevertResult AutoEmptyString QueryPreparedResult AutoEmptyString NextResult AutoEmptyString + JrpcResult AutoEmptyString } // MainSwitch controls busi success or fail diff --git a/test/msg_jrpc_test.go b/test/msg_jrpc_test.go index 61534ae..3f86d05 100644 --- a/test/msg_jrpc_test.go +++ b/test/msg_jrpc_test.go @@ -27,6 +27,25 @@ func TestMsgJrpcNormal(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) } +func TestMsgJrpcResults(t *testing.T) { + msg := genJrpcMsg(dtmimp.GetFuncName()) + busi.MainSwitch.JrpcResult.SetOnce("OTHER") + err := msg.Submit() + assert.Nil(t, err) + waitTransProcessed(msg.Gid) + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + busi.MainSwitch.JrpcResult.SetOnce("ONGOING") + cronTransOnceForwardNow(t, msg.Gid, 180) + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + busi.MainSwitch.JrpcResult.SetOnce("FAILURE") + cronTransOnceForwardNow(t, msg.Gid, 180) + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + + cronTransOnceForwardNow(t, msg.Gid, 180) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) +} + func TestMsgJrpcDoAndSubmit(t *testing.T) { before := getBeforeBalances("mysql") gid := dtmimp.GetFuncName() @@ -119,7 +138,7 @@ func genJrpcMsg(gid string) *dtmcli.Msg { req := busi.GenTransReq(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid). Add(busi.Busi+"/TransOut", &req). - Add(busi.Busi+"/TransIn", &req) + Add(busi.BusiJrpcUrl+"TransIn", &req) msg.QueryPrepared = busi.Busi + "/QueryPrepared" msg.Protocol = dtmimp.Jrpc return msg