Browse Source

Merge branch 'main' of github.com:dtm-labs/dtm into reset-cron-time

pull/232/head
xyctruth 4 years ago
parent
commit
61b7bc7ba4
  1. 2
      .github/workflows/release.yml
  2. 1
      conf.sample.yml
  3. 23
      dtmcli/dtmimp/trans_base.go
  4. 17
      dtmgrpc/dtmgimp/utils.go
  5. 130
      dtmgrpc/dtmgpb/dtmgimp.pb.go
  6. 1
      dtmgrpc/dtmgpb/dtmgimp.proto
  7. 4
      dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go
  8. 138
      dtmsvr/api_json_rpc.go
  9. 1
      dtmsvr/config/config.go
  10. 18
      dtmsvr/storage/registry/registry.go
  11. 6
      dtmsvr/storage/sql/sql.go
  12. 12
      dtmsvr/svr.go
  13. 29
      dtmsvr/trans_class.go
  14. 2
      dtmsvr/trans_status.go
  15. 2
      dtmutil/consts.go
  16. 2
      test/busi/busi.pb.go
  17. 4
      test/busi/busi_grpc.pb.go
  18. 71
      test/msg_jrpc_test.go
  19. 11
      test/saga_grpc_test.go

2
.github/workflows/release.yml

@ -1,6 +1,6 @@
name: Release name: Release
on: on:
create: push:
tags: tags:
- 'v*.*.*' - 'v*.*.*'

1
conf.sample.yml

@ -56,6 +56,7 @@
# HttpPort: 36789 # HttpPort: 36789
# GrpcPort: 36790 # GrpcPort: 36790
# JsonRpcPort: 36791
### advanced options ### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status # UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status

23
dtmcli/dtmimp/trans_base.go

@ -65,6 +65,7 @@ type TransBase struct {
Op string `json:"-"` // used in XA/TCC Op string `json:"-"` // used in XA/TCC
QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
Protocol string `json:"protocol"`
} }
// NewTransBase new a TransBase // NewTransBase new a TransBase
@ -93,6 +94,25 @@ func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
if tb.RequestTimeout != 0 { if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
} }
if tb.Protocol == "json-rpc" {
var result map[string]interface{}
resp, err := RestyClient.R().
SetBody(map[string]interface{}{
"jsonrpc": "2.0",
"id": "no-use",
"method": operation,
"params": body,
}).
SetResult(&result).
Post(tb.Dtm)
if err != nil {
return err
}
if resp.StatusCode() != http.StatusOK || result["error"] != nil {
return errors.New(resp.String())
}
return nil
}
resp, err := RestyClient.R(). resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil { if err != nil {
@ -118,6 +138,9 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
// TransRequestBranch TransBase request branch result // TransRequestBranch TransBase request branch result
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) { func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
if url == "" {
return nil, nil
}
resp, err := RestyClient.R(). resp, err := RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{

17
dtmgrpc/dtmgimp/utils.go

@ -36,6 +36,7 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
RetryInterval: s.RetryInterval, RetryInterval: s.RetryInterval,
PassthroughHeaders: s.PassthroughHeaders, PassthroughHeaders: s.PassthroughHeaders,
BranchHeaders: s.BranchHeaders, BranchHeaders: s.BranchHeaders,
RequestTimeout: s.RequestTimeout,
}, },
QueryPrepared: s.QueryPrepared, QueryPrepared: s.QueryPrepared,
CustomedData: s.CustomData, CustomedData: s.CustomData,
@ -100,3 +101,19 @@ func GetMetaFromContext(ctx context.Context, name string) string {
md, _ := metadata.FromIncomingContext(ctx) md, _ := metadata.FromIncomingContext(ctx)
return mdGet(md, name) return mdGet(md, name)
} }
type requestTimeoutKey struct{}
// RequestTimeoutFromContext returns requestTime of transOption option
func RequestTimeoutFromContext(ctx context.Context) int64 {
if v, ok := ctx.Value(requestTimeoutKey{}).(int64); ok {
return v
}
return 0
}
// RequestTimeoutNewContext sets requestTimeout of transOption option to context
func RequestTimeoutNewContext(ctx context.Context, requestTimeout int64) context.Context {
return context.WithValue(ctx, requestTimeoutKey{}, requestTimeout)
}

