From d91370086cb8413d58a41abb1edacdf68fc50237 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Tue, 23 Dec 2025 10:03:46 +0800 Subject: [PATCH] feat(workflow): add exponential backoff support --- dtmsvr/trans_type_workflow.go | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go index 925323c..c79e7b4 100644 --- a/dtmsvr/trans_type_workflow.go +++ b/dtmsvr/trans_type_workflow.go @@ -2,11 +2,15 @@ package dtmsvr import ( "context" + "math" + "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/client/workflow/wfpb" + "github.com/dtm-labs/logger" + "github.com/gin-gonic/gin" ) type transWorkflowProcessor struct { @@ -38,5 +42,30 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra wd := wfpb.WorkflowData{Data: cmc.Data} data = dtmgimp.MustProtoMarshal(&wd) } - return t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) + err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) + // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval + if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || + t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { + t.touchCronTime(cronReset, 0) + } else if err == dtmimp.ErrOngoing { + t.touchCronTime(cronKeep, 0) + } else if err != nil { + t.touchCronTime(cronBackoff, 0) + v := t.NextCronInterval / t.getNextCronInterval(cronReset) + retryCount := int64(math.Log2(float64(v))) + logger.Debugf("origin: %d v: %d retryCount: %d", t.getNextCronInterval(cronReset), v, retryCount) + if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" { + _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{ + "gid": t.Gid, + "status": t.Status, + "branch": "", + "error": err.Error(), + "retry_count": retryCount, + }).Post(conf.AlertWebHook) + if err2 != nil { + logger.Errorf("alerting webhook error: %v", err2) + } + } + } + return err }