From ffaafd45091e38fd8215f7321d5b051200e4f231 Mon Sep 17 00:00:00 2001 From: liulei Date: Mon, 21 Feb 2022 16:03:30 +0800 Subject: [PATCH 1/5] feat: transOption add requestTimeout option for resets global trans request timeout --- dtmcli/dtmimp/trans_base.go | 12 +++++++++++- dtmsvr/trans_status.go | 3 +++ dtmsvr/trans_type_saga.go | 6 +++++- test/busi/base_http.go | 6 +++++- test/busi/base_types.go | 1 + test/main_test.go | 2 +- test/saga_options_test.go | 11 +++++++++++ 7 files changed, 37 insertions(+), 4 deletions(-) diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index eb0967d..55dcdb6 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/go-resty/resty/v2" ) @@ -43,7 +44,8 @@ func (g *BranchIDGen) CurrentSubBranchID() string { type TransOptions struct { WaitResult bool `json:"wait_result,omitempty" gorm:"-"` TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc - RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc + RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout + RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` } @@ -76,6 +78,11 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr } } +// WithGlobalTransRequestTimeout defines global trans request timeout +func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) { + t.RequestTimeout = timeout +} + // TransBaseFromQuery construct transaction info from request func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id")) @@ -83,6 +90,9 @@ func TransBaseFromQuery(qs url.Values) *TransBase { // TransCallDtm TransBase call dtm func TransCallDtm(tb *TransBase, body interface{}, operation string) error { + if tb.RequestTimeout != 0 { + RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second) + } resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 23876b4..d2fd8dc 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -77,6 +77,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa return nil } if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { + if t.RequestTimeout != 0 { + dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second) + } resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). SetQueryParams(map[string]string{ "gid": t.Gid, diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 513190c..6caacf0 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -170,6 +170,10 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } waitDoneOnce := func() { + timeAfter := time.Second * 3 + if t.RequestTimeout != 0 { + timeAfter = time.Duration(t.RequestTimeout) * time.Second + } select { case r := <-resultChan: br := &branchResults[r.index] @@ -188,7 +192,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } logger.Debugf("branch done: %v", r) - case <-time.After(time.Duration(time.Second * 3)): + case <-time.After(timeAfter): logger.Debugf("wait once for done") } } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index f5c9250..abcd8b3 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -10,6 +10,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -152,7 +153,10 @@ func BaseAddRoute(app *gin.Engine) { return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) - + app.POST(BusiAPI + "/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + time.Sleep(time.Second * 4) + return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") + })) app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) logger.FatalIfError(err) diff --git a/test/busi/base_types.go b/test/busi/base_types.go index b2a4a32..5bf8063 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -26,6 +26,7 @@ var BusiConf = dtmcli.DBConf{ Host: StoreHost, Port: 3306, User: "root", + Password: "111111", } // UserAccount 1 diff --git a/test/main_test.go b/test/main_test.go index 4f17d73..128ce44 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -48,7 +48,7 @@ func TestMain(m *testing.M) { conf.Store.Host = "localhost" conf.Store.Port = 3306 conf.Store.User = "root" - conf.Store.Password = "" + conf.Store.Password = "111111" } else { conf.Store.Driver = "redis" conf.Store.Host = "localhost" diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 090074f..8e23205 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -57,6 +57,17 @@ func TestSagaOptionsTimeout(t *testing.T) { assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) } +func TestSagaGlobalTransWithRequestTimeout(t *testing.T) { + gid := dtmimp.GetFuncName() + saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) + saga.WaitResult = true + saga.Add(busi.Busi + "/TransOutTimeout", "", nil) + saga.WithGlobalTransRequestTimeout(6) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(gid) +} + func TestSagaOptionsNormalWait(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) saga.WaitResult = true From 524f19ffd846d68e6294e9c73908455625ba582e Mon Sep 17 00:00:00 2001 From: liulei Date: Mon, 21 Feb 2022 16:08:44 +0800 Subject: [PATCH 2/5] revert: revert mysql config of test --- test/busi/base_types.go | 1 - test/main_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 5bf8063..b2a4a32 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -26,7 +26,6 @@ var BusiConf = dtmcli.DBConf{ Host: StoreHost, Port: 3306, User: "root", - Password: "111111", } // UserAccount 1 diff --git a/test/main_test.go b/test/main_test.go index 128ce44..4f17d73 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -48,7 +48,7 @@ func TestMain(m *testing.M) { conf.Store.Host = "localhost" conf.Store.Port = 3306 conf.Store.User = "root" - conf.Store.Password = "111111" + conf.Store.Password = "" } else { conf.Store.Driver = "redis" conf.Store.Host = "localhost" From 21256549b00d8951a5601d711f868147c66bae43 Mon Sep 17 00:00:00 2001 From: liulei Date: Mon, 21 Feb 2022 16:19:52 +0800 Subject: [PATCH 3/5] ci: fix ci lint --- dtmcli/dtmimp/trans_base.go | 4 ++-- test/busi/base_http.go | 2 +- test/saga_options_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index 55dcdb6..879ffc2 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -44,8 +44,8 @@ func (g *BranchIDGen) CurrentSubBranchID() string { type TransOptions struct { WaitResult bool `json:"wait_result,omitempty" gorm:"-"` TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc - RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout - RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc + RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout + RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index abcd8b3..28fa8cf 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -153,7 +153,7 @@ func BaseAddRoute(app *gin.Engine) { return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) - app.POST(BusiAPI + "/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + app.POST(BusiAPI+"/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { time.Sleep(time.Second * 4) return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") })) diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 8e23205..879a440 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -61,7 +61,7 @@ func TestSagaGlobalTransWithRequestTimeout(t *testing.T) { gid := dtmimp.GetFuncName() saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid) saga.WaitResult = true - saga.Add(busi.Busi + "/TransOutTimeout", "", nil) + saga.Add(busi.Busi+"/TransOutTimeout", "", nil) saga.WithGlobalTransRequestTimeout(6) err := saga.Submit() assert.Nil(t, err) From be45d1d530eaa27e663ecef9141259e466236ca3 Mon Sep 17 00:00:00 2001 From: liulei Date: Tue, 22 Feb 2022 09:47:38 +0800 Subject: [PATCH 4/5] revert: undo request timeout test sleep operation --- dtmsvr/trans_type_saga.go | 6 +----- test/busi/base_http.go | 3 --- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 6caacf0..0c2da0d 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -170,10 +170,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } waitDoneOnce := func() { - timeAfter := time.Second * 3 - if t.RequestTimeout != 0 { - timeAfter = time.Duration(t.RequestTimeout) * time.Second - } select { case r := <-resultChan: br := &branchResults[r.index] @@ -192,7 +188,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } logger.Debugf("branch done: %v", r) - case <-time.After(timeAfter): + case <-time.After(time.Second * 3): logger.Debugf("wait once for done") } } diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 28fa8cf..0c7adde 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -10,8 +10,6 @@ import ( "database/sql" "errors" "fmt" - "time" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" @@ -154,7 +152,6 @@ func BaseAddRoute(app *gin.Engine) { }) })) app.POST(BusiAPI+"/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - time.Sleep(time.Second * 4) return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut") })) app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { From 81cd54bcf1f66e40b7348e67079ef68f427573f3 Mon Sep 17 00:00:00 2001 From: liulei Date: Tue, 22 Feb 2022 09:58:02 +0800 Subject: [PATCH 5/5] ci: fix ci lint --- test/busi/base_http.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 0c7adde..5725694 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -10,6 +10,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger"