Browse Source

msg passed

pull/195/head
yedf2 4 years ago
parent
commit
3a1aeccc21
  1. 6
      dtmcli/dtmimp/trans_base.go
  2. 7
      dtmcli/msg.go
  3. 2
      dtmcli/tcc.go
  4. 2
      dtmcli/xa.go
  5. 4
      test/msg_barrier_test.go

6
dtmcli/dtmimp/trans_base.go

@ -106,8 +106,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
return TransCallDtm(tb, m, operation) return TransCallDtm(tb, m, operation)
} }
// TransRequestBranch TransBAse request branch result // TransRequestBranch TransBase request branch result
func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) { func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
resp, err := RestyClient.R(). resp, err := RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(map[string]string{ SetQueryParams(map[string]string{
@ -118,7 +118,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri
"op": op, "op": op,
}). }).
SetHeaders(t.BranchHeaders). SetHeaders(t.BranchHeaders).
Post(url) Execute(method, url)
if err == nil { if err == nil {
err = RespAsErrorCompatible(resp) err = RespAsErrorCompatible(resp)
} }

7
dtmcli/msg.go

@ -8,6 +8,7 @@ package dtmcli
import ( import (
"database/sql" "database/sql"
"errors"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
) )
@ -55,10 +56,10 @@ func (s *Msg) PrepareAndSubmitBarrier(queryPrepared string, busiCall func(bb *Br
} }
if err == nil { if err == nil {
err = busiCall(bb) err = busiCall(bb)
if err != nil && err != ErrFailure { if err != nil && !errors.Is(err, ErrFailure) {
_, err = dtmimp.TransRequestBranch(&s.TransBase, nil, bb.BranchID, bb.Op, queryPrepared) _, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
} }
if err == ErrFailure { if errors.Is(err, ErrFailure) {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort") _ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
} }
} }

2
dtmcli/tcc.go

@ -75,5 +75,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
if err != nil { if err != nil {
return nil, err return nil, err
} }
return dtmimp.TransRequestBranch(&t.TransBase, body, branchID, BranchTry, tryURL) return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, BranchTry, tryURL)
} }

2
dtmcli/xa.go

@ -101,5 +101,5 @@ func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc Xa
// CallBranch call a xa branch // CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewSubBranchID() branchID := x.NewSubBranchID()
return dtmimp.TransRequestBranch(&x.TransBase, body, branchID, BranchAction, url) return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, BranchAction, url)
} }

4
test/msg_barrier_test.go

@ -99,7 +99,7 @@ func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) {
}) })
return err return err
}) })
assert.Error(t, err) assert.Nil(t, err) // final commit will ignore error after submit
cronTransOnceForwardNow(180) waitTransProcessed(gid)
assertNotSameBalance(t, before, "mysql") assertNotSameBalance(t, before, "mysql")
} }

Loading…
Cancel
Save