Browse Source

add needDelay func, fix name

pull/223/head
xyctruth 4 years ago
parent
commit
8cd01342e0
  1. 4
      dtmcli/msg.go
  2. 6
      dtmgrpc/msg.go
  3. 2
      dtmsvr/cron.go
  4. 12
      dtmsvr/trans_class.go
  5. 4
      dtmsvr/trans_status.go
  6. 2
      dtmsvr/trans_type_msg.go
  7. 11
      test/msg_delay_test.go
  8. 7
      test/types.go

4
dtmcli/msg.go

@ -31,8 +31,8 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
// EnableDelay delay call branch, unit second
func (s *Msg) EnableDelay(delay uint64) *Msg {
// SetDelay delay call branch, unit second
func (s *Msg) SetDelay(delay uint64) *Msg {
s.delay = delay
return s
}

6
dtmgrpc/msg.go

@ -33,9 +33,9 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
// EnableDelay delay call branch, unit second
func (s *MsgGrpc) EnableDelay(delay uint64) *MsgGrpc {
s.Msg.EnableDelay(delay)
// SetDelay delay call branch, unit second
func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc {
s.Msg.SetDelay(delay)
return s
}

2
dtmsvr/cron.go

@ -55,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
return nil
}
logger.Infof("cron job return a trans: %s", global.String())
return &TransGlobal{TransGlobalStore: *global, triggerType: triggerCron}
return &TransGlobal{TransGlobalStore: *global}
}
func handlePanic(perr *error) {

12
dtmsvr/trans_class.go

@ -19,20 +19,11 @@ import (
"github.com/gin-gonic/gin"
)
// triggerType trigger transaction type
type triggerType int
const (
triggerManual triggerType = iota //triggerManual manual trigger
triggerCron //triggerCron cron trigger
)
// TransGlobal global transaction
type TransGlobal struct {
storage.TransGlobalStore
lastTouched time.Time // record the start time of process
updateBranchSync bool
triggerType triggerType
}
// TransBranch branch transaction
@ -113,8 +104,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders,
},
},
triggerType: triggerManual}
}}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}

4
dtmsvr/trans_status.go

@ -79,6 +79,10 @@ func (t *TransGlobal) isTimeout() bool {
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
func (t *TransGlobal) needDelay(delay uint64) bool {
return time.Since(*t.CreateTime)+CronForwardDuration < time.Duration(delay)*time.Second
}
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}

2
dtmsvr/trans_type_msg.go

@ -71,7 +71,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
}
if cmc.Delay > 0 && t.triggerType == triggerManual {
if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
t.delayCronTime(cmc.Delay)
return nil
}

11
test/msg_delay_test.go

@ -5,6 +5,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@ -14,7 +15,7 @@ func genMsgDelay(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req).EnableDelay(10)
Add(busi.Busi+"/TransIn", &req).SetDelay(10)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}
@ -22,8 +23,12 @@ func genMsgDelay(gid string) *dtmcli.Msg {
func TestMsgDelayNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
msg := genMsgDelay(gid)
msg.Submit()
waitTransProcessed(msg.Gid)
submitForwardCron(0, func() {
msg.Submit()
waitTransProcessed(msg.Gid)
})
dtmsvr.NowForwardDuration = 0
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
cronTransOnceForwardCron(t, "", 0)

7
test/types.go

@ -68,6 +68,13 @@ func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) {
dtmsvr.CronForwardDuration = old
}
func submitForwardCron(seconds int, fn func()) {
old := dtmsvr.CronForwardDuration
dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
fn()
dtmsvr.CronForwardDuration = old
}
const (
// StatusPrepared status for global/branch trans status.
StatusPrepared = dtmcli.StatusPrepared

Loading…
Cancel
Save