Browse Source

add jrpc tests

pull/234/head
yedf2 4 years ago
parent
commit
623eef5ae1
  1. 23
      dtmcli/dtmimp/trans_base.go
  2. 13
      dtmsvr/api_json_rpc.go
  3. 13
      dtmsvr/svr.go
  4. 28
      dtmsvr/trans_class.go
  5. 4
      dtmutil/consts.go
  6. 53
      test/msg_jrpc_test.go

23
dtmcli/dtmimp/trans_base.go

@ -65,6 +65,7 @@ type TransBase struct {
Op string `json:"-"` // used in XA/TCC
QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
Protocol string `json:"protocol"`
}
// NewTransBase new a TransBase
@ -93,6 +94,25 @@ func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
}
if tb.Protocol == "json-rpc" {
var result map[string]interface{}
resp, err := RestyClient.R().
SetBody(map[string]interface{}{
"jsonrpc": "2.0",
"id": "no-use",
"method": operation,
"params": body,
}).
SetResult(&result).
Post(tb.Dtm)
if err != nil {
return err
}
if resp.StatusCode() != http.StatusOK || result["error"] != nil {
return errors.New(resp.String())
}
return nil
}
resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil {
@ -118,6 +138,9 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
// TransRequestBranch TransBase request branch result
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
if url == "" {
return nil, nil
}
resp, err := RestyClient.R().
SetBody(body).
SetQueryParams(map[string]string{

13
dtmsvr/api_json_rpc.go

@ -25,13 +25,13 @@ type jrpcReq struct {
func addJrpcRouter(engine *gin.Engine) {
type jrpcFunc = func(interface{}) interface{}
handlers := map[string]jrpcFunc{
"dtmserver.newGid": jrpcNewGid,
"dtmserver.prepare": jrpcPrepare,
"dtmserver.submit": jrpcSubmit,
"dtmserver.abort": jrpcAbort,
"dtmserver.registerBranch": jrpcRegisterBranch,
"newGid": jrpcNewGid,
"prepare": jrpcPrepare,
"submit": jrpcSubmit,
"abort": jrpcAbort,
"registerBranch": jrpcRegisterBranch,
}
engine.POST("/", func(c *gin.Context) {
engine.POST("/api/json-rpc", func(c *gin.Context) {
began := time.Now()
var err error
var req jrpcReq
@ -105,6 +105,7 @@ func addJrpcRouter(engine *gin.Engine) {
func TransFromJrpcParams(params interface{}) *TransGlobal {
t := TransGlobal{}
dtmimp.MustRemarshal(params, &t)
t.setupPayloads()
return &t
}

13
dtmsvr/svr.go

@ -43,6 +43,7 @@ func StartSvr() {
app := dtmutil.GetGinApp()
app = httpMetrics(app)
addRoute(app)
addJrpcRouter(app)
logger.Infof("dtmsvr http listen at: %d", conf.HTTPPort)
go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
@ -51,18 +52,6 @@ func StartSvr() {
}
}()
// start json-rpc server
jrpcApp := dtmutil.GetGinApp()
jrpcApp = httpMetrics(jrpcApp)
addJrpcRouter(jrpcApp)
logger.Infof("dtmsvr json-rpc listen at: %d", conf.JSONRPCPort)
go func() {
err := jrpcApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}
}()
// start grpc server
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", conf.GrpcPort))
logger.FatalIfError(err)

28
dtmsvr/trans_class.go

@ -26,6 +26,22 @@ type TransGlobal struct {
updateBranchSync bool
}
func (t *TransGlobal) setupPayloads() {
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range t.Payloads {
t.BinPayloads = append(t.BinPayloads, []byte(p))
}
for _, d := range t.Steps {
if d["data"] != "" {
t.BinPayloads = append(t.BinPayloads, []byte(d["data"]))
}
}
if t.Protocol == "" {
t.Protocol = "http"
}
}
// TransBranch branch transaction
type TransBranch = storage.TransBranchStore
@ -61,17 +77,7 @@ func TransFromContext(c *gin.Context) *TransGlobal {
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
logger.Debugf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
}
m.Protocol = "http"
m.setupPayloads()
m.Ext.Headers = map[string]string{}
if len(m.PassthroughHeaders) > 0 {
for _, h := range m.PassthroughHeaders {

4
dtmutil/consts.go

@ -9,8 +9,8 @@ package dtmutil
const (
// DefaultHTTPServer default url for http server. used by test and examples
DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
// DefaultJrpcServer default url for http json-rpc server. used by test and examples
DefaultJrpcServer = "http://localhost:36789/api/json-rpc"
// DefaultGrpcServer default url for grpc server. used by test and examples
DefaultGrpcServer = "localhost:36790"
// DefaultJrpcServer default url for http json-rpc server. used by test and examples
DefaultJrpcServer = "http://localhost:36791"
)

53
test/msg_jrpc_test.go

@ -10,17 +10,62 @@ import (
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgJrpcNormal(t *testing.T) {
resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{
msg := genJrpcMsg(dtmimp.GetFuncName())
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgJrpcRepeated(t *testing.T) {
msg := genJrpcMsg(dtmimp.GetFuncName())
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
err := msg.Submit()
assert.Error(t, err)
}
func TestMsgJprcAbnormal(t *testing.T) {
id := "no-use"
resp, err := dtmcli.GetRestyClient().R().SetBody("hello").Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "-32700")
resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{
"jsonrpc": "1.0",
"method": "newGid",
"params": "",
"id": id,
}).Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "-32600")
resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{
"jsonrpc": "2.0",
"method": "dtmserver.newGid",
"method": "not-exists",
"params": "",
"id": "TestMsgJrpcNormal",
"id": id,
}).Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "gid")
assert.Contains(t, resp.String(), "-32601")
}
func genJrpcMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
msg.Protocol = "json-rpc"
return msg
}

Loading…
Cancel
Save