130
dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.27.1 // protoc-gen-go v1.27.1
// protoc v3.17.3 // protoc v3.19.4
// source: dtmgrpc/dtmgpb/dtmgimp.proto // source: dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb package dtmgpb
@ -31,6 +31,7 @@ type DtmTransOptions struct {
RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"` RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"`
PassthroughHeaders []string `protobuf:"bytes,4,rep,name=PassthroughHeaders,proto3" json:"PassthroughHeaders,omitempty"` PassthroughHeaders []string `protobuf:"bytes,4,rep,name=PassthroughHeaders,proto3" json:"PassthroughHeaders,omitempty"`
BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
RequestTimeout int64 `protobuf:"varint,6,opt,name=RequestTimeout,proto3" json:"RequestTimeout,omitempty"`
} }
func (x *DtmTransOptions) Reset() { func (x *DtmTransOptions) Reset() {
@ -100,6 +101,13 @@ func (x *DtmTransOptions) GetBranchHeaders() map[string]string {
return nil return nil
} }
func (x *DtmTransOptions) GetRequestTimeout() int64 {
if x != nil {
return x.RequestTimeout
}
return 0
}
// DtmRequest request sent to dtm server // DtmRequest request sent to dtm server
type DtmRequest struct { type DtmRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
@ -337,7 +345,7 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc2, 0x02, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xea, 0x02, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74,
0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61,
0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65,
@ -353,66 +361,68 @@ var file_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64,
0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x40, 0x0a, 0x12, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x52, 0x65, 0x71, 0x75, 0x65,
0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52,
0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x0e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x1a,
0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x40, 0x0a, 0x12, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x01, 0x22, 0xfc, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65,
0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x12, 0x3c, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70,
0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x22,
0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04,
0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x64, 0x44, 0x61,
0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x69, 0x6e, 0x50, 0x61, 0x79, 0x6c,
0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65,
0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65,
0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x53, 0x74,
0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x65, 0x70, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x53, 0x74, 0x65, 0x70, 0x73,
0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x22, 0x1f, 0x0a, 0x0b, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12,
0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69,
0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x64, 0x22, 0x82, 0x02, 0x0a, 0x10, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52,
0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20,
0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e,
0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61,
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x4f, 0x70, 0x12, 0x37, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b,
0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x32, 0x23, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72,
0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a, 0x0b, 0x42,
0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c,
0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x52, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x37, 0x0a,
0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xb1, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x0a, 0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x00, 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64,
0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41,
0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x62, 0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42,
0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x44, 0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f,
0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

1
dtmgrpc/dtmgpb/dtmgimp.proto

@ -20,6 +20,7 @@ message DtmTransOptions {
int64 RetryInterval = 3; int64 RetryInterval = 3;
repeated string PassthroughHeaders = 4; repeated string PassthroughHeaders = 4;
map<string, string> BranchHeaders = 5; map<string, string> BranchHeaders = 5;
int64 RequestTimeout = 6;
} }
// DtmRequest request sent to dtm server // DtmRequest request sent to dtm server

4
dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go

@ -1,4 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb package dtmgpb

138
dtmsvr/api_json_rpc.go

@ -0,0 +1,138 @@
package dtmsvr
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/gin-gonic/gin"
)
const jrpcCodeFailure = -32901
const jrpcCodeOngoing = -32902
type jrpcReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
ID string `json:"id"`
}
func addJrpcRouter(engine *gin.Engine) {
type jrpcFunc = func(interface{}) interface{}
handlers := map[string]jrpcFunc{
"newGid": jrpcNewGid,
"prepare": jrpcPrepare,
"submit": jrpcSubmit,
"abort": jrpcAbort,
"registerBranch": jrpcRegisterBranch,
}
engine.POST("/api/json-rpc", func(c *gin.Context) {
began := time.Now()
var err error
var req jrpcReq
var jerr map[string]interface{}
r := func() interface{} {
defer dtmimp.P2E(&err)
err2 := c.BindJSON(&req)
if err2 != nil {
jerr = map[string]interface{}{
"code": -32700,
"message": fmt.Sprintf("Parse json error: %s", err2.Error()),
}
} else if req.ID == "" || req.Jsonrpc != "2.0" {
jerr = map[string]interface{}{
"code": -32600,
"message": fmt.Sprintf("Bad json request: %s", dtmimp.MustMarshalString(req)),
}
} else if handlers[req.Method] == nil {
jerr = map[string]interface{}{
"code": -32601,
"message": fmt.Sprintf("Method not found: %s", req.Method),
}
} else if handlers[req.Method] != nil {
return handlers[req.Method](req.Params)
}
return nil
}()
// error maybe returned in r, assign it to err
if ne, ok := r.(error); ok && err == nil {
err = ne
}
if err != nil {
if errors.Is(err, dtmcli.ErrFailure) {
jerr = map[string]interface{}{
"code": jrpcCodeFailure,
"message": err.Error(),
}
} else if errors.Is(err, dtmcli.ErrOngoing) {
jerr = map[string]interface{}{
"code": jrpcCodeOngoing,
"message": err.Error(),
}
} else if jerr == nil {
jerr = map[string]interface{}{
"code": -32603,
"message": err.Error(),
}
}
}
result := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
"error": jerr,
"result": r,
}
b, _ := json.Marshal(result)
cont := string(b)
if jerr == nil || jerr["code"] == jrpcCodeOngoing {
logger.Infof("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
} else {
logger.Errorf("%2dms %d %s %s %s", time.Since(began).Milliseconds(), 200, c.Request.Method, c.Request.RequestURI, cont)
}
c.JSON(200, result)
})
}
// TransFromJrpcParams construct TransGlobal from jrpc params
func TransFromJrpcParams(params interface{}) *TransGlobal {
t := TransGlobal{}
dtmimp.MustRemarshal(params, &t)
t.setupPayloads()
return &t
}
func jrpcNewGid(interface{}) interface{} {
return map[string]interface{}{"gid": GenGid()}
}
func jrpcPrepare(params interface{}) interface{} {
return svcPrepare(TransFromJrpcParams(params))
}
func jrpcSubmit(params interface{}) interface{} {
return svcSubmit(TransFromJrpcParams(params))
}
func jrpcAbort(params interface{}) interface{} {
return svcAbort(TransFromJrpcParams(params))
}
func jrpcRegisterBranch(params interface{}) interface{} {
data := map[string]string{}
dtmimp.MustRemarshal(params, &data)
branch := TransBranch{
Gid: data["gid"],
BranchID: data["branch_id"],
Status: dtmcli.StatusPrepared,
BinData: []byte(data["data"]),
}
return svcRegisterBranch(data["trans_type"], &branch, data)
}

1
dtmsvr/config/config.go

@ -76,6 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"` RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"` HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"` GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
JSONRPCPort int64 `yaml:"JsonRpcPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"` MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"` UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"` UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`

18
dtmsvr/storage/registry/registry.go

@ -18,6 +18,12 @@ type StorageFactory interface {
GetStorage() storage.Store GetStorage() storage.Store
} }
var sqlFac = &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
}
var storeFactorys = map[string]StorageFactory{ var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{ "boltdb": &SingletonFactory{
creatorFunction: func() storage.Store { creatorFunction: func() storage.Store {
@ -29,16 +35,8 @@ var storeFactorys = map[string]StorageFactory{
return &redis.Store{} return &redis.Store{}
}, },
}, },
"mysql": &SingletonFactory{ "mysql": sqlFac,
creatorFunction: func() storage.Store { "postgres": sqlFac,
return &sql.Store{}
},
},
"postgres": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
} }
// GetStore returns storage.Store // GetStore returns storage.Store

