Browse Source

feat: transOption add requestTimeout option for resets global trans request timeout

pull/222/head
liulei 4 years ago
parent
commit
ffaafd4509
  1. 12
      dtmcli/dtmimp/trans_base.go
  2. 3
      dtmsvr/trans_status.go
  3. 6
      dtmsvr/trans_type_saga.go
  4. 6
      test/busi/base_http.go
  5. 1
      test/busi/base_types.go
  6. 2
      test/main_test.go
  7. 11
      test/saga_options_test.go

12
dtmcli/dtmimp/trans_base.go

@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
"github.com/go-resty/resty/v2"
)
@ -43,7 +44,8 @@ func (g *BranchIDGen) CurrentSubBranchID() string {
type TransOptions struct {
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"`
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"`
}
@ -76,6 +78,11 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr
}
}
// WithGlobalTransRequestTimeout defines global trans request timeout
func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) {
t.RequestTimeout = timeout
}
// TransBaseFromQuery construct transaction info from request
func TransBaseFromQuery(qs url.Values) *TransBase {
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
@ -83,6 +90,9 @@ func TransBaseFromQuery(qs url.Values) *TransBase {
// TransCallDtm TransBase call dtm
func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
}
resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil {

3
dtmsvr/trans_status.go

@ -77,6 +77,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
return nil
}
if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
if t.RequestTimeout != 0 {
dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second)
}
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,

6
dtmsvr/trans_type_saga.go

@ -170,6 +170,10 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
}
waitDoneOnce := func() {
timeAfter := time.Second * 3
if t.RequestTimeout != 0 {
timeAfter = time.Duration(t.RequestTimeout) * time.Second
}
select {
case r := <-resultChan:
br := &branchResults[r.index]
@ -188,7 +192,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
}
logger.Debugf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
case <-time.After(timeAfter):
logger.Debugf("wait once for done")
}
}

6
test/busi/base_http.go

@ -10,6 +10,7 @@ import (
"database/sql"
"errors"
"fmt"
"time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -152,7 +153,10 @@ func BaseAddRoute(app *gin.Engine) {
return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
}))
app.POST(BusiAPI + "/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
time.Sleep(time.Second * 4)
return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut")
}))
app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
logger.FatalIfError(err)

1
test/busi/base_types.go

@ -26,6 +26,7 @@ var BusiConf = dtmcli.DBConf{
Host: StoreHost,
Port: 3306,
User: "root",
Password: "111111",
}
// UserAccount 1

2
test/main_test.go

@ -48,7 +48,7 @@ func TestMain(m *testing.M) {
conf.Store.Host = "localhost"
conf.Store.Port = 3306
conf.Store.User = "root"
conf.Store.Password = ""
conf.Store.Password = "111111"
} else {
conf.Store.Driver = "redis"
conf.Store.Host = "localhost"

11
test/saga_options_test.go

@ -57,6 +57,17 @@ func TestSagaOptionsTimeout(t *testing.T) {
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
}
func TestSagaGlobalTransWithRequestTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
saga.WaitResult = true
saga.Add(busi.Busi + "/TransOutTimeout", "", nil)
saga.WithGlobalTransRequestTimeout(6)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(gid)
}
func TestSagaOptionsNormalWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.WaitResult = true

Loading…
Cancel
Save