Browse Source

remove passthroughheader option

pull/343/head
yedf2 4 years ago
parent
commit
d65440d14f
  1. 24
      client/dtmcli/dtmimp/trans_base.go
  2. 3
      client/dtmcli/dtmimp/vars.go
  3. 8
      client/dtmcli/types.go
  4. 11
      client/dtmgrpc/dtmgimp/utils.go
  5. 26
      client/dtmgrpc/dtmgpb/dtmgimp.pb.go
  6. 2
      client/dtmgrpc/dtmgpb/dtmgimp.proto
  7. 29
      dtmsvr/trans_class.go
  8. 11
      test/saga_cover_test.go
  9. 35
      test/saga_grpc_test.go
  10. 35
      test/saga_options_test.go

24
client/dtmcli/dtmimp/trans_base.go

@ -43,13 +43,12 @@ func (g *BranchIDGen) CurrentSubBranchID() string {
// TransOptions transaction options
type TransOptions struct {
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second
RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` // for inherit the specified gin context headers
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second
RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
}
// TransBase base for all trans
@ -75,12 +74,11 @@ type TransBase struct {
// NewTransBase new a TransBase
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
return &TransBase{
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
Context: context.Background(),
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
Context: context.Background(),
}
}

3
client/dtmcli/dtmimp/vars.go

@ -32,9 +32,6 @@ var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}
// MapFailure HTTP result of FAILURE
var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}
// PassthroughHeaders will be passed to every sub-trans call
var PassthroughHeaders = []string{}
// BarrierTableName the table name of barrier table
var BarrierTableName = "dtm_barrier.barrier"

8
client/dtmcli/types.go

@ -46,11 +46,3 @@ func GetRestyClient() *resty.Client {
func GetRestyClient2(timeout time.Duration) *resty.Client {
return dtmimp.GetRestyClient2(timeout)
}
// SetPassthroughHeaders experimental.
// apply to http header and grpc metadata
// dtm server will save these headers in trans creating request.
// and then passthrough them to sub-trans
func SetPassthroughHeaders(headers []string) {
dtmimp.PassthroughHeaders = headers
}

11
client/dtmgrpc/dtmgimp/utils.go

@ -36,12 +36,11 @@ func GetDtmRequest(s *dtmimp.TransBase) *dtmgpb.DtmRequest {
Gid: s.Gid,
TransType: s.TransType,
TransOptions: &dtmgpb.DtmTransOptions{
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
PassthroughHeaders: s.PassthroughHeaders,
BranchHeaders: s.BranchHeaders,
RequestTimeout: s.RequestTimeout,
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
BranchHeaders: s.BranchHeaders,
RequestTimeout: s.RequestTimeout,
},
QueryPrepared: s.QueryPrepared,
CustomedData: s.CustomData,

26
client/dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -26,12 +26,12 @@ type DtmTransOptions struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"`
TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"`
RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"`
PassthroughHeaders []string `protobuf:"bytes,4,rep,name=PassthroughHeaders,proto3" json:"PassthroughHeaders,omitempty"`
BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
RequestTimeout int64 `protobuf:"varint,6,opt,name=RequestTimeout,proto3" json:"RequestTimeout,omitempty"`
WaitResult bool `protobuf:"varint,1,opt,name=WaitResult,proto3" json:"WaitResult,omitempty"`
TimeoutToFail int64 `protobuf:"varint,2,opt,name=TimeoutToFail,proto3" json:"TimeoutToFail,omitempty"`
RetryInterval int64 `protobuf:"varint,3,opt,name=RetryInterval,proto3" json:"RetryInterval,omitempty"`
// repeated string PassthroughHeaders = 4; // depreceated
BranchHeaders map[string]string `protobuf:"bytes,5,rep,name=BranchHeaders,proto3" json:"BranchHeaders,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
RequestTimeout int64 `protobuf:"varint,6,opt,name=RequestTimeout,proto3" json:"RequestTimeout,omitempty"`
}
func (x *DtmTransOptions) Reset() {
@ -87,13 +87,6 @@ func (x *DtmTransOptions) GetRetryInterval() int64 {
return 0
}
func (x *DtmTransOptions) GetPassthroughHeaders() []string {
if x != nil {
return x.PassthroughHeaders
}
return nil
}
func (x *DtmTransOptions) GetBranchHeaders() map[string]string {
if x != nil {
return x.BranchHeaders
@ -558,7 +551,7 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x1a, 0x1b,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f,
0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xea, 0x02, 0x0a, 0x0f,
0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xba, 0x02, 0x0a, 0x0f,
0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12,
@ -566,10 +559,7 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x54,
0x6f, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e,
0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x52, 0x65,
0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x2e, 0x0a, 0x12, 0x50,
0x61, 0x73, 0x73, 0x74, 0x68, 0x72, 0x6f, 0x75, 0x67, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72,
0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x12, 0x50, 0x61, 0x73, 0x73, 0x74, 0x68, 0x72,
0x6f, 0x75, 0x67, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x51, 0x0a, 0x0d, 0x42,
0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x51, 0x0a, 0x0d, 0x42,
0x72, 0x61, 0x6e, 0x63, 0x68, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x42, 0x72, 0x61,

2
client/dtmgrpc/dtmgpb/dtmgimp.proto

@ -19,7 +19,7 @@ message DtmTransOptions {
bool WaitResult = 1;
int64 TimeoutToFail = 2;
int64 RetryInterval = 3;
repeated string PassthroughHeaders = 4;
// repeated string PassthroughHeaders = 4; // depreceated
map<string, string> BranchHeaders = 5;
int64 RequestTimeout = 6;
}

29
dtmsvr/trans_class.go

@ -6,7 +6,6 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/logger"
@ -77,14 +76,6 @@ func TransFromContext(c *gin.Context) *TransGlobal {
logger.Debugf("creating trans in prepare")
m.setupPayloads()
m.Ext.Headers = map[string]string{}
if len(m.PassthroughHeaders) > 0 {
for _, h := range m.PassthroughHeaders {
v := c.GetHeader(h)
if v != "" {
m.Ext.Headers[h] = v
}
}
}
return &m
}
@ -103,26 +94,16 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
CustomData: c.CustomedData,
RollbackReason: c.RollbackReason,
TransOptions: dtmcli.TransOptions{
WaitResult: o.WaitResult,
TimeoutToFail: o.TimeoutToFail,
RetryInterval: o.RetryInterval,
PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders,
RequestTimeout: o.RequestTimeout,
WaitResult: o.WaitResult,
TimeoutToFail: o.TimeoutToFail,
RetryInterval: o.RetryInterval,
BranchHeaders: o.BranchHeaders,
RequestTimeout: o.RequestTimeout,
},
}}
r.ReqExtra = c.ReqExtra
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
if len(o.PassthroughHeaders) > 0 {
r.Ext.Headers = map[string]string{}
for _, h := range o.PassthroughHeaders {
v := dtmgimp.GetMetaFromContext(ctx, h)
if v != "" {
r.Ext.Headers[h] = v
}
}
}
return &r
}

11
test/saga_cover_test.go

@ -1,11 +0,0 @@
package test
import (
"testing"
"github.com/dtm-labs/dtm/client/dtmcli"
)
func TestSagaCover(t *testing.T) {
dtmcli.SetPassthroughHeaders([]string{})
}

35
test/saga_grpc_test.go

@ -101,17 +101,6 @@ func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
return saga
}
func TestSagaGrpcPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)
sagaYes.WaitResult = true
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaGrpcWithGlobalTransRequestTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
@ -137,30 +126,6 @@ func TestSagaGrpcOptionsRollbackWait(t *testing.T) {
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes)
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
func TestSagaGrpcPassthroughHeadersNo(t *testing.T) {
gidNo := dtmimp.GetFuncName()
sagaNo := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidNo)
sagaNo.WaitResult = true
sagaNo.Add(busi.BusiGrpc+"/busi.Busi/TransOutHeaderNo", "", nil)
err := sagaNo.Submit()
assert.Nil(t, err)
waitTransProcessed(gidNo)
}
func TestSagaGrpcHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gidYes).

35
test/saga_options_test.go

@ -109,41 +109,6 @@ func TestSagaOptionsRollbackWait(t *testing.T) {
assert.Contains(t, getTrans(saga.Gid).RollbackReason, "Insufficient balance")
}
func TestSagaPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.WaitResult = true
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
}
func TestSagaCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
err := sagaYes.Submit()
assert.Nil(t, err)
waitTransProcessed(gidYes)
assert.Equal(t, StatusSubmitted, getTransStatus(gidYes))
cronTransOnce(t, gidYes)
assert.Equal(t, StatusSucceed, getTransStatus(gidYes))
}
func TestSagaPassthroughHeadersNo(t *testing.T) {
gidNo := dtmimp.GetFuncName()
sagaNo := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidNo)
sagaNo.WaitResult = true
sagaNo.Add(busi.Busi+"/TransOutHeaderNo", "", nil)
err := sagaNo.Submit()
assert.Nil(t, err)
waitTransProcessed(gidNo)
}
func TestSagaHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)

Loading…
Cancel
Save