diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index be40e1c..f891a6f 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -70,6 +70,7 @@ func updateBranchAsync() { case updateBranch := <-updateBranchAsyncChan: updates = append(updates, TransBranch{ ModelBase: dtmutil.ModelBase{ID: updateBranch.id}, + Gid: updateBranch.gid, Status: updateBranch.status, FinishTime: updateBranch.finishTime, }) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index c04110a..ec78533 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -53,7 +53,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", b.Gid, dtmcli.StatusPrepared, b.String()) } else { // 为了性能优化,把branch的status更新异步化 - updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now} + updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, status: status, finishTime: &now} } } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index b74dce4..a02e8bc 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -20,7 +20,8 @@ type transSagaProcessor struct { } func init() { - registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor { return &transSagaProcessor{TransGlobal: trans} }) + registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor { + return &transSagaProcessor{TransGlobal: trans} }) } func (t *transSagaProcessor) GenBranches() []TransBranch { @@ -114,7 +115,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { toRun := []int{} for current := 0; current < n; current++ { br := &branchResults[current] - if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared { + if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && + br.status == dtmcli.StatusPrepared { toRun = append(toRun, current) } } @@ -132,11 +134,13 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } pickAndRunCompensates := func(toRunActions []int) { for _, b := range toRunActions { - // these branches may have run. so flag them to status succeed, then run the corresponding compensate + // these branches may have run. so flag them to status succeed, then run the corresponding + // compensate branchResults[b].status = dtmcli.StatusSucceed } for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { rsCToStart++ go asyncExecBranch(i) } diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index 37d938c..c52dce0 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -19,6 +19,7 @@ import ( type branchStatus struct { id uint64 + gid string status string finishTime *time.Time } diff --git a/sqls/dtmsvr.storage.tdsql.sql b/sqls/dtmsvr.storage.tdsql.sql new file mode 100644 index 0000000..09cf816 --- /dev/null +++ b/sqls/dtmsvr.storage.tdsql.sql @@ -0,0 +1,46 @@ +CREATE DATABASE IF NOT EXISTS dtm +/*!40100 DEFAULT CHARACTER SET utf8mb4 */ +; +drop table IF EXISTS dtm.trans_global; +CREATE TABLE if not EXISTS dtm.trans_global ( + `id` bigint(22) NOT NULL AUTO_INCREMENT, + `gid` varchar(128) NOT NULL COMMENT '事务全局id', + `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg', + -- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储 + `status` varchar(12) NOT NULL COMMENT '全局事务的状态 prepared | submitted | aborting | finished | rollbacked', + `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', + `protocol` varchar(45) not null comment '通信协议 http | grpc', + `create_time` datetime DEFAULT NULL, + `update_time` datetime DEFAULT NULL, + `commit_time` datetime DEFAULT NULL, + `finish_time` datetime DEFAULT NULL, + `rollback_time` datetime DEFAULT NULL, + `options` varchar(256) DEFAULT '', + `custom_data` varchar(256) DEFAULT '', + `next_cron_interval` int(11) default null comment '下次定时处理的间隔', + `next_cron_time` datetime default null comment '下次定时处理的时间', + `owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者', + PRIMARY KEY (`id`,`gid`), + UNIQUE KEY `id` (`id`,`gid`), + UNIQUE KEY `gid` (`gid`), + key `owner`(`owner`), + key `status_next_cron_time` (`status`, `next_cron_time`) comment '这个索引用于查询超时的全局事务,能够合理的走索引' +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid; +drop table IF EXISTS dtm.trans_branch_op; +CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( + `id` bigint(22) NOT NULL AUTO_INCREMENT, + `gid` varchar(128) NOT NULL COMMENT '事务全局id', + `url` varchar(128) NOT NULL COMMENT '动作关联的url', + `data` TEXT COMMENT '请求所携带的数据', + `bin_data` BLOB COMMENT 'grpc的二进制数据', + `branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID', + `op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', + `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked', + `finish_time` datetime DEFAULT NULL, + `rollback_time` datetime DEFAULT NULL, + `create_time` datetime DEFAULT NULL, + `update_time` datetime DEFAULT NULL, + PRIMARY KEY (`id`,`gid`), + UNIQUE KEY `id` (`id`,`gid`), + UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`) +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid;