Browse Source

grpc PrepareAndSubmit added

pull/172/head
yedf2 4 years ago
parent
commit
9bcda551e8
  1. 1
      dtmcli/msg.go
  2. 20
      dtmgrpc/msg.go
  3. 5
      test/busi/barrier.go
  4. 1
      test/dtmsvr_test.go
  5. 54
      test/msg_grpc_barrier_test.go

1
dtmcli/msg.go

@ -40,6 +40,7 @@ func (s *Msg) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {

20
dtmgrpc/msg.go

@ -7,6 +7,8 @@
package dtmgrpc
import (
"database/sql"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
@ -40,3 +42,21 @@ func (s *MsgGrpc) Prepare(queryPrepared string) error {
func (s *MsgGrpc) Submit() error {
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}
// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = bb.CallWithDB(db, func(tx *sql.Tx) error {
err := busiCall(tx)
if err == nil {
err = s.Prepare(queryPrepared)
}
return err
})
}
if err == nil {
err = s.Submit()
}
return err
}

5
test/busi/barrier.go

@ -122,3 +122,8 @@ func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emp
return sagaGrpcAdjustBalance(tx, TransOutUID, in.Amount, "")
})
}
func (s *busiServer) QueryPreparedB(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.QueryPrepared(dbGet().ToSQLDB())
}

1
test/dtmsvr_test.go

@ -18,6 +18,7 @@ import (
)
var DtmServer = dtmutil.DefaultHttpServer
var DtmGrpcServer = dtmutil.DefaultGrpcServer
var Busi = busi.Busi
func getTransStatus(gid string) string {

54
test/msg_grpc_barrier_test.go

@ -0,0 +1,54 @@
package test
import (
"database/sql"
"errors"
"reflect"
"testing"
"bou.ke/monkey"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func TestMsgGrpcPrepareAndSubmit(t *testing.T) {
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
})
assert.Nil(t, err)
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
assertNotSameBalance(t, before)
}
func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) {
if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit
return
}
before := getBeforeBalances()
gid := dtmimp.GetFuncName()
req := busi.GenBusiReq(30, false, false)
msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req)
var guard *monkey.PatchGuard
err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS")
guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error {
guard.Unpatch()
_ = tx.Commit()
return errors.New("test error for patch")
})
return err
})
assert.Error(t, err)
cronTransOnceForwardNow(180)
assertNotSameBalance(t, before)
}
Loading…
Cancel
Save