From d91370086cb8413d58a41abb1edacdf68fc50237 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Tue, 23 Dec 2025 10:03:46 +0800 Subject: [PATCH 1/5] feat(workflow): add exponential backoff support --- dtmsvr/trans_type_workflow.go | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go index 925323c..c79e7b4 100644 --- a/dtmsvr/trans_type_workflow.go +++ b/dtmsvr/trans_type_workflow.go @@ -2,11 +2,15 @@ package dtmsvr import ( "context" + "math" + "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/client/workflow/wfpb" + "github.com/dtm-labs/logger" + "github.com/gin-gonic/gin" ) type transWorkflowProcessor struct { @@ -38,5 +42,30 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra wd := wfpb.WorkflowData{Data: cmc.Data} data = dtmgimp.MustProtoMarshal(&wd) } - return t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) + err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) + // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval + if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || + t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { + t.touchCronTime(cronReset, 0) + } else if err == dtmimp.ErrOngoing { + t.touchCronTime(cronKeep, 0) + } else if err != nil { + t.touchCronTime(cronBackoff, 0) + v := t.NextCronInterval / t.getNextCronInterval(cronReset) + retryCount := int64(math.Log2(float64(v))) + logger.Debugf("origin: %d v: %d retryCount: %d", t.getNextCronInterval(cronReset), v, retryCount) + if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" { + _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{ + "gid": t.Gid, + "status": t.Status, + "branch": "", + "error": err.Error(), + "retry_count": retryCount, + }).Post(conf.AlertWebHook) + if err2 != nil { + logger.Errorf("alerting webhook error: %v", err2) + } + } + } + return err } From d9fda5f5087c38fc5466796b8695ab833cdc6664 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Sun, 4 Jan 2026 17:24:47 +0800 Subject: [PATCH 2/5] fix: resolve unit test failures --- dtmsvr/trans_status.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 742a094..8cde388 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -232,11 +232,8 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc t.changeBranchStatus(branch, status, branchPos) } branchMetrics(t, branch, status == dtmcli.StatusSucceed) - // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval - if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || - t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { - t.touchCronTime(cronReset, 0) - } else if err == dtmimp.ErrOngoing { + + if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) } else if err != nil { t.touchCronTime(cronBackoff, 0) From 4e878719d8a17c194595777f31346f6cd4170a28 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Sun, 4 Jan 2026 17:28:12 +0800 Subject: [PATCH 3/5] fix: resolve unit test failures --- dtmsvr/trans_status.go | 5 ++++- dtmsvr/trans_type_workflow.go | 6 +----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 8cde388..a8d5e00 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -233,7 +233,10 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc } branchMetrics(t, branch, status == dtmcli.StatusSucceed) - if err == dtmimp.ErrOngoing { + if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || + t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { + t.touchCronTime(cronReset, 0) + } else if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) } else if err != nil { t.touchCronTime(cronBackoff, 0) diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go index c79e7b4..d1de51c 100644 --- a/dtmsvr/trans_type_workflow.go +++ b/dtmsvr/trans_type_workflow.go @@ -3,7 +3,6 @@ package dtmsvr import ( "context" "math" - "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -44,10 +43,7 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra } err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval - if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || - t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { - t.touchCronTime(cronReset, 0) - } else if err == dtmimp.ErrOngoing { + if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) } else if err != nil { t.touchCronTime(cronBackoff, 0) From 6c2ecb2c79a619da10417b240ab4dd8944f2eca8 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Sun, 4 Jan 2026 17:40:42 +0800 Subject: [PATCH 4/5] fix: resolve unit test failures --- dtmsvr/trans_status.go | 2 +- dtmsvr/trans_type_workflow.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index a8d5e00..742a094 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -232,7 +232,7 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc t.changeBranchStatus(branch, status, branchPos) } branchMetrics(t, branch, status == dtmcli.StatusSucceed) - + // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { t.touchCronTime(cronReset, 0) diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go index d1de51c..da6bd45 100644 --- a/dtmsvr/trans_type_workflow.go +++ b/dtmsvr/trans_type_workflow.go @@ -42,7 +42,6 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra data = dtmgimp.MustProtoMarshal(&wd) } err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) - // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) } else if err != nil { From 861ae27c131ef9a55c8bce5a7539910af18131c2 Mon Sep 17 00:00:00 2001 From: conan8737 Date: Tue, 27 Jan 2026 20:30:42 +0800 Subject: [PATCH 5/5] fix: resolve unit test failures --- dtmsvr/trans_type_workflow.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dtmsvr/trans_type_workflow.go b/dtmsvr/trans_type_workflow.go index da6bd45..3e96c35 100644 --- a/dtmsvr/trans_type_workflow.go +++ b/dtmsvr/trans_type_workflow.go @@ -2,6 +2,7 @@ package dtmsvr import ( "context" + "errors" "math" "github.com/dtm-labs/dtm/client/dtmcli" @@ -44,7 +45,7 @@ func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []Tra err := t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data) if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) - } else if err != nil { + } else if err != nil && !errors.Is(err, dtmimp.ErrFailure) { t.touchCronTime(cronBackoff, 0) v := t.NextCronInterval / t.getNextCronInterval(cronReset) retryCount := int64(math.Log2(float64(v)))