Browse Source

Merge pull request #172 from dtm-labs/alpha

Add PrepareAndCommit for grpc
pull/173/head v1.9.1
yedf2 4 years ago
committed by GitHub
parent
commit
fcd3f30b65
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dtmcli/barrier.go
  2. 2
      dtmcli/msg.go
  3. 11
      dtmcli/types.go
  4. 20
      dtmgrpc/msg.go
  5. 5
      test/busi/barrier.go
  6. 1
      test/dtmsvr_test.go
  7. 4
      test/main_test.go
  8. 54
      test/msg_grpc_barrier_test.go

6
dtmcli/barrier.go

@ -95,10 +95,10 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
// CallWithDB the same as Call, but with *sql.DB // CallWithDB the same as Call, but with *sql.DB
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error { func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
tx, err := db.Begin() tx, err := db.Begin()
if err != nil { if err == nil {
return err err = bb.Call(tx, busiCall)
} }
return bb.Call(tx, busiCall) return err
} }
// QueryPrepared queries prepared data // QueryPrepared queries prepared data

2
dtmcli/msg.go

@ -40,7 +40,7 @@ func (s *Msg) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, s, "submit") return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
} }
// PrepareAndSubmit execs prepare and submit operation // PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error { func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil { if err == nil {

11
dtmcli/types.go

@ -57,14 +57,9 @@ func SetBarrierTableName(tablename string) {
dtmimp.BarrierTableName = tablename dtmimp.BarrierTableName = tablename
} }
// OnBeforeRequest add before request middleware // GetRestyClient get the resty.Client for http request
func OnBeforeRequest(middleware func(c *resty.Client, r *resty.Request) error) { func GetRestyClient() *resty.Client {
dtmimp.RestyClient.OnBeforeRequest(middleware) return dtmimp.RestyClient
}
// OnAfterResponse add after request middleware
func OnAfterResponse(middleware func(c *resty.Client, resp *resty.Response) error) {
dtmimp.RestyClient.OnAfterResponse(middleware)
} }
// SetPassthroughHeaders experimental. // SetPassthroughHeaders experimental.

20
dtmgrpc/msg.go

@ -7,6 +7,8 @@
package dtmgrpc package dtmgrpc
import ( import (
"database/sql"
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
@ -40,3 +42,21 @@ func (s *MsgGrpc) Prepare(queryPrepared string) error {
func (s *MsgGrpc) Submit() error { func (s *MsgGrpc) Submit() error {
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit") return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
} }
// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = bb.CallWithDB(db, func(tx *sql.Tx) error {
err := busiCall(tx)
if err == nil {
err = s.Prepare(queryPrepared)
}
return err
})
}
if err == nil {
err = s.Submit()
}
return err
}

5
test/busi/barrier.go

@ -122,3 +122,8 @@ func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emp
return sagaGrpcAdjustBalance(tx, TransOutUID, in.Amount, "") return sagaGrpcAdjustBalance(tx, TransOutUID, in.Amount, "")
}) })
} }
func (s *busiServer) QueryPreparedB(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.QueryPrepared(dbGet().ToSQLDB())
}

1
test/dtmsvr_test.go

@ -18,6 +18,7 @@ import (
) )
var DtmServer = dtmutil.DefaultHTTPServer var DtmServer = dtmutil.DefaultHTTPServer
var DtmGrpcServer = dtmutil.DefaultGrpcServer
var Busi = busi.Busi var Busi = busi.Busi
func getTransStatus(gid string) string { func getTransStatus(gid string) string {

4
test/main_test.go

@ -37,8 +37,8 @@ func TestMain(m *testing.M) {
conf.UpdateBranchSync = 1 conf.UpdateBranchSync = 1
dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes) dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes)
dtmcli.OnBeforeRequest(busi.SetHttpHeaderForHeadersYes) dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHttpHeaderForHeadersYes)
dtmcli.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil }) dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil })
tenv := os.Getenv("TEST_STORE") tenv := os.Getenv("TEST_STORE")
if tenv == "boltdb" { if tenv == "boltdb" {

54
test/msg_grpc_barrier_test.go

@ -0,0 +1,54 @@
package test
import (
"database/sql"
"errors"
"reflect"
"testing"
"bou.ke/monkey"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgGrpcPrepareAndSubmit(t *testing.T) {
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before)
}
func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
var guard *monkey.PatchGuard
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
guard.Unpatch()
_ = tx.Commit()
return errors.New("test error for patch")
})
return err
})
assert.Error(t, err)
cronTransOnceForwardNow(180)
assertNotSameBalance(t, before)
}
Loading…
Cancel
Save