6
dtmsvr/storage/sql/sql.go

@ -1,3 +1,9 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package sql package sql
import ( import (

12
dtmsvr/svr.go

@ -12,9 +12,10 @@ import (
"net" "net"
"time" "time"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/dtmutil"
@ -29,7 +30,11 @@ func StartSvr() {
dtmcli.GetRestyClient().SetTimeout(time.Duration(conf.RequestTimeout) * time.Second) dtmcli.GetRestyClient().SetTimeout(time.Duration(conf.RequestTimeout) * time.Second)
dtmgrpc.AddUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { dtmgrpc.AddUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx2, cancel := context.WithTimeout(ctx, time.Duration(conf.RequestTimeout)*time.Second) timeout := conf.RequestTimeout
if v := dtmgimp.RequestTimeoutFromContext(ctx); v != 0 {
timeout = v
}
ctx2, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel() defer cancel()
return invoker(ctx2, method, req, reply, cc, opts...) return invoker(ctx2, method, req, reply, cc, opts...)
}) })
@ -38,7 +43,8 @@ func StartSvr() {
app := dtmutil.GetGinApp() app := dtmutil.GetGinApp()
app = httpMetrics(app) app = httpMetrics(app)
addRoute(app) addRoute(app)
logger.Infof("dtmsvr listen at: %d", conf.HTTPPort) addJrpcRouter(app)
logger.Infof("dtmsvr http listen at: %d", conf.HTTPPort)
go func() { go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort)) err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
if err != nil { if err != nil {

29
dtmsvr/trans_class.go

@ -26,6 +26,22 @@ type TransGlobal struct {
updateBranchSync bool updateBranchSync bool
} }
func (t *TransGlobal) setupPayloads() {
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range t.Payloads {
t.BinPayloads = append(t.BinPayloads, []byte(p))
}
for _, d := range t.Steps {
if d["data"] != "" {
t.BinPayloads = append(t.BinPayloads, []byte(d["data"]))
}
}
if t.Protocol == "" {
t.Protocol = "http"
}
}
// TransBranch branch transaction // TransBranch branch transaction
type TransBranch = storage.TransBranchStore type TransBranch = storage.TransBranchStore
@ -61,17 +77,7 @@ func TransFromContext(c *gin.Context) *TransGlobal {
m := TransGlobal{} m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m) dtmimp.MustUnmarshal(b, &m)
logger.Debugf("creating trans in prepare") logger.Debugf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal m.setupPayloads()
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
}
m.Protocol = "http"
m.Ext.Headers = map[string]string{} m.Ext.Headers = map[string]string{}
if len(m.PassthroughHeaders) > 0 { if len(m.PassthroughHeaders) > 0 {
for _, h := range m.PassthroughHeaders { for _, h := range m.PassthroughHeaders {
@ -103,6 +109,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
RetryInterval: o.RetryInterval, RetryInterval: o.RetryInterval,
PassthroughHeaders: o.PassthroughHeaders, PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders, BranchHeaders: o.BranchHeaders,
RequestTimeout: o.RequestTimeout,
}, },
}} }}
if c.Steps != "" { if c.Steps != "" {

2
dtmsvr/trans_status.go

@ -119,11 +119,13 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
if err != nil { if err != nil {
return err return err
} }
conn := dtmgimp.MustGetGrpcConn(server, true) conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "") ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
kvs := dtmgimp.Map2Kvs(t.Ext.Headers) kvs := dtmgimp.Map2Kvs(t.Ext.Headers)
kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...) kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...)
ctx = metadata.AppendToOutgoingContext(ctx, kvs...) ctx = metadata.AppendToOutgoingContext(ctx, kvs...)
ctx = dtmgimp.RequestTimeoutNewContext(ctx, t.RequestTimeout)
err = conn.Invoke(ctx, method, branchPayload, &[]byte{}) err = conn.Invoke(ctx, method, branchPayload, &[]byte{})
if err == nil { if err == nil {
return nil return nil

2
dtmutil/consts.go

@ -9,6 +9,8 @@ package dtmutil
const ( const (
// DefaultHTTPServer default url for http server. used by test and examples // DefaultHTTPServer default url for http server. used by test and examples
DefaultHTTPServer = "http://localhost:36789/api/dtmsvr" DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
// DefaultJrpcServer default url for http json-rpc server. used by test and examples
DefaultJrpcServer = "http://localhost:36789/api/json-rpc"
// DefaultGrpcServer default url for grpc server. used by test and examples // DefaultGrpcServer default url for grpc server. used by test and examples
DefaultGrpcServer = "localhost:36790" DefaultGrpcServer = "localhost:36790"
) )

2
test/busi/busi.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.27.1 // protoc-gen-go v1.27.1
// protoc v3.17.3 // protoc v3.19.4
// source: test/busi/busi.proto // source: test/busi/busi.proto
package busi package busi

4
test/busi/busi_grpc.pb.go

@ -1,4 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: test/busi/busi.proto
package busi package busi

71
test/msg_jrpc_test.go

@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package test
import (
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgJrpcNormal(t *testing.T) {
msg := genJrpcMsg(dtmimp.GetFuncName())
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgJrpcRepeated(t *testing.T) {
msg := genJrpcMsg(dtmimp.GetFuncName())
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
err := msg.Submit()
assert.Error(t, err)
}
func TestMsgJprcAbnormal(t *testing.T) {
id := "no-use"
resp, err := dtmcli.GetRestyClient().R().SetBody("hello").Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "-32700")
resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{
"jsonrpc": "1.0",
"method": "newGid",
"params": "",
"id": id,
}).Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "-32600")
resp, err = dtmcli.GetRestyClient().R().SetBody(map[string]string{
"jsonrpc": "2.0",
"method": "not-exists",
"params": "",
"id": id,
}).Post(dtmutil.DefaultJrpcServer)
assert.Nil(t, err)
assert.Contains(t, resp.String(), "-32601")
}
func genJrpcMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultJrpcServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
msg.Protocol = "json-rpc"
return msg
}

11
test/saga_grpc_test.go

@ -110,6 +110,17 @@ func TestSagaGrpcPassthroughHeadersYes(t *testing.T) {
waitTransProcessed(gidYes) waitTransProcessed(gidYes)
} }
func TestSagaGrpcWithGlobalTransRequestTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
saga.WaitResult = true
saga.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderNo", "", nil)
saga.WithGlobalTransRequestTimeout(6)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(gid)
}
func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) { func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName() gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes) sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)

Loading…
Cancel
Save