Browse Source

merge touchCronTime delayCronTime func

pull/223/head
xyctruth 4 years ago
parent
commit
50c19a5d43
  1. 26
      dtmsvr/trans_status.go
  2. 6
      dtmsvr/trans_type_msg.go

26
dtmsvr/trans_status.go

@ -22,18 +22,20 @@ import (
"google.golang.org/grpc/metadata"
)
func (t *TransGlobal) touchCronTime(ctype cronType) {
// touchCronTime Based on ctype or delay set nextCronTime
// delay = 0 ,use ctype set nextCronTime and nextCronInterval
// delay > 0 ,use delay set nextCronTime ,use ctype set nextCronInterval
func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
t.lastTouched = time.Now()
nextCronInterval := t.getNextCronInterval(ctype)
nextCronTime := dtmutil.GetNextTime(nextCronInterval)
GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
func (t *TransGlobal) delayCronTime(delay uint64) {
t.lastTouched = time.Now()
nextCronInterval := t.getNextCronInterval(cronKeep)
nextCronTime := dtmutil.GetNextTime(int64(delay))
var nextCronTime *time.Time
if delay > 0 {
nextCronTime = dtmutil.GetNextTime(int64(delay))
} else {
nextCronTime = dtmutil.GetNextTime(nextCronInterval)
}
GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
@ -147,11 +149,11 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval {
t.touchCronTime(cronReset)
t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
t.touchCronTime(cronKeep)
t.touchCronTime(cronKeep, 0)
} else if err != nil {
t.touchCronTime(cronBackoff)
t.touchCronTime(cronBackoff, 0)
}
return err
}

6
dtmsvr/trans_type_msg.go

@ -53,10 +53,10 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if errors.Is(err, dtmcli.ErrFailure) {
t.changeStatus(dtmcli.StatusFailed)
} else if errors.Is(err, dtmcli.ErrOngoing) {
t.touchCronTime(cronReset)
t.touchCronTime(cronReset, 0)
} else {
logger.Errorf("getting result failed for %s. error: %v", t.QueryPrepared, err)
t.touchCronTime(cronBackoff)
t.touchCronTime(cronBackoff, 0)
}
}
@ -72,7 +72,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
}
if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
t.delayCronTime(cmc.Delay)
t.touchCronTime(cronKeep, cmc.Delay)
return nil
}

Loading…
Cancel
Save