Browse Source

Merge pull request #71 from yedf/alpha

barrier interface change to sql.Tx
pull/72/head
yedf2 4 years ago
committed by GitHub
parent
commit
c50a9720be
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bench/http.go
  2. 16
      dtmcli/barrier.go
  3. 7
      dtmcli/dtmimp/types.go
  4. 3
      dtmcli/types.go
  5. 15
      examples/grpc_saga_barrier.go
  6. 18
      examples/http_saga_barrier.go
  7. 2
      examples/http_saga_gorm_barrier.go
  8. 25
      examples/http_tcc_barrier.go
  9. 5
      test/base_test.go
  10. 2
      test/tcc_barrier_test.go

2
bench/http.go

@ -97,7 +97,7 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, nil return dtmcli.MapSuccess, nil
} }
tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query()) tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query())
f := func(tx dtmcli.DB) error { f := func(tx *sql.Tx) error {
for i := 0; i < sqls; i++ { for i := 0; i < sqls; i++ {
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)", _, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id"))) uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))

16
dtmcli/barrier.go

@ -7,6 +7,7 @@
package dtmcli package dtmcli
import ( import (
"database/sql"
"fmt" "fmt"
"net/url" "net/url"
@ -14,7 +15,7 @@ import (
) )
// BarrierBusiFunc type for busi func // BarrierBusiFunc type for busi func
type BarrierBusiFunc func(db DB) error type BarrierBusiFunc func(tx *sql.Tx) error
// BranchBarrier every branch info // BranchBarrier every branch info
type BranchBarrier struct { type BranchBarrier struct {
@ -48,7 +49,7 @@ func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) {
return ti, nil return ti, nil
} }
func insertBarrier(tx Tx, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) { func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
if op == "" { if op == "" {
return 0, nil return 0, nil
} }
@ -59,7 +60,7 @@ func insertBarrier(tx Tx, transType string, gid string, branchID string, op stri
// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465 // Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
// tx: 本地数据库的事务对象,允许子事务屏障进行事务操作 // tx: 本地数据库的事务对象,允许子事务屏障进行事务操作
// busiCall: 业务函数,仅在必要时被调用 // busiCall: 业务函数,仅在必要时被调用
func (bb *BranchBarrier) Call(tx Tx, busiCall BarrierBusiFunc) (rerr error) { func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
bb.BarrierID = bb.BarrierID + 1 bb.BarrierID = bb.BarrierID + 1
bid := fmt.Sprintf("%02d", bb.BarrierID) bid := fmt.Sprintf("%02d", bb.BarrierID)
defer func() { defer func() {
@ -89,3 +90,12 @@ func (bb *BranchBarrier) Call(tx Tx, busiCall BarrierBusiFunc) (rerr error) {
rerr = busiCall(tx) rerr = busiCall(tx)
return return
} }
// CallWithDB the same as Call, but with *sql.DB
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
tx, err := db.Begin()
if err != nil {
return err
}
return bb.Call(tx, busiCall)
}

7
dtmcli/dtmimp/types.go

@ -13,10 +13,3 @@ type DB interface {
Exec(query string, args ...interface{}) (sql.Result, error) Exec(query string, args ...interface{}) (sql.Result, error)
QueryRow(query string, args ...interface{}) *sql.Row QueryRow(query string, args ...interface{}) *sql.Row
} }
// Tx interface of dtmcli tx
type Tx interface {
Rollback() error
Commit() error
DB
}

3
dtmcli/types.go

@ -25,9 +25,6 @@ func MustGenGid(server string) string {
// DB interface // DB interface
type DB = dtmimp.DB type DB = dtmimp.DB
// Tx interface
type Tx = dtmimp.Tx
// TransOptions transaction option // TransOptions transaction option
type TransOptions = dtmimp.TransOptions type TransOptions = dtmimp.TransOptions

15
examples/grpc_saga_barrier.go

@ -8,6 +8,7 @@ package examples
import ( import (
"context" "context"
"database/sql"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
@ -41,28 +42,28 @@ func sagaGrpcBarrierAdjustBalance(db dtmcli.DB, uid int, amount int64, result st
func (s *busiServer) TransInBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { func (s *busiServer) TransInBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx) barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(tx dtmcli.DB) error { return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcBarrierAdjustBalance(tx, 2, in.Amount, in.TransInResult) return sagaGrpcBarrierAdjustBalance(tx, 2, in.Amount, in.TransInResult)
}) })
} }
func (s *busiServer) TransOutBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { func (s *busiServer) TransOutBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx) barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(db dtmcli.DB) error { return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcBarrierAdjustBalance(db, 1, -in.Amount, in.TransOutResult) return sagaGrpcBarrierAdjustBalance(tx, 1, -in.Amount, in.TransOutResult)
}) })
} }
func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx) barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(db dtmcli.DB) error { return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcBarrierAdjustBalance(db, 2, -in.Amount, "") return sagaGrpcBarrierAdjustBalance(tx, 2, -in.Amount, "")
}) })
} }
func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
barrier := MustBarrierFromGrpc(ctx) barrier := MustBarrierFromGrpc(ctx)
return &emptypb.Empty{}, barrier.Call(txGet(), func(db dtmcli.DB) error { return &emptypb.Empty{}, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaGrpcBarrierAdjustBalance(db, 1, in.Amount, "") return sagaGrpcBarrierAdjustBalance(tx, 1, in.Amount, "")
}) })
} }

