From 8cd01342e04f972d0c06a9ce6394ec92926aa0ea Mon Sep 17 00:00:00 2001 From: xyctruth <398041993@qq.com> Date: Wed, 23 Feb 2022 18:52:28 +0800 Subject: [PATCH] add needDelay func, fix name --- dtmcli/msg.go | 4 ++-- dtmgrpc/msg.go | 6 +++--- dtmsvr/cron.go | 2 +- dtmsvr/trans_class.go | 12 +----------- dtmsvr/trans_status.go | 4 ++++ dtmsvr/trans_type_msg.go | 2 +- test/msg_delay_test.go | 11 ++++++++--- test/types.go | 7 +++++++ 8 files changed, 27 insertions(+), 21 deletions(-) diff --git a/dtmcli/msg.go b/dtmcli/msg.go index 069824a..0bc6dc6 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -31,8 +31,8 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { return s } -// EnableDelay delay call branch, unit second -func (s *Msg) EnableDelay(delay uint64) *Msg { +// SetDelay delay call branch, unit second +func (s *Msg) SetDelay(delay uint64) *Msg { s.delay = delay return s } diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index 9188b54..be5b304 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -33,9 +33,9 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc { return s } -// EnableDelay delay call branch, unit second -func (s *MsgGrpc) EnableDelay(delay uint64) *MsgGrpc { - s.Msg.EnableDelay(delay) +// SetDelay delay call branch, unit second +func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc { + s.Msg.SetDelay(delay) return s } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index c1b0692..a768fa1 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -55,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { return nil } logger.Infof("cron job return a trans: %s", global.String()) - return &TransGlobal{TransGlobalStore: *global, triggerType: triggerCron} + return &TransGlobal{TransGlobalStore: *global} } func handlePanic(perr *error) { diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 47bf492..b6e080b 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -19,20 +19,11 @@ import ( "github.com/gin-gonic/gin" ) -// triggerType trigger transaction type -type triggerType int - -const ( - triggerManual triggerType = iota //triggerManual manual trigger - triggerCron //triggerCron cron trigger -) - // TransGlobal global transaction type TransGlobal struct { storage.TransGlobalStore lastTouched time.Time // record the start time of process updateBranchSync bool - triggerType triggerType } // TransBranch branch transaction @@ -113,8 +104,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal PassthroughHeaders: o.PassthroughHeaders, BranchHeaders: o.BranchHeaders, }, - }, - triggerType: triggerManual} + }} if c.Steps != "" { dtmimp.MustUnmarshalString(c.Steps, &r.Steps) } diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 6546e06..8d56af6 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -79,6 +79,10 @@ func (t *TransGlobal) isTimeout() bool { return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second } +func (t *TransGlobal) needDelay(delay uint64) bool { + return time.Since(*t.CreateTime)+CronForwardDuration < time.Duration(delay)*time.Second +} + func (t *TransGlobal) needProcess() bool { return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout() } diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index 3a10196..731318b 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -71,7 +71,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error { dtmimp.MustUnmarshalString(t.CustomData, &cmc) } - if cmc.Delay > 0 && t.triggerType == triggerManual { + if cmc.Delay > 0 && t.needDelay(cmc.Delay) { t.delayCronTime(cmc.Delay) return nil } diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go index bd60e28..9b9d030 100644 --- a/test/msg_delay_test.go +++ b/test/msg_delay_test.go @@ -5,6 +5,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" @@ -14,7 +15,7 @@ func genMsgDelay(gid string) *dtmcli.Msg { req := busi.GenTransReq(30, false, false) msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid). Add(busi.Busi+"/TransOut", &req). - Add(busi.Busi+"/TransIn", &req).EnableDelay(10) + Add(busi.Busi+"/TransIn", &req).SetDelay(10) msg.QueryPrepared = busi.Busi + "/QueryPrepared" return msg } @@ -22,8 +23,12 @@ func genMsgDelay(gid string) *dtmcli.Msg { func TestMsgDelayNormal(t *testing.T) { gid := dtmimp.GetFuncName() msg := genMsgDelay(gid) - msg.Submit() - waitTransProcessed(msg.Gid) + submitForwardCron(0, func() { + msg.Submit() + waitTransProcessed(msg.Gid) + }) + + dtmsvr.NowForwardDuration = 0 assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) cronTransOnceForwardCron(t, "", 0) diff --git a/test/types.go b/test/types.go index 2d1d545..0f3ed43 100644 --- a/test/types.go +++ b/test/types.go @@ -68,6 +68,13 @@ func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) { dtmsvr.CronForwardDuration = old } +func submitForwardCron(seconds int, fn func()) { + old := dtmsvr.CronForwardDuration + dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second + fn() + dtmsvr.CronForwardDuration = old +} + const ( // StatusPrepared status for global/branch trans status. StatusPrepared = dtmcli.StatusPrepared