Browse Source

Merge pull request #222 from Leizhengzi/main

feat: transOption add requestTimeout option for resets global trans r…
pull/229/head
yedf2 4 years ago
committed by GitHub
parent
commit
a1d8dff9e8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dtmcli/dtmimp/trans_base.go
  2. 3
      dtmsvr/trans_status.go
  3. 2
      dtmsvr/trans_type_saga.go
  4. 4
      test/busi/base_http.go
  5. 11
      test/saga_options_test.go

10
dtmcli/dtmimp/trans_base.go

@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
"github.com/go-resty/resty/v2"
)
@ -43,6 +44,7 @@ func (g *BranchIDGen) CurrentSubBranchID() string {
type TransOptions struct {
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc
RequestTimeout int64 `json:"requestTimeout" gorm:"-"` // for global trans resets request timeout
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"`
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"`
@ -76,6 +78,11 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr
}
}
// WithGlobalTransRequestTimeout defines global trans request timeout
func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) {
t.RequestTimeout = timeout
}
// TransBaseFromQuery construct transaction info from request
func TransBaseFromQuery(qs url.Values) *TransBase {
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
@ -83,6 +90,9 @@ func TransBaseFromQuery(qs url.Values) *TransBase {
// TransCallDtm TransBase call dtm
func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
if tb.RequestTimeout != 0 {
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
}
resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil {

3
dtmsvr/trans_status.go

@ -77,6 +77,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa
return nil
}
if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
if t.RequestTimeout != 0 {
dtmimp.RestyClient.SetTimeout(time.Duration(t.RequestTimeout) * time.Second)
}
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,

2
dtmsvr/trans_type_saga.go

@ -188,7 +188,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
}
logger.Debugf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
case <-time.After(time.Second * 3):
logger.Debugf("wait once for done")
}
}

4
test/busi/base_http.go

@ -152,7 +152,9 @@ func BaseAddRoute(app *gin.Engine) {
return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
}))
app.POST(BusiAPI+"/TransOutTimeout", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "TransOut")
}))
app.POST(BusiAPI+"/TransInTccNested", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
logger.FatalIfError(err)

11
test/saga_options_test.go

@ -57,6 +57,17 @@ func TestSagaOptionsTimeout(t *testing.T) {
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
}
func TestSagaGlobalTransWithRequestTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
saga.WaitResult = true
saga.Add(busi.Busi+"/TransOutTimeout", "", nil)
saga.WithGlobalTransRequestTimeout(6)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(gid)
}
func TestSagaOptionsNormalWait(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.WaitResult = true

Loading…
Cancel
Save