diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index 879ffc2..4ec247b 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -65,6 +65,7 @@ type TransBase struct { Op string `json:"-"` // used in XA/TCC QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG + Protocol string `json:"protocol"` } // NewTransBase new a TransBase @@ -93,6 +94,25 @@ func TransCallDtm(tb *TransBase, body interface{}, operation string) error { if tb.RequestTimeout != 0 { RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) } + if tb.Protocol == "json-rpc" { + var result map[string]interface{} + resp, err := RestyClient.R(). + SetBody(map[string]interface{}{ + "jsonrpc": "2.0", + "id": "no-use", + "method": operation, + "params": body, + }). + SetResult(&result). + Post(tb.Dtm) + if err != nil { + return err + } + if resp.StatusCode() != http.StatusOK || result["error"] != nil { + return errors.New(resp.String()) + } + return nil + } resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { @@ -118,6 +138,9 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin // TransRequestBranch TransBase request branch result func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) { + if url == "" { + return nil, nil + } resp, err := RestyClient.R(). SetBody(body). SetQueryParams(map[string]string{ diff --git a/dtmsvr/api_json_rpc.go b/dtmsvr/api_json_rpc.go index 8f9a571..d9f3936 100644 --- a/dtmsvr/api_json_rpc.go +++ b/dtmsvr/api_json_rpc.go @@ -25,13 +25,13 @@ type jrpcReq struct { func addJrpcRouter(engine *gin.Engine) { type jrpcFunc = func(interface{}) interface{} handlers := map[string]jrpcFunc{ - "dtmserver.newGid": jrpcNewGid, - "dtmserver.prepare": jrpcPrepare, - "dtmserver.submit": jrpcSubmit, - "dtmserver.abort": jrpcAbort, - "dtmserver.registerBranch": jrpcRegisterBranch, + "newGid": jrpcNewGid, + "prepare": jrpcPrepare, + "submit": jrpcSubmit, + "abort": jrpcAbort, + "registerBranch": jrpcRegisterBranch, } - engine.POST("/", func(c *gin.Context) { + engine.POST("/api/json-rpc", func(c *gin.Context) { began := time.Now() var err error var req jrpcReq @@ -105,6 +105,7 @@ func addJrpcRouter(engine *gin.Engine) { func TransFromJrpcParams(params interface{}) *TransGlobal { t := TransGlobal{} dtmimp.MustRemarshal(params, &t) + t.setupPayloads() return &t } diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 49d5583..4fafd89 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -43,6 +43,7 @@ func StartSvr() { app := dtmutil.GetGinApp() app = httpMetrics(app) addRoute(app) + addJrpcRouter(app) logger.Infof("dtmsvr http listen at: %d", conf.HTTPPort) go func() { err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort)) @@ -51,18 +52,6 @@ func StartSvr() { } }() - // start json-rpc server - jrpcApp := dtmutil.GetGinApp() - jrpcApp = httpMetrics(jrpcApp) - addJrpcRouter(jrpcApp) - logger.Infof("dtmsvr json-rpc listen at: %d", conf.JSONRPCPort) - go func() { - err := jrpcApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort)) - if err != nil { - logger.Errorf("start server err: %v", err) - } - }() - // start grpc server lis, err := net.Listen("tcp", fmt.Sprintf(":%d", conf.GrpcPort)) logger.FatalIfError(err) diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 8aa1b60..2eae58c 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -26,6 +26,22 @@ type TransGlobal struct { updateBranchSync bool } +func (t *TransGlobal) setupPayloads() { + // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal + for _, p := range t.Payloads { + t.BinPayloads = append(t.BinPayloads, []byte(p)) + } + for _, d := range t.Steps { + if d["data"] != "" { + t.BinPayloads = append(t.BinPayloads, []byte(d["data"])) + } + } + if t.Protocol == "" { + t.Protocol = "http" + } + +} + // TransBranch branch transaction type TransBranch = storage.TransBranchStore @@ -61,17 +77,7 @@ func TransFromContext(c *gin.Context) *TransGlobal { m := TransGlobal{} dtmimp.MustUnmarshal(b, &m) logger.Debugf("creating trans in prepare") - // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal - for _, p := range m.Payloads { - m.BinPayloads = append(m.BinPayloads, []byte(p)) - } - for _, d := range m.Steps { - if d["data"] != "" { - m.BinPayloads = append(m.BinPayloads, []byte(d["data"])) - } - } - m.Protocol = "http" - + m.setupPayloads() m.Ext.Headers = map[string]string{} if len(m.PassthroughHeaders) > 0 { for _, h := range m.PassthroughHeaders { diff --git a/dtmutil/consts.go b/dtmutil/consts.go index 08388e2..d8c4345 100644 --- a/dtmutil/consts.go +++ b/dtmutil/consts.go @@ -9,8 +9,8 @@ package dtmutil const ( // DefaultHTTPServer default url for http server. used by test and examples DefaultHTTPServer = "http://localhost:36789/api/dtmsvr" + // DefaultJrpcServer default url for http json-rpc server. used by test and examples + DefaultJrpcServer = "http://localhost:36789/api/json-rpc" // DefaultGrpcServer default url for grpc server. used by test and examples DefaultGrpcServer = "localhost:36790" - // DefaultJrpcServer default url for http json-rpc server. used by test and examples - DefaultJrpcServer = "http://localhost:36791" ) diff --git a/test/msg_jrpc_test.go b/test/msg_jrpc_test.go index 16d04bb..f94c2c8 100644 --- a/test/msg_jrpc_test.go +++ b/test/msg_jrpc_test.go @@ -10,17 +10,62 @@ import ( "testing" "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmutil" + "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" ) func TestMsgJrpcNormal(t *testing.T) { - resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ + msg := genJrpcMsg(dtmimp.GetFuncName()) + msg.Submit() + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) +} + +func TestMsgJrpcRepeated(t *testing.T) { + msg := genJrpcMsg(dtmimp.GetFuncName()) + msg.Submit() + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) + err := msg.Submit() + assert.Error(t, err) +} +func TestMsgJprcAbnormal(t *testing.T) { + id := "no-use" + resp, err := dtmcli.GetRestyClient().R().SetBody("hello").Post(dtmutil.DefaultJrpcServer) + assert.Nil(t, err) + assert.Contains(t, resp.String(), "-32700") + + resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{ + "jsonrpc": "1.0", + "method": "newGid", + "params": "", + "id": id, + }).Post(dtmutil.DefaultJrpcServer) + assert.Nil(t, err) + assert.Contains(t, resp.String(), "-32600") + + resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{ "jsonrpc": "2.0", - "method": "dtmserver.newGid", + "method": "not-exists", "params": "", - "id": "TestMsgJrpcNormal", + "id": id, }).Post(dtmutil.DefaultJrpcServer) assert.Nil(t, err) - assert.Contains(t, resp.String(), "gid") + assert.Contains(t, resp.String(), "-32601") +} + +func genJrpcMsg(gid string) *dtmcli.Msg { + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid). + Add(busi.Busi+"/TransOut", &req). + Add(busi.Busi+"/TransIn", &req) + msg.QueryPrepared = busi.Busi + "/QueryPrepared" + msg.Protocol = "json-rpc" + return msg }