diff --git a/conf.sample.yml b/conf.sample.yml index c8c69c6..5aa71e9 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -64,4 +64,8 @@ # UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status # TimeZoneOffset: '' #default '' using system default. '+8': Asia/Shanghai; '0': GMT -# ConfigUpdateInterval: 10 # the interval to update configuration in memory such as topics map... (seconds) \ No newline at end of file +# ConfigUpdateInterval: 10 # the interval to update configuration in memory such as topics map... (seconds) +# TimeZoneOffset: '' # default '' using system default. '+8': Asia/Shanghai; '0': GMT +# AlertRetryLimit: 3 # default 3; if a transaction branch has been retried 3 times, the AlertHook will be called +# AlertWebHook: '' # default ''; sample: 'http://localhost:8080/dtm-hook'. this hook will be called like this: +## curl -H "Content-Type: application/json" -d '{"gid":"xxxx","status":"submitted","retry_count":3}' http://localhost:8080/dtm-hook diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index 857fedf..65a2a90 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -99,6 +99,8 @@ type Type struct { Log Log `yaml:"Log"` TimeZoneOffset string `yaml:"TimeZoneOffset"` ConfigUpdateInterval int64 `yaml:"ConfigUpdateInterval" default:"3"` + AlertRetryLimit int64 `yaml:"AlertRetryLimit" default:"3"` + AlertWebHook string `yaml:"AlertWebHook"` } // Config config diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index b34b3cd..9e8b4e5 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -3,6 +3,7 @@ package dtmsvr import ( "errors" "fmt" + "math" "net/url" "strings" "time" @@ -14,6 +15,7 @@ import ( "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtmdriver" "github.com/dtm-labs/logger" + "github.com/gin-gonic/gin" "github.com/lithammer/shortuuid/v3" "google.golang.org/grpc/metadata" ) @@ -221,13 +223,28 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error { } branchMetrics(t, branch, status == dtmcli.StatusSucceed) // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval - if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || - t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval { + if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond || + t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) { t.touchCronTime(cronReset, 0) } else if err == dtmimp.ErrOngoing { t.touchCronTime(cronKeep, 0) } else if err != nil { t.touchCronTime(cronBackoff, 0) + v := t.NextCronInterval / t.getNextCronInterval(cronReset) + retryCount := int64(math.Log2(float64(v))) + logger.Debugf("origin: %d v: %d retryCount: %d", t.getNextCronInterval(cronReset), v, retryCount) + if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" { + _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{ + "gid": t.Gid, + "status": t.Status, + "branch": branch.URL, + "error": err.Error(), + "retry_count": retryCount, + }).Post(conf.AlertWebHook) + if err2 != nil { + logger.Errorf("alerting webhook error: %v", err2) + } + } } return err } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index f0ce35c..44bbabc 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io/ioutil" + "strings" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -49,6 +50,9 @@ func SetSleepCancelHandler(handler SleepCancelHandler) { sleepCancelHandler = handler } +// WebHookResult 1 +var WebHookResult gin.H + // BaseAppStartup base app startup func BaseAppStartup() *gin.Engine { logger.Infof("examples starting") @@ -221,4 +225,12 @@ func BaseAddRoute(app *gin.Engine) { retryNums = 3 return nil })) + app.POST(BusiAPI+"/AlertWebHook", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} { + err := ctx.BindJSON(&WebHookResult) + dtmimp.FatalIfError(err) + if strings.Contains(WebHookResult["gid"].(string), "Error") { + return errors.New("gid contains 'Error', so return error") + } + return nil + })) } diff --git a/test/main_test.go b/test/main_test.go index 7854170..665eed5 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -31,6 +31,7 @@ func TestMain(m *testing.M) { dtmsvr.CronForwardDuration = 180 * time.Second conf.UpdateBranchSync = 1 conf.ConfigUpdateInterval = 1 + conf.AlertWebHook = busi.Busi + "/AlertWebHook" dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, busi.SetHTTPHeaderForHeadersYes) dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes) diff --git a/test/msg_webhook_test.go b/test/msg_webhook_test.go new file mode 100644 index 0000000..4242aba --- /dev/null +++ b/test/msg_webhook_test.go @@ -0,0 +1,41 @@ +package test + +import ( + "testing" + + "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestMsgWebhook(t *testing.T) { + msg := genMsg(dtmimp.GetFuncName()) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + msg.Submit() + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + waitTransProcessed(msg.Gid) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + cronTransOnce(t, msg.Gid) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + cronTransOnce(t, msg.Gid) + assert.Equal(t, msg.Gid, busi.WebHookResult["gid"]) + cronTransOnce(t, msg.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) +} + +func TestMsgWebhookError(t *testing.T) { + msg := genMsg(dtmimp.GetFuncName()) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + msg.Submit() + assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) + waitTransProcessed(msg.Gid) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + cronTransOnce(t, msg.Gid) + busi.MainSwitch.TransInResult.SetOnce("ERROR") + cronTransOnce(t, msg.Gid) + assert.Equal(t, msg.Gid, busi.WebHookResult["gid"]) + cronTransOnce(t, msg.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) +} diff --git a/test/types.go b/test/types.go index 2f03260..7bf1fa3 100644 --- a/test/types.go +++ b/test/types.go @@ -33,7 +33,7 @@ func waitTransProcessed(gid string) { case id := <-dtmsvr.TransProcessedTestChan: logger.FatalfIf(id != gid, "------- expecting: %s but %s found", gid, id) logger.Debugf("finish for gid %s", gid) - case <-time.After(time.Duration(time.Second * 4)): + case <-time.After(time.Duration(time.Second * 40000)): logger.FatalfIf(true, "Wait Trans timeout") } }