Browse Source

json rpc refactored

pull/230/head
yedf2 4 years ago
parent
commit
10b1b898ec
  1. 2
      conf.sample.yml
  2. 137
      dtmsvr/api_json_rpc.go
  3. 110
      dtmsvr/api_json_rpc_http.go
  4. 2
      dtmsvr/config/config.go
  5. 26
      dtmsvr/svr.go
  6. 12
      dtmsvr/trans_class.go
  7. 79
      dtmutil/utils.go

2
conf.sample.yml

@ -56,7 +56,7 @@
# HttpPort: 36789
# GrpcPort: 36790
# JSONRPC: 36791
# JsonRpcPort: 36791
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status

137
dtmsvr/api_json_rpc.go

@ -0,0 +1,137 @@
package dtmsvr
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/gin-gonic/gin"
)
const jrpcCodeFailure = -32901
const jrpcCodeOngoing = -32902
type jrpcReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
ID string `json:"id"`
}
func addJrpcRouter(engine *gin.Engine) {
type jrpcFunc = func(interface{}) interface{}
handlers := map[string]jrpcFunc{
"dtmserver.NewGid": jrpcNewGid,
"dtmserver.Prepare": jrpcPrepare,
"dtmserver.Submit": jrpcSubmit,
"dtmserver.Abort": jrpcAbort,
"dtmserver.RegisterBranch": jrpcRegisterBranch,
}
engine.POST("/", func(c *gin.Context) {
began := time.Now()
var err error
var req jrpcReq
var jerr map[string]interface{}
r := func() interface{} {
defer dtmimp.P2E(&err)
err2 := c.BindJSON(&req)
if err2 != nil {
jerr = map[string]interface{}{
"code": -32700,
"message": fmt.Sprintf("Parse json error: %s", err2.Error()),
}
} else if req.ID == "" || req.Jsonrpc != "2.0" {
jerr = map[string]interface{}{
"code": -32600,
"message": fmt.Sprintf("Bad json request: %s", dtmimp.MustMarshalString(req)),
}
} else if handlers[req.Method] == nil {
jerr = map[string]interface{}{
"code": -32601,
"message": fmt.Sprintf("Method not found: %s", req.Method),
}
} else if handlers[req.Method] != nil {
return handlers[req.Method](req.Params)
}
return nil
}()
// error maybe returned in r, assign it to err
if ne, ok := r.(error); ok && err == nil {
err = ne
}
if err != nil {
if errors.Is(err, dtmcli.ErrFailure) {
jerr = map[string]interface{}{
"code": jrpcCodeFailure,
"message": err.Error(),
}
} else if errors.Is(err, dtmcli.ErrOngoing) {
jerr = map[string]interface{}{
"code": jrpcCodeOngoing,
"message": err.Error(),
}
} else if jerr == nil {
jerr = map[string]interface{}{
"code": -32603,
"message": err.Error(),
}
}
}
result := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
"error": jerr,
"result": r,
}
b, _ := json.Marshal(result)
cont := string(b)
if jerr == nil || jerr["code"] == jrpcCodeOngoing {
logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
} else {
logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
}
c.JSON(200, result)
})
}
// TransFromJrpcParams construct TransGlobal from jrpc params
func TransFromJrpcParams(params interface{}) *TransGlobal {
t := TransGlobal{}
dtmimp.MustRemarshal(params, &t)
return &t
}
func jrpcNewGid(interface{}) interface{} {
return map[string]interface{}{"gid": GenGid()}
}
func jrpcPrepare(params interface{}) interface{} {
return svcPrepare(TransFromJrpcParams(params))
}
func jrpcSubmit(params interface{}) interface{} {
return svcSubmit(TransFromJrpcParams(params))
}
func jrpcAbort(params interface{}) interface{} {
return svcAbort(TransFromJrpcParams(params))
}
func jrpcRegisterBranch(params interface{}) interface{} {
data := map[string]string{}
dtmimp.MustRemarshal(params, &data)
branch := TransBranch{
Gid: data["gid"],
BranchID: data["branch_id"],
Status: dtmcli.StatusPrepared,
BinData: []byte(data["data"]),
}
return svcRegisterBranch(data["trans_type"], &branch, data)
}

110
dtmsvr/api_json_rpc_http.go

