diff --git a/conf.sample.yml b/conf.sample.yml index 1fd557e..673c9e9 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -56,7 +56,7 @@ # HttpPort: 36789 # GrpcPort: 36790 -# JSONRPC: 36791 +# JsonRpcPort: 36791 ### advanced options # UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status diff --git a/dtmsvr/api_json_rpc.go b/dtmsvr/api_json_rpc.go new file mode 100644 index 0000000..7ced4e6 --- /dev/null +++ b/dtmsvr/api_json_rpc.go @@ -0,0 +1,137 @@ +package dtmsvr + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/gin-gonic/gin" +) + +const jrpcCodeFailure = -32901 +const jrpcCodeOngoing = -32902 + +type jrpcReq struct { + Method string `json:"method"` + Jsonrpc string `json:"jsonrpc"` + Params interface{} `json:"params"` + ID string `json:"id"` +} + +func addJrpcRouter(engine *gin.Engine) { + type jrpcFunc = func(interface{}) interface{} + handlers := map[string]jrpcFunc{ + "dtmserver.NewGid": jrpcNewGid, + "dtmserver.Prepare": jrpcPrepare, + "dtmserver.Submit": jrpcSubmit, + "dtmserver.Abort": jrpcAbort, + "dtmserver.RegisterBranch": jrpcRegisterBranch, + } + engine.POST("/", func(c *gin.Context) { + began := time.Now() + var err error + var req jrpcReq + var jerr map[string]interface{} + r := func() interface{} { + defer dtmimp.P2E(&err) + err2 := c.BindJSON(&req) + if err2 != nil { + jerr = map[string]interface{}{ + "code": -32700, + "message": fmt.Sprintf("Parse json error: %s", err2.Error()), + } + } else if req.ID == "" || req.Jsonrpc != "2.0" { + jerr = map[string]interface{}{ + "code": -32600, + "message": fmt.Sprintf("Bad json request: %s", dtmimp.MustMarshalString(req)), + } + } else if handlers[req.Method] == nil { + jerr = map[string]interface{}{ + "code": -32601, + "message": fmt.Sprintf("Method not found: %s", req.Method), + } + } else if handlers[req.Method] != nil { + return handlers[req.Method](req.Params) + } + return nil + }() + + // error maybe returned in r, assign it to err + if ne, ok := r.(error); ok && err == nil { + err = ne + } + + if err != nil { + if errors.Is(err, dtmcli.ErrFailure) { + jerr = map[string]interface{}{ + "code": jrpcCodeFailure, + "message": err.Error(), + } + } else if errors.Is(err, dtmcli.ErrOngoing) { + jerr = map[string]interface{}{ + "code": jrpcCodeOngoing, + "message": err.Error(), + } + } else if jerr == nil { + jerr = map[string]interface{}{ + "code": -32603, + "message": err.Error(), + } + } + } + + result := map[string]interface{}{ + "jsonrpc": "2.0", + "id": req.ID, + "error": jerr, + "result": r, + } + b, _ := json.Marshal(result) + cont := string(b) + if jerr == nil || jerr["code"] == jrpcCodeOngoing { + logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) + } else { + logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) + } + c.JSON(200, result) + }) +} + +// TransFromJrpcParams construct TransGlobal from jrpc params +func TransFromJrpcParams(params interface{}) *TransGlobal { + t := TransGlobal{} + dtmimp.MustRemarshal(params, &t) + return &t +} + +func jrpcNewGid(interface{}) interface{} { + return map[string]interface{}{"gid": GenGid()} +} + +func jrpcPrepare(params interface{}) interface{} { + return svcPrepare(TransFromJrpcParams(params)) +} + +func jrpcSubmit(params interface{}) interface{} { + return svcSubmit(TransFromJrpcParams(params)) +} + +func jrpcAbort(params interface{}) interface{} { + return svcAbort(TransFromJrpcParams(params)) +} + +func jrpcRegisterBranch(params interface{}) interface{} { + data := map[string]string{} + dtmimp.MustRemarshal(params, &data) + branch := TransBranch{ + Gid: data["gid"], + BranchID: data["branch_id"], + Status: dtmcli.StatusPrepared, + BinData: []byte(data["data"]), + } + return svcRegisterBranch(data["trans_type"], &branch, data) +} diff --git a/dtmsvr/api_json_rpc_http.go b/dtmsvr/api_json_rpc_http.go deleted file mode 100644 index edfc426..0000000 --- a/dtmsvr/api_json_rpc_http.go +++ /dev/null @@ -1,110 +0,0 @@ -package dtmsvr - -import ( - "encoding/json" - "fmt" - "net/http" - - "github.com/dtm-labs/dtm/dtmcli" - "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/gin-gonic/gin" -) - -type jsonRPCReq struct { - Method string `json:"method"` - Jsonrpc string `json:"jsonrpc"` - Params interface{} `json:"params"` - ID string `json:"id"` -} - -func addJSONRPCRouter(engine *gin.Engine) { - engine.POST("/", dispatcher) -} - -func dispatcher(c *gin.Context) { - req := new(jsonRPCReq) - err := c.BindJSON(req) - logger.Infof("request:%s\n", req) - if err != nil { - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}}) - return - } - if req.Method == "dtmserver.NewGid" { - res := jsonRPCNewGid() - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": err}) - return - } - - if req.Method == "dtmserver.Prepare" { - res := jsonRPCPrepare(req.Params) - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil}) - return - } - - if req.Method == "dtmserver.Submit" { - res := jsonRPCSubmit(req.Params) - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil}) - return - } - - if req.Method == "dtmserver.Abort" { - res := jsonRPCAbort(req.Params) - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil}) - return - } - - if req.Method == "dtmserver.RegisterBranch" { - res := jsonRPCRegisterBranch(req.Params) - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil}) - return - } - c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}}) -} - -func jsonRPCNewGid() interface{} { - return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess} -} - -func jsonRPCPrepare(params interface{}) interface{} { - res := svcPrepare(TransFromJSONRPCContext(params)) - if res == nil { - return map[string]string{"dtm_result": "SUCCESS"} - } - return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} -} - -func jsonRPCSubmit(params interface{}) interface{} { - res := svcSubmit(TransFromJSONRPCContext(params)) - if res == nil { - return map[string]string{"dtm_result": "SUCCESS"} - } - return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} -} - -func jsonRPCAbort(params interface{}) interface{} { - res := svcAbort(TransFromJSONRPCContext(params)) - if res == nil { - return map[string]string{"dtm_result": "SUCCESS"} - } - return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)} -} - -func jsonRPCRegisterBranch(params interface{}) interface{} { - data := map[string]string{} - paramsJSON, _ := json.Marshal(params) - err := json.Unmarshal(paramsJSON, &data) - if err != nil { - return map[string]string{"dtm_result": "FAILURE", "message": err.Error()} - } - branch := TransBranch{ - Gid: data["gid"], - BranchID: data["branch_id"], - Status: dtmcli.StatusPrepared, - BinData: []byte(data["data"]), - } - res := svcRegisterBranch(data["trans_type"], &branch, data) - if res == nil { - return map[string]string{"dtm_result": "SUCCESS"} - } - return map[string]string{"dtm_result": "FAILURE", "message": res.Error()} -} diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index 7f69588..750af5e 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -76,7 +76,7 @@ type configType struct { RequestTimeout int64 `yaml:"RequestTimeout" default:"3"` HTTPPort int64 `yaml:"HttpPort" default:"36789"` GrpcPort int64 `yaml:"GrpcPort" default:"36790"` - JSONRPCPort int64 `yaml:"JSONRPCPort" default:"36791"` + JSONRPCPort int64 `yaml:"JsonRpcPort" default:"36791"` MicroService MicroService `yaml:"MicroService"` UpdateBranchSync int64 `yaml:"UpdateBranchSync"` UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"` diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 62b06d6..c4b1aa0 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -38,7 +38,7 @@ func StartSvr() { app := dtmutil.GetGinApp() app = httpMetrics(app) addRoute(app) - logger.Infof("dtmsvr listen at: %d", conf.HTTPPort) + logger.Infof("dtmsvr http listen at: %d", conf.HTTPPort) go func() { err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort)) if err != nil { @@ -46,6 +46,18 @@ func StartSvr() { } }() + // start json-rpc server + jrpcApp := dtmutil.GetGinApp() + jrpcApp = httpMetrics(jrpcApp) + addJrpcRouter(jrpcApp) + logger.Infof("dtmsvr json-rpc listen at: %d", conf.JSONRPCPort) + go func() { + err := jrpcApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort)) + if err != nil { + logger.Errorf("start server err: %v", err) + } + }() + // start grpc server lis, err := net.Listen("tcp", fmt.Sprintf(":%d", conf.GrpcPort)) logger.FatalIfError(err) @@ -66,18 +78,6 @@ func StartSvr() { logger.FatalIfError(err) err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint) logger.FatalIfError(err) - - // start json-rpc server - jsonRPCApp := dtmutil.GetGinApp() - jsonRPCApp = httpMetrics(jsonRPCApp) - addJSONRPCRouter(jsonRPCApp) - logger.Infof("dtmsvr listen at: %d", conf.JSONRPCPort) - go func() { - err := jsonRPCApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort)) - if err != nil { - logger.Errorf("start server err: %v", err) - } - }() } // PopulateDB setup mysql data diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 48538cc..b6e080b 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -8,7 +8,6 @@ package dtmsvr import ( "context" - "encoding/json" "time" "github.com/dtm-labs/dtm/dtmcli" @@ -85,17 +84,6 @@ func TransFromContext(c *gin.Context) *TransGlobal { return &m } -// TransFromJSONRPCContext 1 -func TransFromJSONRPCContext(params interface{}) *TransGlobal { - jsonStr, _ := json.Marshal(params) - m := TransGlobal{} - err := json.Unmarshal(jsonStr, &m) - if err != nil { - return nil - } - return &m -} - // TransFromDtmRequest TransFromContext func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal { o := &dtmgpb.DtmTransOptions{} diff --git a/dtmutil/utils.go b/dtmutil/utils.go index 849b539..718e6d7 100644 --- a/dtmutil/utils.go +++ b/dtmutil/utils.go @@ -10,6 +10,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" "os" @@ -105,6 +106,84 @@ func WrapHandler2(fn func(*gin.Context) interface{}) gin.HandlerFunc { } } +const jrpcCodeFailure = -32901 +const jrpcCodeOngoing = -32902 + +// JrpcReq json-rpc request +type JrpcReq struct { + Method string `json:"method"` + Jsonrpc string `json:"jsonrpc"` + Params interface{} `json:"params"` + ID string `json:"id"` +} + +// WrapJrpcHandler wrap a gin func to be a gin handler func +func WrapJrpcHandler(fn func(*JrpcReq) interface{}) gin.HandlerFunc { + return func(c *gin.Context) { + began := time.Now() + var err error + var req JrpcReq + var jerr map[string]interface{} + r := func() interface{} { + defer dtmimp.P2E(&err) + err2 := c.BindJSON(&req) + if err2 != nil { + jerr = map[string]interface{}{ + "code": -32700, + "message": fmt.Sprintf("Parse json error: %s", err2.Error()), + } + } else if req.ID == "" || req.Jsonrpc != "2.0" { + jerr = map[string]interface{}{ + "code": -32600, + "message": fmt.Sprintf("Bad json request: %s", dtmimp.MustMarshalString(req)), + } + } else { + return fn(&req) + } + return nil + }() + + // error maybe returned in r, assign it to err + if ne, ok := r.(error); ok && err == nil { + err = ne + } + + if err != nil { + if errors.Is(err, dtmcli.ErrFailure) { + jerr = map[string]interface{}{ + "code": jrpcCodeFailure, + "message": err.Error(), + } + } else if errors.Is(err, dtmcli.ErrOngoing) { + jerr = map[string]interface{}{ + "code": jrpcCodeOngoing, + "message": err.Error(), + } + } else if jerr == nil { + jerr = map[string]interface{}{ + "code": -32603, + "message": err.Error(), + } + } + } + + result := map[string]interface{}{ + "jsonrpc": "2.0", + "id": req.ID, + "error": jerr, + "result": r, + } + b, _ := json.Marshal(result) + cont := string(b) + if jerr == nil || jerr["code"] == jrpcCodeOngoing { + logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) + } else { + logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont) + } + c.JSON(200, result) + } +} + // MustGetwd must version of os.Getwd func MustGetwd() string { wd, err := os.Getwd()