diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 492296d..7bdf1ad 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -63,9 +63,9 @@ func insertBarrier(tx DB, transType string, gid string, branchID string, op stri return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) } -// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465 -// tx: 本地数据库的事务对象,允许子事务屏障进行事务操作 -// busiCall: 业务函数,仅在必要时被调用 +// Call see detail description in https://en.dtm.pub/practice/barrier.html +// tx: local transaction connection +// busiCall: busi func func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) { bid := bb.newBarrierID() defer dtmimp.DeferDo(&rerr, func() error { diff --git a/dtmcli/dtmimp/README-cn.md b/dtmcli/dtmimp/README-cn.md new file mode 100644 index 0000000..ba4fb25 --- /dev/null +++ b/dtmcli/dtmimp/README-cn.md @@ -0,0 +1,2 @@ +## 注意 +此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口 \ No newline at end of file diff --git a/dtmcli/dtmimp/README.md b/dtmcli/dtmimp/README.md index ba4fb25..06aa51e 100644 --- a/dtmcli/dtmimp/README.md +++ b/dtmcli/dtmimp/README.md @@ -1,2 +1,2 @@ -## 注意 -此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口 \ No newline at end of file +## Notice +Please donot use this package, and this package should only be used in dtm internally. The interfaces are not stable, and package name has postfix "imp" \ No newline at end of file diff --git a/dtmcli/dtmimp/trans_xa_base.go b/dtmcli/dtmimp/trans_xa_base.go index d1eb352..a61a3ac 100644 --- a/dtmcli/dtmimp/trans_xa_base.go +++ b/dtmcli/dtmimp/trans_xa_base.go @@ -61,13 +61,12 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error) return } -// HandleGlobalTrans http/grpc GlobalTransaction的公共方法 +// HandleGlobalTrans http/grpc GlobalTransaction shared func func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) { rerr = callDtm("prepare") if rerr != nil { return } - // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer DeferDo(&rerr, func() error { return callDtm("submit") }, func() error { diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 7d0c77d..b68c5ae 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -25,7 +25,7 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // TccGlobalTransaction begin a tcc global transaction // dtm dtm server address // gid global transaction ID -// tccFunc tcc事务函数,里面会定义全局事务的分支 +// tccFunc define the detail tcc busi func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { return TccGlobalTransaction2(dtm, gid, func(t *Tcc) {}, tccFunc) } @@ -38,7 +38,6 @@ func TccGlobalTransaction2(dtm string, gid string, custom func(*Tcc), tccFunc Tc if rerr != nil { return rerr } - // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer dtmimp.DeferDo(&rerr, func() error { return dtmimp.TransCallDtm(&tcc.TransBase, tcc, "submit") }, func() error { diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 505837d..179891d 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -58,7 +58,7 @@ func NewXaClient(server string, mysqlConf DBConf, notifyURL string, register XaR return xa, nil } -// HandleCallback 处理commit/rollback的回调 +// HandleCallback handle commit/rollback callback func (xc *XaClient) HandleCallback(gid string, branchID string, action string) interface{} { return xc.XaClientBase.HandleCallback(gid, branchID, action) } diff --git a/dtmgrpc/dtmgimp/README-cn.md b/dtmgrpc/dtmgimp/README-cn.md new file mode 100644 index 0000000..ba4fb25 --- /dev/null +++ b/dtmgrpc/dtmgimp/README-cn.md @@ -0,0 +1,2 @@ +## 注意 +此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口 \ No newline at end of file diff --git a/dtmgrpc/dtmgimp/README.md b/dtmgrpc/dtmgimp/README.md index ba4fb25..06aa51e 100644 --- a/dtmgrpc/dtmgimp/README.md +++ b/dtmgrpc/dtmgimp/README.md @@ -1,2 +1,2 @@ -## 注意 -此包带imp后缀,主要被dtm内部使用,相关接口可能会发生变更,请勿使用这里的接口 \ No newline at end of file +## Notice +Please donot use this package, and this package should only be used in dtm internally. The interfaces are not stable, and package name has postfix "imp" \ No newline at end of file diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index d19cd69..2b6a653 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -19,7 +19,7 @@ import ( "google.golang.org/protobuf/proto" ) -// GrpcServerLog 打印grpc服务端的日志 +// GrpcServerLog middleware to print server-side grpc log func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { began := time.Now() logger.Debugf("grpc server handling: %s %s", info.FullMethod, dtmimp.MustMarshalString(req)) @@ -35,7 +35,7 @@ func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerI return m, err } -// GrpcClientLog 打印grpc调用的日志 +// GrpcClientLog middleware to print client-side grpc log func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req)) LogDtmCtx(ctx) diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index bbbaef8..24a2f6a 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -25,9 +25,9 @@ type TccGrpc struct { type TccGlobalFunc func(tcc *TccGrpc) error // TccGlobalTransaction begin a tcc global transaction -// dtm dtm服务器地址 -// gid 全局事务id -// tccFunc tcc事务函数,里面会定义全局事务的分支 +// dtm dtm server url +// gid global transaction id +// tccFunc tcc busi func, define the transaction logic func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { return TccGlobalTransaction2(dtm, gid, func(tg *TccGrpc) {}, tccFunc) } @@ -40,7 +40,6 @@ func TccGlobalTransaction2(dtm string, gid string, custom func(*TccGrpc), tccFun if rerr != nil { return rerr } - // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer dtmimp.DeferDo(&rerr, func() error { return dtmgimp.DtmGrpcCall(&tcc.TransBase, "Submit") }, func() error { diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index a7bbebb..37f7cfc 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -57,7 +57,7 @@ func NewXaGrpcClient(server string, mysqlConf dtmcli.DBConf, notifyURL string) * return xa } -// HandleCallback 处理commit/rollback的回调 +// HandleCallback handle commit/rollback callback func (xc *XaGrpcClient) HandleCallback(ctx context.Context) (*emptypb.Empty, error) { tb := dtmgimp.TransBaseFromGrpc(ctx) return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.Op) diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index 750af5e..a72bea7 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -84,7 +84,7 @@ type configType struct { Log Log `yaml:"Log"` } -// Config 配置 +// Config config var Config = configType{} // MustLoadConfig load config from env and file diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index d87297a..c15c7fc 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -103,7 +103,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto dbr := db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(global) - if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 + if dbr.RowsAffected <= 0 { // not a new trans, return return storage.ErrUniqueConflict } if len(branches) > 0 { diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 2c1c16a..768a881 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -67,7 +67,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos) logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", b.Gid, dtmcli.StatusPrepared, b.String()) - } else { // 为了性能优化,把branch的status更新异步化 + } else { // for better performance, batch the updates of branch status updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, status: status, finishTime: &now} } } diff --git a/dtmutil/db.go b/dtmutil/db.go index 86973b2..97bfefa 100644 --- a/dtmutil/db.go +++ b/dtmutil/db.go @@ -78,7 +78,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { afterName := "cb_after" logger.Debugf("installing db plugin: %s", op.Name()) - // 开始前 + // before _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) _ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before) @@ -86,7 +86,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { _ = db.Callback().Row().Before("gorm:row").Register(beforeName, before) _ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before) - // 结束后 + // after _ = db.Callback().Create().After("gorm:after_create").Register(afterName, after) _ = db.Callback().Query().After("gorm:after_query").Register(afterName, after) _ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after) diff --git a/dtmutil/utils.go b/dtmutil/utils.go index 849b539..9c78bb2 100644 --- a/dtmutil/utils.go +++ b/dtmutil/utils.go @@ -112,7 +112,7 @@ func MustGetwd() string { return wd } -// GetSQLDir 获取调用该函数的caller源代码的目录,主要用于测试时,查找相关文件 +// GetSQLDir get sql scripts dir, used in test func GetSQLDir() string { wd := MustGetwd() if filepath.Base(wd) == "test" { diff --git a/helper/bench/main.go b/helper/bench/main.go index e8cb94c..058db3e 100644 --- a/helper/bench/main.go +++ b/helper/bench/main.go @@ -47,8 +47,8 @@ func main() { } else { hintAndExit() } - dtmsvr.StartSvr() // 启动dtmsvr的api服务 - go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询 - svr.StartSvr() // 启动bench服务 + dtmsvr.StartSvr() + go dtmsvr.CronExpiredTrans(-1) + svr.StartSvr() select {} } diff --git a/main.go b/main.go index f0fcd98..813e920 100644 --- a/main.go +++ b/main.go @@ -69,7 +69,7 @@ func main() { } _, _ = maxprocs.Set(maxprocs.Logger(logger.Infof)) registry.WaitStoreUp() - dtmsvr.StartSvr() // 启动dtmsvr的api服务 - go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询 + dtmsvr.StartSvr() // start dtmsvr api + go dtmsvr.CronExpiredTrans(-1) // start dtmsvr cron job select {} } diff --git a/sqls/dtmsvr.storage.tdsql.sql b/sqls/dtmsvr.storage.tdsql.sql index 09cf816..2f6e42d 100644 --- a/sqls/dtmsvr.storage.tdsql.sql +++ b/sqls/dtmsvr.storage.tdsql.sql @@ -4,38 +4,37 @@ CREATE DATABASE IF NOT EXISTS dtm drop table IF EXISTS dtm.trans_global; CREATE TABLE if not EXISTS dtm.trans_global ( `id` bigint(22) NOT NULL AUTO_INCREMENT, - `gid` varchar(128) NOT NULL COMMENT '事务全局id', - `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg', - -- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储 - `status` varchar(12) NOT NULL COMMENT '全局事务的状态 prepared | submitted | aborting | finished | rollbacked', - `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', - `protocol` varchar(45) not null comment '通信协议 http | grpc', + `gid` varchar(128) NOT NULL COMMENT 'global transaction id', + `trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg', + `status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked', + `query_prepared` varchar(128) NOT NULL COMMENT 'url to check for 2-phase message', + `protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, - `commit_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, - `options` varchar(256) DEFAULT '', - `custom_data` varchar(256) DEFAULT '', - `next_cron_interval` int(11) default null comment '下次定时处理的间隔', - `next_cron_time` datetime default null comment '下次定时处理的时间', - `owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者', + `options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout', + `custom_data` varchar(256) DEFAULT '' COMMENT 'custom data for transaction', + `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job', + `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job', + `owner` varchar(128) not null default '' comment 'who is locking this trans', + `ext_data` TEXT comment 'extended data for this trans', PRIMARY KEY (`id`,`gid`), UNIQUE KEY `id` (`id`,`gid`), UNIQUE KEY `gid` (`gid`), key `owner`(`owner`), - key `status_next_cron_time` (`status`, `next_cron_time`) comment '这个索引用于查询超时的全局事务,能够合理的走索引' + key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans' ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid; drop table IF EXISTS dtm.trans_branch_op; CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( `id` bigint(22) NOT NULL AUTO_INCREMENT, - `gid` varchar(128) NOT NULL COMMENT '事务全局id', - `url` varchar(128) NOT NULL COMMENT '动作关联的url', - `data` TEXT COMMENT '请求所携带的数据', - `bin_data` BLOB COMMENT 'grpc的二进制数据', - `branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID', - `op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', - `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked', + `gid` varchar(128) NOT NULL COMMENT 'global transaction id', + `url` varchar(128) NOT NULL COMMENT 'the url of this op', + `data` TEXT COMMENT 'request body, depreceated', + `bin_data` BLOB COMMENT 'request body', + `branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID', + `op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel', + `status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed', `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL, diff --git a/test/busi/quick_start.go b/test/busi/quick_start.go index 09d84ce..6d57af5 100644 --- a/test/busi/quick_start.go +++ b/test/busi/quick_start.go @@ -9,7 +9,7 @@ import ( "github.com/gin-gonic/gin" ) -// 事务参与者的服务地址 +// busi address const qsBusiAPI = "/api/busi_start" const qsBusiPort = 8082 @@ -57,14 +57,14 @@ const dtmServer = "http://localhost:36789/api/dtmsvr" // QsFireRequest quick start: fire request func QsFireRequest() string { - req := &gin.H{"amount": 30} // 微服务的载荷 - // DtmServer为DTM服务的地址 + req := &gin.H{"amount": 30} // load of micro-service + // DtmServer is the url of dtm saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)). - // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCompensate" + // add a TransOut subtraction,forward operation with url: qsBusi+"/TransOut", reverse compensation operation with url: qsBusi+"/TransOutCompensate" Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). - // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCompensate" + // add a TransIn subtraction, forward operation with url: qsBusi+"/TransIn", reverse compensation operation with url: qsBusi+"/TransInCompensate" Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) - // 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务 + // submit the created saga transaction,dtm ensures all subtractions either complete or get revoked err := saga.Submit() if err != nil { diff --git a/test/busi/utils.go b/test/busi/utils.go index 8f68373..de5a490 100644 --- a/test/busi/utils.go +++ b/test/busi/utils.go @@ -100,7 +100,7 @@ func oldWrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc return fn(c) }() var b = []byte{} - if resp, ok := r.(*resty.Response); ok { // 如果是response,则取出body直接处理 + if resp, ok := r.(*resty.Response); ok { // if it is a response,the get the body b = resp.Body() } else if err == nil { b, err = json.Marshal(r) diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index c55308d..3774b31 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -74,7 +74,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) { tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" cancelURL := Busi + "/TccBSleepCancel" - // 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch + // refer to time diagram for barrier, here we simulate it branchID := tcc.NewSubBranchID() busi.SetSleepCancelHandler(func(c *gin.Context) interface{} { res := busi.TccBarrierTransOutCancel(c) @@ -85,7 +85,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) { logger.Debugf("disorderHandler after cancel return read") return res }) - // 注册子事务 + // register tcc branch resp, err := dtmimp.RestyClient.R(). SetBody(map[string]interface{}{ "gid": tcc.Gid, @@ -124,7 +124,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) { "op": dtmcli.BranchTry, }). Post(tryURL) - assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功 + assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // dangle op, return success logger.Debugf("cronFinished read") <-cronFinished <-cronFinished diff --git a/test/xa_test.go b/test/xa_test.go index b276340..6da797a 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -49,7 +49,7 @@ func TestXaDuplicate(t *testing.T) { _, err = dtmimp.DBExec(sdb, "xa recover") assert.Nil(t, err) } - _, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // 先把某一个事务提交,模拟重复请求 + _, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // simulate repeated request assert.Nil(t, err) return xa.CallBranch(req, busi.Busi+"/TransInXa") })