diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index eb0967d..879ffc2 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/go-resty/resty/v2" ) @@ -43,6 +44,7 @@ func (g *BranchIDGen) CurrentSubBranchID() string { type TransOptions struct { WaitResult bool `json:"wait_result,omitempty" gorm:"-"` TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc + RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` @@ -76,6 +78,11 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr } } +// WithGlobalTransRequestTimeout defines global trans request timeout +func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) { + t.RequestTimeout = timeout +} + // TransBaseFromQuery construct transaction info from request func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id")) @@ -83,6 +90,9 @@ func TransBaseFromQuery(qs url.Values) *TransBase { // TransCallDtm TransBase call dtm func TransCallDtm(tb *TransBase, body interface{}, operation string) error { + if tb.RequestTimeout != 0 { + RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) + } resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 23876b4..d2fd8dc 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -77,6 +77,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa return nil } if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { + if t.RequestTimeout != 0 { + dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second) + } resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). SetQueryParams(map[string]string{ "gid": t.Gid, diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 513190c..0c2da0d 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -188,7 +188,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } logger.Debugf("branch done: %v", r) - case <-time.After(time.Duration(time.Second * 3)): + case <-time.After(time.Second * 3): logger.Debugf("wait once for done") } } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index f5c9250..5725694 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -152,7 +152,9 @@ func BaseAddRoute(app *gin.Engine) { return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) - + app.POST(BusiAPI+"/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") + })) app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) logger.FatalIfError(err) diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 090074f..879a440 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -57,6 +57,17 @@ func TestSagaOptionsTimeout(t *testing.T) { assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) } +func TestSagaGlobalTransWithRequestTimeout(t *testing.T) { + gid := dtmimp.GetFuncName() + saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) + saga.WaitResult = true + saga.Add(busi.Busi+"/TransOutTimeout", "", nil) + saga.WithGlobalTransRequestTimeout(6) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(gid) +} + func TestSagaOptionsNormalWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) saga.WaitResult = true