Browse Source

change to en

pull/251/head
yedf2 4 years ago
parent
commit
419b76c932
  1. 6
      dtmcli/barrier.go
  2. 2
      dtmcli/dtmimp/README-cn.md
  3. 4
      dtmcli/dtmimp/README.md
  4. 3
      dtmcli/dtmimp/trans_xa_base.go
  5. 3
      dtmcli/tcc.go
  6. 2
      dtmcli/xa.go
  7. 2
      dtmgrpc/dtmgimp/README-cn.md
  8. 4
      dtmgrpc/dtmgimp/README.md
  9. 4
      dtmgrpc/dtmgimp/types.go
  10. 7
      dtmgrpc/tcc.go
  11. 2
      dtmgrpc/xa.go
  12. 2
      dtmsvr/config/config.go
  13. 2
      dtmsvr/storage/sql/sql.go
  14. 2
      dtmsvr/trans_status.go
  15. 4
      dtmutil/db.go
  16. 2
      dtmutil/utils.go
  17. 6
      helper/bench/main.go
  18. 4
      main.go
  19. 39
      sqls/dtmsvr.storage.tdsql.sql
  20. 12
      test/busi/quick_start.go
  21. 2
      test/busi/utils.go
  22. 6
      test/tcc_barrier_test.go
  23. 2
      test/xa_test.go

6
dtmcli/barrier.go

@ -63,9 +63,9 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri
return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}
// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
// tx: 本地数据库的事务对象,允许子事务屏障进行事务操作
// busiCall: 业务函数,仅在必要时被调用
// Call see detail description in https://en.dtm.pub/practice/barrier.html
// tx: local transaction connection
// busiCall: busi func
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
bid := bb.newBarrierID()
defer dtmimp.DeferDo(&rerr, func() error {

2
dtmcli/dtmimp/README-cn.md

@ -0,0 +1,2 @@
## 注意
此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口

4
dtmcli/dtmimp/README.md

@ -1,2 +1,2 @@
## 注意
此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口
## Notice
Please donot use this package, and this package should only be used in dtm internally. The interfaces are not stable, and package name has postfix "imp"

3
dtmcli/dtmimp/trans_xa_base.go

@ -61,13 +61,12 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error)
return
}
// HandleGlobalTrans http/grpc GlobalTransaction的公共方法
// HandleGlobalTrans http/grpc GlobalTransaction shared func
func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) {
rerr = callDtm("prepare")
if rerr != nil {
return
}
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
defer DeferDo(&rerr, func() error {
return callDtm("submit")
}, func() error {

3
dtmcli/tcc.go

@ -25,7 +25,7 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
// TccGlobalTransaction begin a tcc global transaction
// dtm dtm server address
// gid global transaction ID
// tccFunc tcc事务函数,里面会定义全局事务的分支
// tccFunc define the detail tcc busi
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
return TccGlobalTransaction2(dtm, gid, func(t *Tcc) {}, tccFunc)
}
@ -38,7 +38,6 @@ func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc Tc
if rerr != nil {
return rerr
}
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
defer dtmimp.DeferDo(&rerr, func() error {
return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "submit")
}, func() error {

2
dtmcli/xa.go

@ -58,7 +58,7 @@ func NewXaClient(server string, mysqlConf DBConf, notifyURL string, register XaR
return xa, nil
}
// HandleCallback 处理commit/rollback的回调
// HandleCallback handle commit/rollback callback
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) interface{} {
return xc.XaClientBase.HandleCallback(gid, branchID, action)
}

2
dtmgrpc/dtmgimp/README-cn.md

@ -0,0 +1,2 @@
## 注意
此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口

4
dtmgrpc/dtmgimp/README.md

@ -1,2 +1,2 @@
## 注意
此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口
## Notice
Please donot use this package, and this package should only be used in dtm internally. The interfaces are not stable, and package name has postfix "imp"

4
dtmgrpc/dtmgimp/types.go

@ -19,7 +19,7 @@ import (
"google.golang.org/protobuf/proto"
)
// GrpcServerLog 打印grpc服务端的日志
// GrpcServerLog middleware to print server-side grpc log
func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
began := time.Now()
logger.Debugf("grpc server handling: %s %s", info.FullMethod, dtmimp.MustMarshalString(req))
@ -35,7 +35,7 @@ func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerI
return m, err
}
// GrpcClientLog 打印grpc调用的日志
// GrpcClientLog middleware to print client-side grpc log
func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req))
LogDtmCtx(ctx)