18
examples/http_saga_barrier.go

@ -7,6 +7,8 @@
package examples package examples
import ( import (
"database/sql"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
@ -45,15 +47,15 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
return req.TransInResult, nil return req.TransInResult, nil
} }
barrier := MustBarrierFromGin(c) barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaBarrierAdjustBalance(db, 1, req.Amount) return sagaBarrierAdjustBalance(tx, 1, req.Amount)
}) })
} }
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c) barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaBarrierAdjustBalance(db, 1, -reqFrom(c).Amount) return sagaBarrierAdjustBalance(tx, 1, -reqFrom(c).Amount)
}) })
} }
@ -63,14 +65,14 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
return req.TransOutResult, nil return req.TransOutResult, nil
} }
barrier := MustBarrierFromGin(c) barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaBarrierAdjustBalance(db, 2, -req.Amount) return sagaBarrierAdjustBalance(tx, 2, -req.Amount)
}) })
} }
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c) barrier := MustBarrierFromGin(c)
return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, barrier.Call(txGet(), func(tx *sql.Tx) error {
return sagaBarrierAdjustBalance(db, 2, reqFrom(c).Amount) return sagaBarrierAdjustBalance(tx, 2, reqFrom(c).Amount)
}) })
} }

2
examples/http_saga_gorm_barrier.go

@ -37,7 +37,7 @@ func sagaGormBarrierTransOut(c *gin.Context) (interface{}, error) {
req := reqFrom(c) req := reqFrom(c)
barrier := MustBarrierFromGin(c) barrier := MustBarrierFromGin(c)
tx := dbGet().DB.Begin() tx := dbGet().DB.Begin()
return dtmcli.MapSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(db dtmcli.DB) error { return dtmcli.MapSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(tx1 *sql.Tx) error {
return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2).Error return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2).Error
}) })
} }

25
examples/http_tcc_barrier.go

@ -7,6 +7,7 @@
package examples package examples
import ( import (
"database/sql"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -68,20 +69,20 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" { if req.TransInResult != "" {
return req.TransInResult, nil return req.TransInResult, nil
} }
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustTrading(db, transInUID, req.Amount) return adjustTrading(tx, transInUID, req.Amount)
}) })
} }
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) { func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustBalance(db, transInUID, reqFrom(c).Amount) return adjustBalance(tx, transInUID, reqFrom(c).Amount)
}) })
} }
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) { func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustTrading(db, transInUID, -reqFrom(c).Amount) return adjustTrading(tx, transInUID, -reqFrom(c).Amount)
}) })
} }
@ -90,20 +91,20 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) {
if req.TransOutResult != "" { if req.TransOutResult != "" {
return req.TransOutResult, nil return req.TransOutResult, nil
} }
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustTrading(db, transOutUID, -req.Amount) return adjustTrading(tx, transOutUID, -req.Amount)
}) })
} }
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustBalance(db, transOutUID, -reqFrom(c).Amount) return adjustBalance(tx, transOutUID, -reqFrom(c).Amount)
}) })
} }
// TccBarrierTransOutCancel will be use in test // TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(db dtmcli.DB) error { return dtmcli.MapSuccess, MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return adjustTrading(db, transOutUID, reqFrom(c).Amount) return adjustTrading(tx, transOutUID, reqFrom(c).Amount)
}) })
} }

5
test/base_test.go

@ -7,6 +7,7 @@
package test package test
import ( import (
"database/sql"
"fmt" "fmt"
"testing" "testing"
@ -38,7 +39,7 @@ func TestBaseSqlDB(t *testing.T) {
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')") db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
tx, err := db.ToSQLDB().Begin() tx, err := db.ToSQLDB().Begin()
asserts.Nil(err) asserts.Nil(err)
err = barrier.Call(tx, func(db dtmcli.DB) error { err = barrier.Call(tx, func(tx *sql.Tx) error {
dtmimp.Logf("rollback gid2") dtmimp.Logf("rollback gid2")
return fmt.Errorf("gid2 error") return fmt.Errorf("gid2 error")
}) })
@ -50,7 +51,7 @@ func TestBaseSqlDB(t *testing.T) {
barrier.BarrierID = 0 barrier.BarrierID = 0
tx2, err := db.ToSQLDB().Begin() tx2, err := db.ToSQLDB().Begin()
asserts.Nil(err) asserts.Nil(err)
err = barrier.Call(tx2, func(db dtmcli.DB) error { err = barrier.Call(tx2, func(tx *sql.Tx) error {
dtmimp.Logf("submit gid2") dtmimp.Logf("submit gid2")
return nil return nil
}) })

2
test/tcc_barrier_test.go

@ -126,7 +126,7 @@ func TestTccBarrierPanic(t *testing.T) {
func() { func() {
defer dtmimp.P2E(&err) defer dtmimp.P2E(&err)
tx, _ := dbGet().ToSQLDB().BeginTx(context.Background(), &sql.TxOptions{}) tx, _ := dbGet().ToSQLDB().BeginTx(context.Background(), &sql.TxOptions{})
bb.Call(tx, func(db dtmcli.DB) error { bb.Call(tx, func(tx *sql.Tx) error {
panic(fmt.Errorf("an error")) panic(fmt.Errorf("an error"))
}) })
}() }()

Loading…
Cancel
Save