Browse Source

branch_type change to op

pull/52/head
yedf2 4 years ago
parent
commit
51a8eb3e0c
  1. 4
      bench/http.go
  2. 42
      dtmcli/barrier.go
  3. 4
      dtmcli/barrier.mysql.sql
  4. 4
      dtmcli/barrier.postgres.sql
  5. 14
      dtmcli/dtmimp/trans_base.go
  6. 1
      dtmcli/tcc.go
  7. 2
      dtmgrpc/barrier.go
  8. 4
      dtmgrpc/dtmgimp/dtmgimp.pb.go
  9. 2
      dtmgrpc/dtmgimp/dtmgimp.proto
  10. 8
      dtmgrpc/dtmgimp/utils.go
  11. 1
      dtmgrpc/tcc.go
  12. 2
      dtmgrpc/xa.go
  13. 6
      dtmsvr/api.go
  14. 8
      dtmsvr/dtmsvr.mysql.sql
  15. 4
      dtmsvr/dtmsvr.postgres.sql
  16. 9
      dtmsvr/metrics.go
  17. 18
      dtmsvr/trans.go
  18. 14
      dtmsvr/trans_msg.go
  19. 36
      dtmsvr/trans_saga.go
  20. 4
      dtmsvr/trans_tcc.go
  21. 2
      dtmsvr/trans_xa.go
  22. 8
      examples/base_types.go
  23. 2
      examples/http_xa.go
  24. 10
      test/base_test.go
  25. 12
      test/tcc_barrier_test.go

4
bench/http.go

@ -75,7 +75,7 @@ func StartSvr() {
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
branch_type varchar(45) not null,
op varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
@ -93,7 +93,7 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query())
f := func(tx dtmcli.DB) error {
for i := 0; i < sqls; i++ {
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, branch_type, reason) values(?,?,?,?,?,?)",
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))
dtmimp.FatalIfError(err)
_, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)

42
dtmcli/barrier.go

@ -12,42 +12,42 @@ type BarrierBusiFunc func(db DB) error
// BranchBarrier every branch info
type BranchBarrier struct {
TransType string
Gid string
BranchID string
BranchType string
BarrierID int
TransType string
Gid string
BranchID string
Op string
BarrierID int
}
func (bb *BranchBarrier) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.BranchType)
return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op)
}
// BarrierFromQuery construct transaction info from request
func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) {
return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("branch_type"))
return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("op"))
}
// BarrierFrom construct transaction info from request
func BarrierFrom(transType, gid, branchID, branchType string) (*BranchBarrier, error) {
func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) {
ti := &BranchBarrier{
TransType: transType,
Gid: gid,
BranchID: branchID,
BranchType: branchType,
TransType: transType,
Gid: gid,
BranchID: branchID,
Op: op,
}
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" {
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.Op == "" {
return nil, fmt.Errorf("invlid trans info: %v", ti)
}
return ti, nil
}
func insertBarrier(tx Tx, transType string, gid string, branchID string, branchType string, barrierID string, reason string) (int64, error) {
if branchType == "" {
func insertBarrier(tx Tx, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
if op == "" {
return 0, nil
}
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return dtmimp.DBExec(tx, sql, transType, gid, branchID, branchType, barrierID, reason)
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}
// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
@ -71,12 +71,12 @@ func (bb *BranchBarrier) Call(tx Tx, busiCall BarrierBusiFunc) (rerr error) {
originType := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[ti.BranchType]
}[ti.Op]
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.BranchType)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, bid, ti.BranchType)
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.BranchType == BranchCancel || ti.BranchType == BranchCompensate) && originAffected > 0 || // 这个是空补偿
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
currentAffected == 0 { // 这个是重复请求或者悬挂
return
}

4
dtmcli/barrier.mysql.sql

@ -7,12 +7,12 @@ create table if not exists dtm_barrier.barrier(
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
branch_type varchar(45) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, branch_type, barrier_id)
UNIQUE key(gid, branch_id, op, barrier_id)
);

4
dtmcli/barrier.postgres.sql

