Browse Source

Merge pull request #149 from fsdfsffdsf/main

feature:support tencent distributed database tdsql
pull/152/head
yedf2 4 years ago
committed by GitHub
parent
commit
66bfeb573f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dtmsvr/svr.go
  2. 2
      dtmsvr/trans_status.go
  3. 12
      dtmsvr/trans_type_saga.go
  4. 1
      dtmsvr/utils.go
  5. 46
      sqls/dtmsvr.storage.tdsql.sql

1
dtmsvr/svr.go

@ -70,6 +70,7 @@ func updateBranchAsync() {
case updateBranch := <-updateBranchAsyncChan:
updates = append(updates, TransBranch{
ModelBase: dtmutil.ModelBase{ID: updateBranch.id},
Gid: updateBranch.gid,
Status: updateBranch.status,
FinishTime: updateBranch.finishTime,
})

2
dtmsvr/trans_status.go

@ -53,7 +53,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo
logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s",
b.Gid, dtmcli.StatusPrepared, b.String())
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now}
updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, status: status, finishTime: &now}
}
}

12
dtmsvr/trans_type_saga.go

@ -20,7 +20,8 @@ type transSagaProcessor struct {
}
func init() {
registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor { return &transSagaProcessor{TransGlobal: trans} })
registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor {
return &transSagaProcessor{TransGlobal: trans} })
}
func (t *transSagaProcessor) GenBranches() []TransBranch {
@ -114,7 +115,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
toRun := []int{}
for current := 0; current < n; current++ {
br := &branchResults[current]
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared {
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) &&
br.status == dtmcli.StatusPrepared {
toRun = append(toRun, current)
}
}
@ -132,11 +134,13 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
pickAndRunCompensates := func(toRunActions []int) {
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding compensate
// these branches may have run. so flag them to status succeed, then run the corresponding
// compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
go asyncExecBranch(i)
}

1
dtmsvr/utils.go

@ -19,6 +19,7 @@ import (
type branchStatus struct {
id uint64
gid string
status string
finishTime *time.Time
}

46
sqls/dtmsvr.storage.tdsql.sql

@ -0,0 +1,46 @@
CREATE DATABASE IF NOT EXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg',
-- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储
`status` varchar(12) NOT NULL COMMENT '全局事务的状态 prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api',
`protocol` varchar(45) not null comment '通信协议 http | grpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`commit_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(256) DEFAULT '',
`custom_data` varchar(256) DEFAULT '',
`next_cron_interval` int(11) default null comment '下次定时处理的间隔',
`next_cron_time` datetime default null comment '下次定时处理的时间',
`owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者',
PRIMARY KEY (`id`,`gid`),
UNIQUE KEY `id` (`id`,`gid`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
key `status_next_cron_time` (`status`, `next_cron_time`) comment '这个索引用于查询超时的全局事务,能够合理的走索引'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`url` varchar(128) NOT NULL COMMENT '动作关联的url',
`data` TEXT COMMENT '请求所携带的数据',
`bin_data` BLOB COMMENT 'grpc的二进制数据',
`branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID',
`op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`,`gid`),
UNIQUE KEY `id` (`id`,`gid`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 shardkey=gid;
Loading…
Cancel
Save