@ -1,110 +0,0 @@
package dtmsvr
import (
"encoding/json"
"fmt"
"net/http"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/gin-gonic/gin"
)
type jsonRPCReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
ID string `json:"id"`
}
func addJSONRPCRouter(engine *gin.Engine) {
engine.POST("/", dispatcher)
}
func dispatcher(c *gin.Context) {
req := new(jsonRPCReq)
err := c.BindJSON(req)
logger.Infof("request:%s\n", req)
if err != nil {
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
return
}
if req.Method == "dtmserver.NewGid" {
res := jsonRPCNewGid()
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": err})
return
}
if req.Method == "dtmserver.Prepare" {
res := jsonRPCPrepare(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Submit" {
res := jsonRPCSubmit(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Abort" {
res := jsonRPCAbort(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.RegisterBranch" {
res := jsonRPCRegisterBranch(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
}
func jsonRPCNewGid() interface{} {
return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}
}
func jsonRPCPrepare(params interface{}) interface{} {
res := svcPrepare(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCSubmit(params interface{}) interface{} {
res := svcSubmit(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCAbort(params interface{}) interface{} {
res := svcAbort(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCRegisterBranch(params interface{}) interface{} {
data := map[string]string{}
paramsJSON, _ := json.Marshal(params)
err := json.Unmarshal(paramsJSON, &data)
if err != nil {
return map[string]string{"dtm_result": "FAILURE", "message": err.Error()}
}
branch := TransBranch{
Gid: data["gid"],
BranchID: data["branch_id"],
Status: dtmcli.StatusPrepared,
BinData: []byte(data["data"]),
}
res := svcRegisterBranch(data["trans_type"], &branch, data)
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": res.Error()}
}

2
dtmsvr/config/config.go

@ -76,7 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
JSONRPCPort int64 `yaml:"JSONRPCPort" default:"36791"`
JSONRPCPort int64 `yaml:"JsonRpcPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`

26
dtmsvr/svr.go

@ -38,7 +38,7 @@ func StartSvr() {
app := dtmutil.GetGinApp()
app = httpMetrics(app)
addRoute(app)
logger.Infof("dtmsvr listen at: %d", conf.HTTPPort)
logger.Infof("dtmsvr http listen at: %d", conf.HTTPPort)
go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
if err != nil {
@ -46,6 +46,18 @@ func StartSvr() {
}
}()
// start json-rpc server
jrpcApp := dtmutil.GetGinApp()
jrpcApp = httpMetrics(jrpcApp)
addJrpcRouter(jrpcApp)
logger.Infof("dtmsvr json-rpc listen at: %d", conf.JSONRPCPort)
go func() {
err := jrpcApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}
}()
// start grpc server
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", conf.GrpcPort))
logger.FatalIfError(err)
@ -66,18 +78,6 @@ func StartSvr() {
logger.FatalIfError(err)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
// start json-rpc server
jsonRPCApp := dtmutil.GetGinApp()
jsonRPCApp = httpMetrics(jsonRPCApp)
addJSONRPCRouter(jsonRPCApp)
logger.Infof("dtmsvr listen at: %d", conf.JSONRPCPort)
go func() {
err := jsonRPCApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}
}()
}
// PopulateDB setup mysql data

12
dtmsvr/trans_class.go

@ -8,7 +8,6 @@ package dtmsvr
import (
"context"
"encoding/json"
"time"
"github.com/dtm-labs/dtm/dtmcli"
@ -85,17 +84,6 @@ func TransFromContext(c *gin.Context) *TransGlobal {
return &m
}
// TransFromJSONRPCContext 1
func TransFromJSONRPCContext(params interface{}) *TransGlobal {
jsonStr, _ := json.Marshal(params)
m := TransGlobal{}
err := json.Unmarshal(jsonStr, &m)
if err != nil {
return nil
}
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal {
o := &dtmgpb.DtmTransOptions{}

79
dtmutil/utils.go

@ -10,6 +10,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
@ -105,6 +106,84 @@ func WrapHandler2(fn func(*gin.Context) interface{}) gin.HandlerFunc {
}
}
const jrpcCodeFailure = -32901
const jrpcCodeOngoing = -32902
// JrpcReq json-rpc request
type JrpcReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
ID string `json:"id"`
}
// WrapJrpcHandler wrap a gin func to be a gin handler func
func WrapJrpcHandler(fn func(*JrpcReq) interface{}) gin.HandlerFunc {
return func(c *gin.Context) {
began := time.Now()
var err error
var req JrpcReq
var jerr map[string]interface{}
r := func() interface{} {
defer dtmimp.P2E(&err)
err2 := c.BindJSON(&req)
if err2 != nil {
jerr = map[string]interface{}{
"code": -32700,
"message": fmt.Sprintf("Parse json error: %s", err2.Error()),
}
} else if req.ID == "" || req.Jsonrpc != "2.0" {
jerr = map[string]interface{}{
"code": -32600,
"message": fmt.Sprintf("Bad json request: %s", dtmimp.MustMarshalString(req)),
}
} else {
return fn(&req)
}
return nil
}()
// error maybe returned in r, assign it to err
if ne, ok := r.(error); ok && err == nil {
err = ne
}
if err != nil {
if errors.Is(err, dtmcli.ErrFailure) {
jerr = map[string]interface{}{
"code": jrpcCodeFailure,
"message": err.Error(),
}
} else if errors.Is(err, dtmcli.ErrOngoing) {
jerr = map[string]interface{}{
"code": jrpcCodeOngoing,
"message": err.Error(),
}
} else if jerr == nil {
jerr = map[string]interface{}{
"code": -32603,
"message": err.Error(),
}
}
}
result := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
"error": jerr,
"result": r,
}
b, _ := json.Marshal(result)
cont := string(b)
if jerr == nil || jerr["code"] == jrpcCodeOngoing {
logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
} else {
logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
}
c.JSON(200, result)
}
}
// MustGetwd must version of os.Getwd
func MustGetwd() string {
wd, err := os.Getwd()

Loading…
Cancel
Save