Browse Source

add webhook

pull/362/head
yedf2 3 years ago
parent
commit
70873802b2
  1. 6
      conf.sample.yml
  2. 2
      dtmsvr/config/config.go
  3. 21
      dtmsvr/trans_status.go
  4. 12
      test/busi/base_http.go
  5. 1
      test/main_test.go
  6. 41
      test/msg_webhook_test.go
  7. 2
      test/types.go

6
conf.sample.yml

@ -64,4 +64,8 @@
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status
# TimeZoneOffset: '' #default '' using system default. '+8': Asia/Shanghai; '0': GMT
# ConfigUpdateInterval: 10 # the interval to update configuration in memory such as topics map... (seconds)
# ConfigUpdateInterval: 10 # the interval to update configuration in memory such as topics map... (seconds)
# TimeZoneOffset: '' # default '' using system default. '+8': Asia/Shanghai; '0': GMT
# AlertRetryLimit: 3 # default 3; if a transaction branch has been retried 3 times, the AlertHook will be called
# AlertWebHook: '' # default ''; sample: 'http://localhost:8080/dtm-hook'. this hook will be called like this:
## curl -H "Content-Type: application/json" -d '{"gid":"xxxx","status":"submitted","retry_count":3}' http://localhost:8080/dtm-hook

2
dtmsvr/config/config.go

@ -99,6 +99,8 @@ type Type struct {
Log Log `yaml:"Log"`
TimeZoneOffset string `yaml:"TimeZoneOffset"`
ConfigUpdateInterval int64 `yaml:"ConfigUpdateInterval" default:"3"`
AlertRetryLimit int64 `yaml:"AlertRetryLimit" default:"3"`
AlertWebHook string `yaml:"AlertWebHook"`
}
// Config config

21
dtmsvr/trans_status.go

@ -3,6 +3,7 @@ package dtmsvr
import (
"errors"
"fmt"
"math"
"net/url"
"strings"
"time"
@ -14,6 +15,7 @@ import (
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtmdriver"
"github.com/dtm-labs/logger"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid/v3"
"google.golang.org/grpc/metadata"
)
@ -221,13 +223,28 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
}
branchMetrics(t, branch, status == dtmcli.StatusSucceed)
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval {
if err == nil && (time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval) {
t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
t.touchCronTime(cronKeep, 0)
} else if err != nil {
t.touchCronTime(cronBackoff, 0)
v := t.NextCronInterval / t.getNextCronInterval(cronReset)
retryCount := int64(math.Log2(float64(v)))
logger.Debugf("origin: %d v: %d retryCount: %d", t.getNextCronInterval(cronReset), v, retryCount)
if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" {
_, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
"gid": t.Gid,
"status": t.Status,
"branch": branch.URL,
"error": err.Error(),
"retry_count": retryCount,
}).Post(conf.AlertWebHook)
if err2 != nil {
logger.Errorf("alerting webhook error: %v", err2)
}
}
}
return err
}

12
test/busi/base_http.go

@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -49,6 +50,9 @@ func SetSleepCancelHandler(handler SleepCancelHandler) {
sleepCancelHandler = handler
}
// WebHookResult 1
var WebHookResult gin.H
// BaseAppStartup base app startup
func BaseAppStartup() *gin.Engine {
logger.Infof("examples starting")
@ -221,4 +225,12 @@ func BaseAddRoute(app *gin.Engine) {
retryNums = 3
return nil
}))
app.POST(BusiAPI+"/AlertWebHook", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} {
err := ctx.BindJSON(&WebHookResult)
dtmimp.FatalIfError(err)
if strings.Contains(WebHookResult["gid"].(string), "Error") {
return errors.New("gid contains 'Error', so return error")
}
return nil
}))
}

1
test/main_test.go

@ -31,6 +31,7 @@ func TestMain(m *testing.M) {
dtmsvr.CronForwardDuration = 180 * time.Second
conf.UpdateBranchSync = 1
conf.ConfigUpdateInterval = 1
conf.AlertWebHook = busi.Busi + "/AlertWebHook"
dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, busi.SetHTTPHeaderForHeadersYes)
dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes)

41
test/msg_webhook_test.go

@ -0,0 +1,41 @@
package test
import (
"testing"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgWebhook(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
busi.MainSwitch.TransInResult.SetOnce("ERROR")
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
busi.MainSwitch.TransInResult.SetOnce("ERROR")
cronTransOnce(t, msg.Gid)
busi.MainSwitch.TransInResult.SetOnce("ERROR")
cronTransOnce(t, msg.Gid)
assert.Equal(t, msg.Gid, busi.WebHookResult["gid"])
cronTransOnce(t, msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
func TestMsgWebhookError(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
busi.MainSwitch.TransInResult.SetOnce("ERROR")
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
busi.MainSwitch.TransInResult.SetOnce("ERROR")
cronTransOnce(t, msg.Gid)
busi.MainSwitch.TransInResult.SetOnce("ERROR")
cronTransOnce(t, msg.Gid)
assert.Equal(t, msg.Gid, busi.WebHookResult["gid"])
cronTransOnce(t, msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}

2
test/types.go

@ -33,7 +33,7 @@ func waitTransProcessed(gid string) {
case id := <-dtmsvr.TransProcessedTestChan:
logger.FatalfIf(id != gid, "------- expecting: %s but %s found", gid, id)
logger.Debugf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 4)):
case <-time.After(time.Duration(time.Second * 40000)):
logger.FatalfIf(true, "Wait Trans timeout")
}
}

Loading…
Cancel
Save