7
dtmgrpc/tcc.go

@ -25,9 +25,9 @@ type TccGrpc struct {
type TccGlobalFunc func(tcc *TccGrpc) error
// TccGlobalTransaction begin a tcc global transaction
// dtm dtm服务器地址
// gid 全局事务id
// tccFunc tcc事务函数,里面会定义全局事务的分支
// dtm dtm server url
// gid global transaction id
// tccFunc tcc busi func, define the transaction logic
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
return TccGlobalTransaction2(dtm, gid, func(tg *TccGrpc) {}, tccFunc)
}
@ -40,7 +40,6 @@ func TccGlobalTransaction2(dtm string, gid string, custom func(*TccGrpc), tccFun
if rerr != nil {
return rerr
}
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
defer dtmimp.DeferDo(&rerr, func() error {
return dtmgimp.DtmGrpcCall(&tcc.TransBase, "Submit")
}, func() error {

2
dtmgrpc/xa.go

@ -57,7 +57,7 @@ func NewXaGrpcClient(server string, mysqlConf dtmcli.DBConf, notifyURL string) *
return xa
}
// HandleCallback 处理commit/rollback的回调
// HandleCallback handle commit/rollback callback
func (xc *XaGrpcClient) HandleCallback(ctx context.Context) (*emptypb.Empty, error) {
tb := dtmgimp.TransBaseFromGrpc(ctx)
return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.Op)

2
dtmsvr/config/config.go

@ -84,7 +84,7 @@ type configType struct {
Log Log `yaml:"Log"`
}
// Config 配置
// Config config
var Config = configType{}
// MustLoadConfig load config from env and file

2
dtmsvr/storage/sql/sql.go

@ -103,7 +103,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(global)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
if dbr.RowsAffected <= 0 { // not a new trans, return
return storage.ErrUniqueConflict
}
if len(branches) > 0 {

2
dtmsvr/trans_status.go

@ -67,7 +67,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo
GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos)
logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s",
b.Gid, dtmcli.StatusPrepared, b.String())
} else { // 为了性能优化,把branch的status更新异步化
} else { // for better performance, batch the updates of branch status
updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, status: status, finishTime: &now}
}
}

4
dtmutil/db.go

@ -78,7 +78,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
afterName := "cb_after"
logger.Debugf("installing db plugin: %s", op.Name())
// 开始前
// before
_ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before)
_ = db.Callback().Query().Before("gorm:query").Register(beforeName, before)
_ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before)
@ -86,7 +86,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
_ = db.Callback().Row().Before("gorm:row").Register(beforeName, before)
_ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before)
// 结束后
// after
_ = db.Callback().Create().After("gorm:after_create").Register(afterName, after)
_ = db.Callback().Query().After("gorm:after_query").Register(afterName, after)
_ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after)

2
dtmutil/utils.go

@ -112,7 +112,7 @@ func MustGetwd() string {
return wd
}
// GetSQLDir 获取调用该函数的caller源代码的目录,主要用于测试时,查找相关文件
// GetSQLDir get sql scripts dir, used in test
func GetSQLDir() string {
wd := MustGetwd()
if filepath.Base(wd) == "test" {

6
helper/bench/main.go

@ -47,8 +47,8 @@ func main() {
} else {
hintAndExit()
}
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
svr.StartSvr() // 启动bench服务
dtmsvr.StartSvr()
go dtmsvr.CronExpiredTrans(-1)
svr.StartSvr()
select {}
}

4
main.go

@ -69,7 +69,7 @@ func main() {
}
_, _ = maxprocs.Set(maxprocs.Logger(logger.Infof))
registry.WaitStoreUp()
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
dtmsvr.StartSvr() // start dtmsvr api
go dtmsvr.CronExpiredTrans(-1) // start dtmsvr cron job
select {}
}

