diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index a768fa1..c1b0692 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -55,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { return nil } logger.Infof("cron job return a trans: %s", global.String()) - return &TransGlobal{TransGlobalStore: *global} + return &TransGlobal{TransGlobalStore: *global, triggerType: triggerCron} } func handlePanic(perr *error) { diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index b6e080b..e9dc48c 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -19,11 +19,19 @@ import ( "github.com/gin-gonic/gin" ) +type triggerType int + +const ( + triggerManual triggerType = iota //manual trigger + triggerCron //cron trigger +) + // TransGlobal global transaction type TransGlobal struct { storage.TransGlobalStore lastTouched time.Time // record the start time of process updateBranchSync bool + triggerType triggerType } // TransBranch branch transaction @@ -104,7 +112,8 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal PassthroughHeaders: o.PassthroughHeaders, BranchHeaders: o.BranchHeaders, }, - }} + }, + triggerType: triggerManual} if c.Steps != "" { dtmimp.MustUnmarshalString(c.Steps, &r.Steps) } diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index 3e62c69..fd386c6 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -71,7 +71,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error { dtmimp.MustUnmarshalString(t.CustomData, &cmc) } - if cmc.Delay > 0 { + if cmc.Delay > 0 && t.triggerType == triggerManual { t.delayCronTime(cmc.Delay) return nil } diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go new file mode 100644 index 0000000..7300530 --- /dev/null +++ b/test/msg_delay_test.go @@ -0,0 +1,36 @@ +package test + +import ( + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr" + "github.com/dtm-labs/dtm/dtmutil" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func genMsgDelay(gid string) *dtmcli.Msg { + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid). + Add(busi.Busi+"/TransOut", &req). + Add(busi.Busi+"/TransIn", &req).EnableDelay(2) + msg.QueryPrepared = busi.Busi + "/QueryPrepared" + return msg +} + +func TestMsgDelayNormal(t *testing.T) { + gid := dtmimp.GetFuncName() + msg := genMsgDelay(gid) + msg.Submit() + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + time.Sleep(2 * time.Second) + dtmsvr.CronForwardDuration = 0 + cronTransOnce(t, gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) +}