Browse Source

branch conflict detect ok

pull/328/head
yedf2 4 years ago
parent
commit
ea15146423
  1. 6
      dtmgrpc/workflow/workflow.go
  2. 20
      dtmsvr/storage/boltdb/boltdb.go
  3. 11
      dtmsvr/storage/redis/redis.go
  4. 6
      dtmsvr/storage/sql/sql.go
  5. 2
      test/busi/base_http.go
  6. 16
      test/busi/base_types.go
  7. 2
      test/busi/utils.go
  8. 2
      test/tcc_barrier_test.go
  9. 46
      test/workflow_test.go

6
dtmgrpc/workflow/workflow.go

@ -85,9 +85,9 @@ func (wf *Workflow) NewRequest() *resty.Request {
return wf.restyClient.R().SetContext(wf.Context) return wf.restyClient.R().SetContext(wf.Context)
} }
// DefineSagaPhase2 will define a saga branch transaction // AddSagaPhase2 will define a saga branch transaction
// param compensate specify a function for the compensation of next workflow action // param compensate specify a function for the compensation of next workflow action
func (wf *Workflow) DefineSagaPhase2(compensate WfPhase2Func) { func (wf *Workflow) AddSagaPhase2(compensate WfPhase2Func) {
branchID := wf.currentBranch branchID := wf.currentBranch
wf.failedOps = append(wf.failedOps, workflowPhase2Item{ wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID, branchID: branchID,
@ -98,7 +98,7 @@ func (wf *Workflow) DefineSagaPhase2(compensate WfPhase2Func) {
// DefineSagaPhase2 will define a tcc branch transaction // DefineSagaPhase2 will define a tcc branch transaction
// param confirm, concel specify the confirm and cancel operation of next workflow action // param confirm, concel specify the confirm and cancel operation of next workflow action
func (wf *Workflow) DefineTccPhase2(confirm, cancel WfPhase2Func) { func (wf *Workflow) AddTccPhase2(confirm, cancel WfPhase2Func) {
branchID := wf.currentBranch branchID := wf.currentBranch
wf.failedOps = append(wf.failedOps, workflowPhase2Item{ wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID, branchID: branchID,

20
dtmsvr/storage/boltdb/boltdb.go

@ -69,12 +69,12 @@ func initializeBuckets(db *bolt.DB) error {
// cleanupExpiredData will clean the expired data in boltdb, the // cleanupExpiredData will clean the expired data in boltdb, the
// expired time is configurable. // expired time is configurable.
func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error { func cleanupExpiredData(expire time.Duration, db *bolt.DB) error {
if expiredSeconds <= 0 { if expire <= 0 {
return nil return nil
} }
lastKeepTime := time.Now().Add(-expiredSeconds) lastKeepTime := time.Now().Add(-expire)
return db.Update(func(t *bolt.Tx) error { return db.Update(func(t *bolt.Tx) error {
globalBucket := t.Bucket(bucketGlobal) globalBucket := t.Bucket(bucketGlobal)
if globalBucket == nil { if globalBucket == nil {
@ -209,9 +209,15 @@ func tPutGlobal(t *bolt.Tx, global *storage.TransGlobalStore) {
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64) { func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64) error {
if start == -1 { if start == -1 {
bs := tGetBranches(t, branches[0].Gid) b0 := &branches[0]
bs := tGetBranches(t, b0.Gid)
for _, b := range bs {
if b.BranchID == b0.BranchID && b.Op == b0.Op {
return storage.ErrUniqueConflict
}
}
start = int64(len(bs)) start = int64(len(bs))
} }
for i, b := range branches { for i, b := range branches {
@ -220,6 +226,7 @@ func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64)
err := t.Bucket(bucketBranches).Put([]byte(k), []byte(v)) err := t.Bucket(bucketBranches).Put([]byte(k), []byte(v))
dtmimp.E2P(err) dtmimp.E2P(err)
} }
return nil
} }
func tDelIndex(t *bolt.Tx, unix int64, gid string) { func tDelIndex(t *bolt.Tx, unix int64, gid string) {
@ -323,8 +330,7 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto
if g.Status != status { if g.Status != status {
return storage.ErrNotFound return storage.ErrNotFound
} }
tPutBranches(t, branches, int64(branchStart)) return tPutBranches(t, branches, int64(branchStart))
return nil
}) })
dtmimp.E2P(err) dtmimp.E2P(err)
} }

11
dtmsvr/storage/redis/redis.go

@ -198,6 +198,17 @@ if old ~= ARGV[3] then
return 'NOT_FOUND' return 'NOT_FOUND'
end end
local start = ARGV[4] local start = ARGV[4]
-- check duplicates for workflow
if start == "-1" then
local t = cjson.decode(ARGV[5])
local bs = redis.call('LRANGE', KEYS[2], 0, -1)
for i = 1, table.getn(bs) do
local c = cjson.decode(bs[i])
if t['branch_id'] == c['branch_id'] and t['op'] == c['op'] then
return 'UNIQUE_CONFLICT'
end
end
end
for k = 5, table.getn(ARGV) do for k = 5, table.getn(ARGV) do
if start == "-1" then if start == "-1" then
redis.call('RPUSH', KEYS[2], ARGV[k]) redis.call('RPUSH', KEYS[2], ARGV[k])

6
dtmsvr/storage/sql/sql.go

@ -89,7 +89,11 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto
g := &storage.TransGlobalStore{} g := &storage.TransGlobalStore{}
dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g) dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g)
if dbr.Error == nil { if dbr.Error == nil {
dbr = tx.Save(branches) if branchStart == -1 {
dbr = tx.Create(branches)
} else {
dbr = tx.Save(branches)
}
} }
return wrapError(dbr.Error) return wrapError(dbr.Error)
}) })

2
test/busi/base_http.go

@ -158,7 +158,7 @@ func BaseAddRoute(app *gin.Engine) {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
logger.FatalIfError(err) logger.FatalIfError(err)
logger.Debugf("TransInTccNested ") logger.Debugf("TransInTccNested ")
resp, err := tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") resp, err := tcc.CallBranch(&ReqHttp{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
if err != nil { if err != nil {
return err return err
} }

16
test/busi/base_types.go

@ -60,21 +60,21 @@ func GetBalanceByUID(uid int, store string) int {
return dtmimp.MustAtoi(ua.Balance[:len(ua.Balance)-3]) return dtmimp.MustAtoi(ua.Balance[:len(ua.Balance)-3])
} }
// TransReq transaction request payload // ReqHttp transaction request payload
type TransReq struct { type ReqHttp struct {
Amount int `json:"amount"` Amount int `json:"amount"`
TransInResult string `json:"trans_in_result"` TransInResult string `json:"trans_in_result"`
TransOutResult string `json:"trans_out_Result"` TransOutResult string `json:"trans_out_Result"`
Store string `json:"store"` // default mysql, value can be mysql|redis Store string `json:"store"` // default mysql, value can be mysql|redis
} }
func (t *TransReq) String() string { func (t *ReqHttp) String() string {
return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult) return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult)
} }
// GenTransReq 1 // GenTransReq 1
func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { func GenTransReq(amount int, outFailed bool, inFailed bool) *ReqHttp {
return &TransReq{ return &ReqHttp{
Amount: amount, Amount: amount,
TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string), TransOutResult: dtmimp.If(outFailed, dtmcli.ResultFailure, "").(string),
TransInResult: dtmimp.If(inFailed, dtmcli.ResultFailure, "").(string), TransInResult: dtmimp.If(inFailed, dtmcli.ResultFailure, "").(string),
@ -90,16 +90,16 @@ func GenBusiReq(amount int, outFailed bool, inFailed bool) *BusiReq {
} }
} }
func reqFrom(c *gin.Context) *TransReq { func reqFrom(c *gin.Context) *ReqHttp {
v, ok := c.Get("trans_req") v, ok := c.Get("trans_req")
if !ok { if !ok {
req := TransReq{} req := ReqHttp{}
err := c.BindJSON(&req) err := c.BindJSON(&req)
logger.FatalIfError(err) logger.FatalIfError(err)
c.Set("trans_req", &req) c.Set("trans_req", &req)
v = &req v = &req
} }
return v.(*TransReq) return v.(*ReqHttp)
} }
func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier { func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {

2
test/busi/utils.go

@ -25,6 +25,8 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
type ReqGrpc = BusiReq
func dbGet() *dtmutil.DB { func dbGet() *dtmutil.DB {
return dtmutil.DbGet(BusiConf) return dtmutil.DbGet(BusiConf)
} }

2
test/tcc_barrier_test.go

@ -69,7 +69,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
gid := dtmimp.GetFuncName() + store gid := dtmimp.GetFuncName() + store
cronFinished := make(chan string, 2) cronFinished := make(chan string, 2)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
body := &busi.TransReq{Amount: 30, Store: store} body := &busi.ReqHttp{Amount: 30, Store: store}
tryURL := Busi + "/TccBTransOutTry" tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm" confirmURL := Busi + "/TccBTransOutConfirm"
cancelURL := Busi + "/SleepCancel" cancelURL := Busi + "/SleepCancel"

46
test/workflow_test.go

@ -9,12 +9,15 @@ package test
import ( import (
"database/sql" "database/sql"
"testing" "testing"
"time"
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc/workflow" "github.com/dtm-labs/dtm/dtmgrpc/workflow"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/test/busi" "github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -25,7 +28,7 @@ func TestWorkflowNormal(t *testing.T) {
gid := dtmimp.GetFuncName() gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.TransReq var req busi.ReqHttp
dtmimp.MustUnmarshal(data, &req) dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TransOut") _, err := wf.NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil { if err != nil {
@ -47,13 +50,13 @@ func TestWorkflowNormal(t *testing.T) {
func TestWorkflowRollback(t *testing.T) { func TestWorkflowRollback(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenTransReq(30, false, true) req := &busi.ReqHttp{Amount: 30, TransInResult: dtmimp.ResultFailure}
gid := dtmimp.GetFuncName() gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.TransReq var req busi.ReqHttp
dtmimp.MustUnmarshal(data, &req) dtmimp.MustUnmarshal(data, &req)
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom") _, err := wf.NewRequest().SetBody(req).Post(Busi + "/SagaBTransOutCom")
return err return err
}) })
@ -65,7 +68,7 @@ func TestWorkflowRollback(t *testing.T) {
if err != nil { if err != nil {
return err return err
} }
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { return bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "") return busi.SagaAdjustBalance(tx, busi.TransInUID, -req.Amount, "")
}) })
@ -90,7 +93,7 @@ func TestWorkflowGrpcNormal(t *testing.T) {
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq var req busi.BusiReq
dtmgimp.MustProtoUnmarshal(data, &req) dtmgimp.MustProtoUnmarshal(data, &req)
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err return err
}) })
@ -98,7 +101,7 @@ func TestWorkflowGrpcNormal(t *testing.T) {
if err != nil { if err != nil {
return err return err
} }
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req) _, err := busi.BusiCli.TransInRevertBSaga(wf.Context, &req)
return err return err
}) })
@ -133,7 +136,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) {
if fetchOngoingStep(0) { if fetchOngoingStep(0) {
return dtmcli.ErrOngoing return dtmcli.ErrOngoing
} }
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(4) { if fetchOngoingStep(4) {
return dtmcli.ErrOngoing return dtmcli.ErrOngoing
} }
@ -147,7 +150,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) {
if err != nil { if err != nil {
return err return err
} }
wf.DefineSagaPhase2(func(bb *dtmcli.BranchBarrier) error { wf.AddSagaPhase2(func(bb *dtmcli.BranchBarrier) error {
if fetchOngoingStep(3) { if fetchOngoingStep(3) {
return dtmcli.ErrOngoing return dtmcli.ErrOngoing
} }
@ -263,3 +266,28 @@ func TestWorkflowXaResume(t *testing.T) {
cronTransOnceForwardNow(t, gid, 1000) cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid)) assert.Equal(t, StatusSucceed, getTransStatus(gid))
} }
func TestWorkflowBranchConflict(t *testing.T) {
gid := dtmimp.GetFuncName()
store := dtmsvr.GetStore()
now := time.Now()
g := &storage.TransGlobalStore{
Gid: gid,
Status: dtmcli.StatusPrepared,
NextCronTime: &now,
}
err := store.MaySaveNewTrans(g, []storage.TransBranchStore{
{
BranchID: "00",
Op: dtmimp.OpAction,
},
})
assert.Nil(t, err)
err = dtmimp.CatchP(func() {
store.LockGlobalSaveBranches(gid, dtmcli.StatusPrepared, []storage.TransBranchStore{
{BranchID: "00", Op: dtmimp.OpAction},
}, -1)
})
assert.Equal(t, storage.ErrUniqueConflict, err)
store.ChangeGlobalStatus(g, StatusSucceed, []string{}, true)
}

Loading…
Cancel
Save