39
sqls/dtmsvr.storage.tdsql.sql

@ -4,38 +4,37 @@ CREATE DATABASE IF NOT EXISTS dtm
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg',
-- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储
`status` varchar(12) NOT NULL COMMENT '全局事务的状态 prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api',
`protocol` varchar(45) not null comment '通信协议 http | grpc',
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(128) NOT NULL COMMENT 'url to check for 2-phase message',
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`commit_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(256) DEFAULT '',
`custom_data` varchar(256) DEFAULT '',
`next_cron_interval` int(11) default null comment '下次定时处理的间隔',
`next_cron_time` datetime default null comment '下次定时处理的时间',
`owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者',
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',
`ext_data` TEXT comment 'extended data for this trans',
PRIMARY KEY (`id`,`gid`),
UNIQUE KEY `id` (`id`,`gid`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
key `status_next_cron_time` (`status`, `next_cron_time`) comment '这个索引用于查询超时的全局事务,能够合理的走索引'
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`url` varchar(128) NOT NULL COMMENT '动作关联的url',
`data` TEXT COMMENT '请求所携带的数据',
`bin_data` BLOB COMMENT 'grpc的二进制数据',
`branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID',
`op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked',
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`url` varchar(128) NOT NULL COMMENT 'the url of this op',
`data` TEXT COMMENT 'request body, depreceated',
`bin_data` BLOB COMMENT 'request body',
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,

12
test/busi/quick_start.go

@ -9,7 +9,7 @@ import (
"github.com/gin-gonic/gin"
)
// 事务参与者的服务地址
// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8082
@ -57,14 +57,14 @@ const dtmServer = "http://localhost:36789/api/dtmsvr"
// QsFireRequest quick start: fire request
func QsFireRequest() string {
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址
req := &gin.H{"amount": 30} // load of micro-service
// DtmServer is the url of dtm
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCompensate"
// add a TransOut subtraction,forward operation with url: qsBusi+"/TransOut", reverse compensation operation with url: qsBusi+"/TransOutCompensate"
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCompensate"
// add a TransIn subtraction, forward operation with url: qsBusi+"/TransIn", reverse compensation operation with url: qsBusi+"/TransInCompensate"
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
if err != nil {

2
test/busi/utils.go

@ -100,7 +100,7 @@ func oldWrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc
return fn(c)
}()
var b = []byte{}
if resp, ok := r.(*resty.Response); ok { // 如果是response,则取出body直接处理
if resp, ok := r.(*resty.Response); ok { // if it is a response,the get the body
b = resp.Body()
} else if err == nil {
b, err = json.Marshal(r)

6
test/tcc_barrier_test.go

@ -74,7 +74,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"
cancelURL := Busi + "/TccBSleepCancel"
// 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch
// refer to time diagram for barrier, here we simulate it
branchID := tcc.NewSubBranchID()
busi.SetSleepCancelHandler(func(c *gin.Context) interface{} {
res := busi.TccBarrierTransOutCancel(c)
@ -85,7 +85,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
logger.Debugf("disorderHandler after cancel return read")
return res
})
// 注册子事务
// register tcc branch
resp, err := dtmimp.RestyClient.R().
SetBody(map[string]interface{}{
"gid": tcc.Gid,
@ -124,7 +124,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) {
"op": dtmcli.BranchTry,
}).
Post(tryURL)
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // dangle op, return success
logger.Debugf("cronFinished read")
<-cronFinished
<-cronFinished

2
test/xa_test.go

@ -49,7 +49,7 @@ func TestXaDuplicate(t *testing.T) {
_, err = dtmimp.DBExec(sdb, "xa recover")
assert.Nil(t, err)
}
_, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // 先把某一个事务提交,模拟重复请求
_, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // simulate repeated request
assert.Nil(t, err)
return xa.CallBranch(req, busi.Busi+"/TransInXa")
})

Loading…
Cancel
Save