From cbb42755d38b84be408fd81c9de26b08ff9cfb47 Mon Sep 17 00:00:00 2001 From: pandaLIU <563883861@qq.com> Date: Tue, 22 Feb 2022 22:14:32 +0800 Subject: [PATCH] Added json rpc http --- conf.sample.yml | 1 + dtmsvr/api_json_rpc_http.go | 110 ++++++++++++++++++++++++++++++++++++ dtmsvr/config/config.go | 1 + dtmsvr/svr.go | 12 ++++ dtmsvr/trans_class.go | 11 ++++ 5 files changed, 135 insertions(+) create mode 100644 dtmsvr/api_json_rpc_http.go diff --git a/conf.sample.yml b/conf.sample.yml index a4e869c..d9052a5 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -56,6 +56,7 @@ # HttpPort: 36789 # GrpcPort: 36790 +# JsonRpcHttp: 36791 ### advanced options # UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status diff --git a/dtmsvr/api_json_rpc_http.go b/dtmsvr/api_json_rpc_http.go new file mode 100644 index 0000000..4f174e4 --- /dev/null +++ b/dtmsvr/api_json_rpc_http.go @@ -0,0 +1,110 @@ +package dtmsvr + +import ( + "encoding/json" + "fmt" + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/gin-gonic/gin" + "net/http" +) + +type jsonRpcHttpReq struct { + Method string `json:"method"` + Jsonrpc string `json:"jsonrpc"` + Params interface{} `json:"params"` + Id string `json:"id"` +} + +func addJsonRpcHttpRouter(engine *gin.Engine) { + engine.POST("/", dispatcher) +} + +func dispatcher(c *gin.Context) { + req := new(jsonRpcHttpReq) + err := c.BindJSON(req) + logger.Infof("request:%s\n", req) + if err != nil { + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}}) + return + } + if req.Method == "dtmserver.NewGid" { + res, err := jsonRpcHttpNewGid() + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": err}) + return + } + + if req.Method == "dtmserver.Prepare" { + res := jsonRpcHttpPrepare(req.Params) + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil}) + return + } + + if req.Method == "dtmserver.Submit" { + res := jsonRpcHttpSubmit(req.Params) + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil}) + return + } + + if req.Method == "dtmserver.Abort" { + res := jsonRpcHttpAbort(req.Params) + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil}) + return + } + + if req.Method == "dtmserver.RegisterBranch" { + res := jsonRpcHttpRegisterBranch(req.Params) + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil}) + return + } + c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}}) + return +} + +func jsonRpcHttpNewGid() (interface{}, error) { + return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}, nil +} + +func jsonRpcHttpPrepare(params interface{}) interface{} { + res := svcPrepare(TransFromJsonRpcHttpContext(params)) + if res == nil { + return map[string]string{"dtm_result": "SUCCESS"} + } + return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} +} + +func jsonRpcHttpSubmit(params interface{}) interface{} { + res := svcSubmit(TransFromJsonRpcHttpContext(params)) + if res == nil { + return map[string]string{"dtm_result": "SUCCESS"} + } + return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} +} + +func jsonRpcHttpAbort(params interface{}) interface{} { + res := svcAbort(TransFromJsonRpcHttpContext(params)) + if res == nil { + return map[string]string{"dtm_result": "SUCCESS"} + } + return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} +} + +func jsonRpcHttpRegisterBranch(params interface{}) interface{} { + data := map[string]string{} + paramsJson, _ := json.Marshal(params) + err := json.Unmarshal(paramsJson, &data) + if err != nil { + return map[string]string{"dtm_result": "FAILURE", "message": err.Error()} + } + branch := TransBranch{ + Gid: data["gid"], + BranchID: data["branch_id"], + Status: dtmcli.StatusPrepared, + BinData: []byte(data["data"]), + } + res := svcRegisterBranch(data["trans_type"], &branch, data) + if res == nil { + return map[string]string{"dtm_result": "SUCCESS"} + } + return map[string]string{"dtm_result": "FAILURE", "message": res.Error()} +} diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index 06a8fda..4f9ce1c 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -76,6 +76,7 @@ type configType struct { RequestTimeout int64 `yaml:"RequestTimeout" default:"3"` HTTPPort int64 `yaml:"HttpPort" default:"36789"` GrpcPort int64 `yaml:"GrpcPort" default:"36790"` + JsonRpcHttpPort int64 `yaml:"JsonRpcHttpPort" default:"36791"` MicroService MicroService `yaml:"MicroService"` UpdateBranchSync int64 `yaml:"UpdateBranchSync"` UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"` diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 1816087..7989986 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -66,6 +66,18 @@ func StartSvr() { logger.FatalIfError(err) err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint) logger.FatalIfError(err) + + // start json-rpc server + jsonRpcHttpApp := dtmutil.GetGinApp() + jsonRpcHttpApp = httpMetrics(jsonRpcHttpApp) + addJsonRpcHttpRouter(jsonRpcHttpApp) + logger.Infof("dtmsvr listen at: %d", conf.JsonRpcHttpPort) + go func() { + err := jsonRpcHttpApp.Run(fmt.Sprintf(":%d", conf.JsonRpcHttpPort)) + if err != nil { + logger.Errorf("start server err: %v", err) + } + }() } // PopulateDB setup mysql data diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index b6e080b..8b039b9 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -8,6 +8,7 @@ package dtmsvr import ( "context" + "encoding/json" "time" "github.com/dtm-labs/dtm/dtmcli" @@ -84,6 +85,16 @@ func TransFromContext(c *gin.Context) *TransGlobal { return &m } +func TransFromJsonRpcHttpContext(params interface{}) *TransGlobal { + jsonStr, _ := json.Marshal(params) + m := TransGlobal{} + err := json.Unmarshal(jsonStr, &m) + if err != nil { + return nil + } + return &m +} + // TransFromDtmRequest TransFromContext func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal { o := &dtmgpb.DtmTransOptions{}