From 49825fdde319cef61f6ffb36eadcf5b6abe3223a Mon Sep 17 00:00:00 2001 From: "goss.beta" Date: Fri, 28 Jan 2022 01:16:57 +0900 Subject: [PATCH 1/2] Enable concurrent for SAGA grpc --- dtmcli/saga.go | 6 +++++- dtmgrpc/saga.go | 1 + dtmsvr/trans_class.go | 1 + 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/dtmcli/saga.go b/dtmcli/saga.go index 2ac9c1c..b768d76 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -43,8 +43,12 @@ func (s *Saga) EnableConcurrent() *Saga { // Submit submit the saga trans func (s *Saga) Submit() error { + s.AddConcurrentContext() + return dtmimp.TransCallDtm(&s.TransBase, s, "submit") +} + +func (s *Saga) AddConcurrentContext() { if s.concurrent { s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent}) } - return dtmimp.TransCallDtm(&s.TransBase, s, "submit") } diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go index af09038..9fca5d9 100644 --- a/dtmgrpc/saga.go +++ b/dtmgrpc/saga.go @@ -43,5 +43,6 @@ func (s *SagaGrpc) EnableConcurrent() *SagaGrpc { // Submit submit the saga trans func (s *SagaGrpc) Submit() error { + s.Saga.AddConcurrentContext() return dtmgimp.DtmGrpcCall(&s.Saga.TransBase, "Submit") } diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 50f9b01..b6e080b 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -96,6 +96,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal QueryPrepared: c.QueryPrepared, Protocol: "grpc", BinPayloads: c.BinPayloads, + CustomData: c.CustomedData, TransOptions: dtmcli.TransOptions{ WaitResult: o.WaitResult, TimeoutToFail: o.TimeoutToFail, From 55373dc9de798446e8568612605e765acf193018 Mon Sep 17 00:00:00 2001 From: "goss.beta" Date: Fri, 28 Jan 2022 22:54:03 +0900 Subject: [PATCH 2/2] fix lint --- dtmcli/saga.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dtmcli/saga.go b/dtmcli/saga.go index b768d76..87cf08f 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -47,6 +47,7 @@ func (s *Saga) Submit() error { return dtmimp.TransCallDtm(&s.TransBase, s, "submit") } +// AddConcurrentContext adds concurrent options to the request context func (s *Saga) AddConcurrentContext() { if s.concurrent { s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent})