From ffaafd45091e38fd8215f7321d5b051200e4f231 Mon Sep 17 00:00:00 2001 From: liulei Date: Mon, 21 Feb 2022 16:03:30 +0800 Subject: [PATCH] feat: transOption add requestTimeout option for resets global trans request timeout --- dtmcli/dtmimp/trans_base.go | 12 +++++++++++- dtmsvr/trans_status.go | 3 +++ dtmsvr/trans_type_saga.go | 6 +++++- test/busi/base_http.go | 6 +++++- test/busi/base_types.go | 1 + test/main_test.go | 2 +- test/saga_options_test.go | 11 +++++++++++ 7 files changed, 37 insertions(+), 4 deletions(-) diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index eb0967d..55dcdb6 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/go-resty/resty/v2" ) @@ -43,7 +44,8 @@ func (g *BranchIDGen) CurrentSubBranchID() string { type TransOptions struct { WaitResult bool `json:"wait_result,omitempty" gorm:"-"` TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc - RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc + RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout + RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` } @@ -76,6 +78,11 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr } } +// WithGlobalTransRequestTimeout defines global trans request timeout +func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) { + t.RequestTimeout = timeout +} + // TransBaseFromQuery construct transaction info from request func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id")) @@ -83,6 +90,9 @@ func TransBaseFromQuery(qs url.Values) *TransBase { // TransCallDtm TransBase call dtm func TransCallDtm(tb *TransBase, body interface{}, operation string) error { + if tb.RequestTimeout != 0 { + RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) + } resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 23876b4..d2fd8dc 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -77,6 +77,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa return nil } if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { + if t.RequestTimeout != 0 { + dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second) + } resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). SetQueryParams(map[string]string{ "gid": t.Gid, diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 513190c..6caacf0 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -170,6 +170,10 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } waitDoneOnce := func() { + timeAfter := time.Second * 3 + if t.RequestTimeout != 0 { + timeAfter = time.Duration(t.RequestTimeout) * time.Second + } select { case r := <-resultChan: br := &branchResults[r.index] @@ -188,7 +192,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } logger.Debugf("branch done: %v", r) - case <-time.After(time.Duration(time.Second * 3)): + case <-time.After(timeAfter): logger.Debugf("wait once for done") } } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index f5c9250..abcd8b3 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -10,6 +10,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -152,7 +153,10 @@ func BaseAddRoute(app *gin.Engine) { return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) - + app.POST(BusiAPI + "/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + time.Sleep(time.Second * 4) + return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") + })) app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) logger.FatalIfError(err) diff --git a/test/busi/base_types.go b/test/busi/base_types.go index b2a4a32..5bf8063 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -26,6 +26,7 @@ var BusiConf = dtmcli.DBConf{ Host: StoreHost, Port: 3306, User: "root", + Password: "111111", } // UserAccount 1 diff --git a/test/main_test.go b/test/main_test.go index 4f17d73..128ce44 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -48,7 +48,7 @@ func TestMain(m *testing.M) { conf.Store.Host = "localhost" conf.Store.Port = 3306 conf.Store.User = "root" - conf.Store.Password = "" + conf.Store.Password = "111111" } else { conf.Store.Driver = "redis" conf.Store.Host = "localhost" diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 090074f..8e23205 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -57,6 +57,17 @@ func TestSagaOptionsTimeout(t *testing.T) { assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) } +func TestSagaGlobalTransWithRequestTimeout(t *testing.T) { + gid := dtmimp.GetFuncName() + saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) + saga.WaitResult = true + saga.Add(busi.Busi + "/TransOutTimeout", "", nil) + saga.WithGlobalTransRequestTimeout(6) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(gid) +} + func TestSagaOptionsNormalWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) saga.WaitResult = true