Browse Source

feat(workflow): add exponential backoff support

pull/573/head
conan8737 3 months ago
parent
commit
d91370086c
  1. 31
      dtmsvr/trans_type_workflow.go

31
dtmsvr/trans_type_workflow.go

@ -2,11 +2,15 @@ package dtmsvr
import (
"context"
"math"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/client/workflow/wfpb"
"github.com/dtm-labs/logger"
"github.com/gin-gonic/gin"
)
type transWorkflowProcessor struct {
@ -38,5 +42,30 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra
wd := wfpb.WorkflowData{Data: cmc.Data}
data = dtmgimp.MustProtoMarshal(&wd)
}
return t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data)
err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data)
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) {
t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
t.touchCronTime(cronKeep, 0)
} else if err != nil {
t.touchCronTime(cronBackoff, 0)
v := t.NextCronInterval / t.getNextCronInterval(cronReset)
retryCount := int64(math.Log2(float64(v)))
logger.Debugf("origin: %d v: %d retryCount: %d", t.getNextCronInterval(cronReset), v, retryCount)
if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" {
_, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
"gid": t.Gid,
"status": t.Status,
"branch": "",
"error": err.Error(),
"retry_count": retryCount,
}).Post(conf.AlertWebHook)
if err2 != nil {
logger.Errorf("alerting webhook error: %v", err2)
}
}
}
return err
}

Loading…
Cancel
Save