@ -6,11 +6,11 @@ create table if not exists dtm_barrier.barrier(
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
branch_type varchar(45) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '',
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
PRIMARY KEY(id),
CONSTRAINT uniq_barrier unique(gid, branch_id, branch_type, barrier_id)
CONSTRAINT uniq_barrier unique(gid, branch_id, op, barrier_id)
);

14
dtmcli/dtmimp/trans_base.go

@ -51,7 +51,7 @@ type TransBase struct {
Payloads []string `json:"payloads,omitempty"` // used in MSG/SAGA
BinPayloads [][]byte `json:"-"`
BranchIDGen `json:"-"` // used in XA/TCC
BranchType string `json:"-"` // used in XA/TCC
Op string `json:"-"` // used in XA/TCC
QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
}
@ -102,15 +102,15 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
}
// TransRequestBranch TransBAse request branch result
func TransRequestBranch(t *TransBase, body interface{}, branchID string, branchType string, url string) (*resty.Response, error) {
func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
resp, err := RestyClient.R().
SetBody(body).
SetQueryParams(map[string]string{
"dtm": t.Dtm,
"gid": t.Gid,
"branch_id": branchID,
"trans_type": t.TransType,
"branch_type": branchType,
"dtm": t.Dtm,
"gid": t.Gid,
"branch_id": branchID,
"trans_type": t.TransType,
"op": op,
}).
Post(url)
return resp, CheckResponse(resp, err)

1
dtmcli/tcc.go

@ -52,7 +52,6 @@ func TccFromQuery(qs url.Values) (*Tcc, error) {
}
// CallBranch call a tcc branch
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
branchID := t.NewSubBranchID()
err := dtmimp.TransRegisterBranch(&t.TransBase, map[string]string{

2
dtmgrpc/barrier.go

@ -10,5 +10,5 @@ import (
// BarrierFromGrpc generate a Barrier from grpc context
func BarrierFromGrpc(ctx context.Context) (*dtmcli.BranchBarrier, error) {
tb := dtmgimp.TransBaseFromGrpc(ctx)
return dtmcli.BarrierFrom(tb.TransType, tb.Gid, tb.BranchID, tb.BranchType)
return dtmcli.BarrierFrom(tb.TransType, tb.Gid, tb.BranchID, tb.Op)
}

4
dtmgrpc/dtmgimp/dtmgimp.pb.go

@ -273,7 +273,7 @@ type DtmBranchRequest struct {
Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"`
TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"`
BranchID string `protobuf:"bytes,3,opt,name=BranchID,proto3" json:"BranchID,omitempty"`
BranchType string `protobuf:"bytes,4,opt,name=BranchType,proto3" json:"BranchType,omitempty"`
Op string `protobuf:"bytes,4,opt,name=Op,proto3" json:"Op,omitempty"`
Data map[string]string `protobuf:"bytes,5,rep,name=Data,proto3" json:"Data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
BusiPayload []byte `protobuf:"bytes,6,opt,name=BusiPayload,proto3" json:"BusiPayload,omitempty"`
}
@ -333,7 +333,7 @@ func (x *DtmBranchRequest) GetBranchID() string {
func (x *DtmBranchRequest) GetBranchType() string {
if x != nil {
return x.BranchType
return x.Op
}
return ""
}

2
dtmgrpc/dtmgimp/dtmgimp.proto

@ -39,7 +39,7 @@ message DtmBranchRequest {
string Gid = 1;
string TransType = 2;
string BranchID = 3;
string BranchType = 4;
string Op = 4;
map<string, string> Data = 5;
bytes BusiPayload = 6;
}

8
dtmgrpc/dtmgimp/utils.go

@ -37,12 +37,12 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
const mdpre string = "dtm-"
// TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(gid, transType, branchID, branchType, dtm string) context.Context {
func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
md := metadata.Pairs(
mdpre+"gid", gid,
mdpre+"trans_type", transType,
mdpre+"branch_id", branchID,
mdpre+"branch_type", branchType,
mdpre+"op", op,
mdpre+"dtm", dtm,
)
return metadata.NewOutgoingContext(context.Background(), md)
@ -52,7 +52,7 @@ func TransInfo2Ctx(gid, transType, branchID, branchType, dtm string) context.Con
func LogDtmCtx(ctx context.Context) {
tb := TransBaseFromGrpc(ctx)
if tb.Gid != "" {
dtmimp.Logf("gid: %s trans_type: %s branch_id: %s branch_type: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.BranchType, tb.Dtm)
dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm)
}
}
@ -68,6 +68,6 @@ func mdGet(md metadata.MD, key string) string {
func TransBaseFromGrpc(ctx context.Context) *dtmimp.TransBase {
md, _ := metadata.FromIncomingContext(ctx)
tb := dtmimp.NewTransBase(mdGet(md, "gid"), mdGet(md, "trans_type"), mdGet(md, "dtm"), mdGet(md, "branch_id"))
tb.BranchType = mdGet(md, "branch_type")
tb.Op = mdGet(md, "op")
return tb
}

1
dtmgrpc/tcc.go

@ -62,7 +62,6 @@ func TccFromGrpc(ctx context.Context) (*TccGrpc, error) {
}
// CallBranch call a tcc branch
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL string, cancelURL string, reply interface{}) error {
branchID := t.NewSubBranchID()
bd, err := proto.Marshal(busiMsg)

2
dtmgrpc/xa.go

@ -52,7 +52,7 @@ func NewXaGrpcClient(server string, mysqlConf map[string]string, notifyURL strin
// HandleCallback 处理commit/rollback的回调
func (xc *XaGrpcClient) HandleCallback(ctx context.Context) (*emptypb.Empty, error) {
tb := dtmgimp.TransBaseFromGrpc(ctx)
return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.BranchType)
return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.Op)
}
// XaLocalTransaction start a xa local transaction

6
dtmsvr/api.go

@ -56,13 +56,13 @@ func svcRegisterBranch(branch *TransBranch, data map[string]string) (interface{}
branches := []TransBranch{*branch, *branch}
if dbt.TransType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
branches[i].BranchType = b
branches[i].Op = b
branches[i].URL = data[b]
}
} else if dbt.TransType == "xa" {
branches[0].BranchType = dtmcli.BranchRollback
branches[0].Op = dtmcli.BranchRollback
branches[0].URL = data["url"]
branches[1].BranchType = dtmcli.BranchCommit
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
return nil, fmt.Errorf("unknow trans type: %s", dbt.TransType)

8
dtmsvr/dtmsvr.mysql.sql

@ -34,15 +34,15 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch (
`url` varchar(128) NOT NULL COMMENT '动作关联的url',
`data` TEXT COMMENT '请求所携带的数据',
`bin_data` BLOB COMMENT 'grpc的二进制数据',
`branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支名称',
`branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID',
`op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `branch_type`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`),
KEY `create_time` (`create_time`),
KEY `update_time` (`update_time`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
@ -50,7 +50,7 @@ drop table IF EXISTS dtm.trans_log;
CREATE TABLE IF NOT EXISTS dtm.trans_log (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支',
`branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支ID',
`action` varchar(45) DEFAULT NULL COMMENT '行为',
`old_status` varchar(45) NOT NULL DEFAULT '' COMMENT '旧状态',
`new_status` varchar(45) NOT NULL COMMENT '新状态',

4
dtmsvr/dtmsvr.postgres.sql

@ -38,14 +38,14 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch (
data TEXT,
bin_data BLOB,
branch_id VARCHAR(128) NOT NULL,
branch_type varchar(45) NOT NULL,
op varchar(45) NOT NULL,
status varchar(45) NOT NULL,
finish_time timestamp(0) DEFAULT NULL,
rollback_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT gid_uniq UNIQUE (gid, branch_id, branch_type)
CONSTRAINT gid_uniq UNIQUE (gid, branch_id, op)
);
CREATE INDEX if not EXISTS create_time ON dtm.trans_branch (create_time);
CREATE INDEX if not EXISTS update_time ON dtm.trans_branch (update_time);

9
dtmsvr/metrics.go

@ -2,13 +2,14 @@ package dtmsvr
import (
"context"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"net/http"
"strings"
)
var (
@ -87,9 +88,9 @@ func transactionMetrics(global *TransGlobal, status bool) {
func branchMetrics(global *TransGlobal, branch *TransBranch, status bool) {
if status {
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "ok").Inc()
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.Op, "ok").Inc()
} else {
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "fail").Inc()
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.Op, "fail").Inc()
}
}

18
dtmsvr/trans.go

@ -97,7 +97,7 @@ type TransBranch struct {
URL string `json:"url"`
BinData []byte
BranchID string `json:"branch_id"`
BranchType string
Op string
Status string
FinishTime *time.Time
RollbackTime *time.Time
@ -211,12 +211,12 @@ func (t *TransGlobal) setNextCron(ctype cronType) []string {
return []string{"next_cron_interval", "next_cron_time"}
}
func (t *TransGlobal) getURLResult(url string, branchID, branchType string, branchPayload []byte) (string, error) {
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) {
if t.Protocol == "grpc" {
dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
server, method := dtmgimp.GetServerAndMethod(url)
conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, branchType, "")
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
err := conn.Invoke(ctx, method, branchPayload, []byte{})
if err == nil {
return dtmcli.ResultSuccess, nil
@ -234,10 +234,10 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran
dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url))
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"branch_type": branchType,
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"op": op,
}).
SetHeader("Content-type", "application/json").
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url)
@ -248,13 +248,13 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran
}
func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
body, err := t.getURLResult(branch.URL, branch.BranchID, branch.BranchType, branch.BinData)
body, err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData)
if err != nil {
return "", err
}
if strings.Contains(body, dtmcli.ResultSuccess) {
return dtmcli.StatusSucceed, nil
} else if strings.HasSuffix(t.TransType, "saga") && branch.BranchType == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
} else if strings.HasSuffix(t.TransType, "saga") && branch.Op == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
return dtmcli.StatusFailed, nil
} else if strings.Contains(body, dtmcli.ResultOngoing) {
return "", dtmimp.ErrOngoing

14
dtmsvr/trans_msg.go

@ -21,12 +21,12 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
for i, step := range t.Steps {
b := &TransBranch{
Gid: t.Gid,
BranchID: fmt.Sprintf("%02d", i+1),
BinData: t.BinPayloads[i],
URL: step[dtmcli.BranchAction],
BranchType: dtmcli.BranchAction,
Status: dtmcli.StatusPrepared,
Gid: t.Gid,
BranchID: fmt.Sprintf("%02d", i+1),
BinData: t.BinPayloads[i],
URL: step[dtmcli.BranchAction],
Op: dtmcli.BranchAction,
Status: dtmcli.StatusPrepared,
}
branches = append(branches, *b)
}
@ -58,7 +58,7 @@ func (t *transMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
if branch.BranchType != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared {
if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared {
continue
}
err := t.execBranch(db, branch)

36
dtmsvr/trans_saga.go

@ -21,14 +21,14 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
for i, step := range t.Steps {
branch := fmt.Sprintf("%02d", i+1)
for _, branchType := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} {
for _, op := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} {
branches = append(branches, TransBranch{
Gid: t.Gid,
BranchID: branch,
BinData: t.BinPayloads[i],
URL: step[branchType],
BranchType: branchType,
Status: dtmcli.StatusPrepared,
Gid: t.Gid,
BranchID: branch,
BinData: t.BinPayloads[i],
URL: step[op],
Op: op,
Status: dtmcli.StatusPrepared,
})
}
}
@ -41,10 +41,10 @@ type cSagaCustom struct {
}
type branchResult struct {
index int
status string
started bool
branchType string
index int
status string
started bool
op string
}
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error {
@ -64,14 +64,14 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
if b.BranchType == dtmcli.BranchAction {
if b.Op == dtmcli.BranchAction {
if b.Status == dtmcli.StatusPrepared {
rsAToStart++
} else if b.Status == dtmcli.StatusFailed {
rsAFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType}
branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op}
}
isPreconditionsSucceed := func(current int) bool {
// if !csc.Concurrent,then check the branch in previous step is succeed
@ -94,7 +94,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
if x := recover(); x != nil {
err = dtmimp.AsError(x)
}
resultChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
if err != nil {
dtmimp.LogRedf("exec branch error: %v", err)
}
@ -105,7 +105,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
toRun := []int{}
for current := 0; current < n; current++ {
br := &branchResults[current]
if br.branchType == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared {
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared {
toRun = append(toRun, current)
}
}
@ -115,7 +115,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
runBranches := func(toRun []int) {
for _, b := range toRun {
branchResults[b].started = true
if branchResults[b].branchType == dtmcli.BranchAction {
if branchResults[b].op == dtmcli.BranchAction {
rsAStarted++
}
go asyncExecBranch(b)
@ -127,7 +127,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.branchType == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
go asyncExecBranch(i)
}
@ -138,7 +138,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
case r := <-resultChan:
br := &branchResults[r.index]
br.status = r.status
if r.branchType == dtmcli.BranchAction {
if r.op == dtmcli.BranchAction {
rsADone++
if r.status == dtmcli.StatusFailed {
rsAFailed++

4
dtmsvr/trans_tcc.go

@ -25,9 +25,9 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(db, dtmcli.StatusAborting)
}
branchType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].BranchType == branchType && branches[current].Status == dtmcli.StatusPrepared {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID)
err := t.execBranch(db, &branches[current])
if err != nil {

2
dtmsvr/trans_xa.go

@ -27,7 +27,7 @@ func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) er
}
currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string)
for _, branch := range branches {
if branch.BranchType == currentType && branch.Status != dtmcli.StatusSucceed {
if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
err := t.execBranch(db, &branch)
if err != nil {
return err

8
examples/base_types.go

@ -61,10 +61,10 @@ func reqFrom(c *gin.Context) *TransReq {
func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {
info := dtmcli.BranchBarrier{
TransType: c.Query("trans_type"),
Gid: c.Query("gid"),
BranchID: c.Query("branch_id"),
BranchType: c.Query("branch_type"),
TransType: c.Query("trans_type"),
Gid: c.Query("gid"),
BranchID: c.Query("branch_id"),
Op: c.Query("op"),
}
return &info
}

2
examples/http_xa.go

@ -16,7 +16,7 @@ func init() {
var err error
XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) {
app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("branch_type"))
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("op"))
}))
})
dtmimp.FatalIfError(err)

10
test/base_test.go

@ -24,12 +24,12 @@ func TestBaseSqlDB(t *testing.T) {
asserts := assert.New(t)
db := common.DbGet(config.DB)
barrier := &dtmcli.BranchBarrier{
TransType: "saga",
Gid: "gid2",
BranchID: "branch_id2",
BranchType: dtmcli.BranchAction,
TransType: "saga",
Gid: "gid2",
BranchID: "branch_id2",
Op: dtmcli.BranchAction,
}
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(db dtmcli.DB) error {

12
test/tcc_barrier_test.go

@ -87,11 +87,11 @@ func TestTccBarrierDisorder(t *testing.T) {
r, _ := dtmimp.RestyClient.R().
SetBody(body).
SetQueryParams(map[string]string{
"dtm": tcc.Dtm,
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"branch_type": dtmcli.BranchTry,
"dtm": tcc.Dtm,
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"op": dtmcli.BranchTry,
}).
Post(tryURL)
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功
@ -115,7 +115,7 @@ func TestTccBarrierDisorder(t *testing.T) {
}
func TestTccBarrierPanic(t *testing.T) {
bb := &dtmcli.BranchBarrier{TransType: "saga", Gid: "gid1", BranchID: "bid1", BranchType: "action", BarrierID: 1}
bb := &dtmcli.BranchBarrier{TransType: "saga", Gid: "gid1", BranchID: "bid1", Op: "action", BarrierID: 1}
var err error
func() {
defer dtmimp.P2E(&err)

Loading…
Cancel
Save