From 51a8eb3e0cdea5036012070a136fd7b0480eef32 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 6 Nov 2021 23:39:05 +0800 Subject: [PATCH] branch_type change to op --- bench/http.go | 4 ++-- dtmcli/barrier.go | 42 +++++++++++++++++------------------ dtmcli/barrier.mysql.sql | 4 ++-- dtmcli/barrier.postgres.sql | 4 ++-- dtmcli/dtmimp/trans_base.go | 14 ++++++------ dtmcli/tcc.go | 1 - dtmgrpc/barrier.go | 2 +- dtmgrpc/dtmgimp/dtmgimp.pb.go | 4 ++-- dtmgrpc/dtmgimp/dtmgimp.proto | 2 +- dtmgrpc/dtmgimp/utils.go | 8 +++---- dtmgrpc/tcc.go | 1 - dtmgrpc/xa.go | 2 +- dtmsvr/api.go | 6 ++--- dtmsvr/dtmsvr.mysql.sql | 8 +++---- dtmsvr/dtmsvr.postgres.sql | 4 ++-- dtmsvr/metrics.go | 9 ++++---- dtmsvr/trans.go | 18 +++++++-------- dtmsvr/trans_msg.go | 14 ++++++------ dtmsvr/trans_saga.go | 36 +++++++++++++++--------------- dtmsvr/trans_tcc.go | 4 ++-- dtmsvr/trans_xa.go | 2 +- examples/base_types.go | 8 +++---- examples/http_xa.go | 2 +- test/base_test.go | 10 ++++----- test/tcc_barrier_test.go | 12 +++++----- 25 files changed, 110 insertions(+), 111 deletions(-) diff --git a/bench/http.go b/bench/http.go index 010977f..98817d7 100644 --- a/bench/http.go +++ b/bench/http.go @@ -75,7 +75,7 @@ func StartSvr() { delta DECIMAL(11, 2) not null, gid varchar(45) not null, branch_id varchar(45) not null, - branch_type varchar(45) not null, + op varchar(45) not null, reason varchar(45), create_time datetime not null default now(), update_time datetime not null default now(), @@ -93,7 +93,7 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query()) f := func(tx dtmcli.DB) error { for i := 0; i < sqls; i++ { - _, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, branch_type, reason) values(?,?,?,?,?,?)", + _, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)", uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id"))) dtmimp.FatalIfError(err) _, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 024b975..c38823a 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -12,42 +12,42 @@ type BarrierBusiFunc func(db DB) error // BranchBarrier every branch info type BranchBarrier struct { - TransType string - Gid string - BranchID string - BranchType string - BarrierID int + TransType string + Gid string + BranchID string + Op string + BarrierID int } func (bb *BranchBarrier) String() string { - return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.BranchType) + return fmt.Sprintf("transInfo: %s %s %s %s", bb.TransType, bb.Gid, bb.BranchID, bb.Op) } // BarrierFromQuery construct transaction info from request func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) { - return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("branch_type")) + return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("op")) } // BarrierFrom construct transaction info from request -func BarrierFrom(transType, gid, branchID, branchType string) (*BranchBarrier, error) { +func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) { ti := &BranchBarrier{ - TransType: transType, - Gid: gid, - BranchID: branchID, - BranchType: branchType, + TransType: transType, + Gid: gid, + BranchID: branchID, + Op: op, } - if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" { + if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.Op == "" { return nil, fmt.Errorf("invlid trans info: %v", ti) } return ti, nil } -func insertBarrier(tx Tx, transType string, gid string, branchID string, branchType string, barrierID string, reason string) (int64, error) { - if branchType == "" { +func insertBarrier(tx Tx, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) { + if op == "" { return 0, nil } - sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") - return dtmimp.DBExec(tx, sql, transType, gid, branchID, branchType, barrierID, reason) + sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") + return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) } // Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465 @@ -71,12 +71,12 @@ func (bb *BranchBarrier) Call(tx Tx, busiCall BarrierBusiFunc) (rerr error) { originType := map[string]string{ BranchCancel: BranchTry, BranchCompensate: BranchAction, - }[ti.BranchType] + }[ti.Op] - originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.BranchType) - currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, bid, ti.BranchType) + originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) + currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) - if (ti.BranchType == BranchCancel || ti.BranchType == BranchCompensate) && originAffected > 0 || // 这个是空补偿 + if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } diff --git a/dtmcli/barrier.mysql.sql b/dtmcli/barrier.mysql.sql index 60b828f..ccb524b 100644 --- a/dtmcli/barrier.mysql.sql +++ b/dtmcli/barrier.mysql.sql @@ -7,12 +7,12 @@ create table if not exists dtm_barrier.barrier( trans_type varchar(45) default '', gid varchar(128) default '', branch_id varchar(128) default '', - branch_type varchar(45) default '', + op varchar(45) default '', barrier_id varchar(45) default '', reason varchar(45) default '' comment 'the branch type who insert this record', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), key(update_time), - UNIQUE key(gid, branch_id, branch_type, barrier_id) + UNIQUE key(gid, branch_id, op, barrier_id) ); \ No newline at end of file diff --git a/dtmcli/barrier.postgres.sql b/dtmcli/barrier.postgres.sql index 62b8f5a..7add4a0 100644 --- a/dtmcli/barrier.postgres.sql +++ b/dtmcli/barrier.postgres.sql @@ -6,11 +6,11 @@ create table if not exists dtm_barrier.barrier( trans_type varchar(45) default '', gid varchar(128) default '', branch_id varchar(128) default '', - branch_type varchar(45) default '', + op varchar(45) default '', barrier_id varchar(45) default '', reason varchar(45) default '', create_time timestamp(0) DEFAULT NULL, update_time timestamp(0) DEFAULT NULL, PRIMARY KEY(id), - CONSTRAINT uniq_barrier unique(gid, branch_id, branch_type, barrier_id) + CONSTRAINT uniq_barrier unique(gid, branch_id, op, barrier_id) ); \ No newline at end of file diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index 17b6128..2cec738 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -51,7 +51,7 @@ type TransBase struct { Payloads []string `json:"payloads,omitempty"` // used in MSG/SAGA BinPayloads [][]byte `json:"-"` BranchIDGen `json:"-"` // used in XA/TCC - BranchType string `json:"-"` // used in XA/TCC + Op string `json:"-"` // used in XA/TCC QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG } @@ -102,15 +102,15 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin } // TransRequestBranch TransBAse request branch result -func TransRequestBranch(t *TransBase, body interface{}, branchID string, branchType string, url string) (*resty.Response, error) { +func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) { resp, err := RestyClient.R(). SetBody(body). SetQueryParams(map[string]string{ - "dtm": t.Dtm, - "gid": t.Gid, - "branch_id": branchID, - "trans_type": t.TransType, - "branch_type": branchType, + "dtm": t.Dtm, + "gid": t.Gid, + "branch_id": branchID, + "trans_type": t.TransType, + "op": op, }). Post(url) return resp, CheckResponse(resp, err) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 889055a..9f9c416 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -52,7 +52,6 @@ func TccFromQuery(qs url.Values) (*Tcc, error) { } // CallBranch call a tcc branch -// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewSubBranchID() err := dtmimp.TransRegisterBranch(&t.TransBase, map[string]string{ diff --git a/dtmgrpc/barrier.go b/dtmgrpc/barrier.go index 5c96246..6c1b004 100644 --- a/dtmgrpc/barrier.go +++ b/dtmgrpc/barrier.go @@ -10,5 +10,5 @@ import ( // BarrierFromGrpc generate a Barrier from grpc context func BarrierFromGrpc(ctx context.Context) (*dtmcli.BranchBarrier, error) { tb := dtmgimp.TransBaseFromGrpc(ctx) - return dtmcli.BarrierFrom(tb.TransType, tb.Gid, tb.BranchID, tb.BranchType) + return dtmcli.BarrierFrom(tb.TransType, tb.Gid, tb.BranchID, tb.Op) } diff --git a/dtmgrpc/dtmgimp/dtmgimp.pb.go b/dtmgrpc/dtmgimp/dtmgimp.pb.go index f689104..a0b5e21 100644 --- a/dtmgrpc/dtmgimp/dtmgimp.pb.go +++ b/dtmgrpc/dtmgimp/dtmgimp.pb.go @@ -273,7 +273,7 @@ type DtmBranchRequest struct { Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"` TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"` BranchID string `protobuf:"bytes,3,opt,name=BranchID,proto3" json:"BranchID,omitempty"` - BranchType string `protobuf:"bytes,4,opt,name=BranchType,proto3" json:"BranchType,omitempty"` + Op string `protobuf:"bytes,4,opt,name=Op,proto3" json:"Op,omitempty"` Data map[string]string `protobuf:"bytes,5,rep,name=Data,proto3" json:"Data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` BusiPayload []byte `protobuf:"bytes,6,opt,name=BusiPayload,proto3" json:"BusiPayload,omitempty"` } @@ -333,7 +333,7 @@ func (x *DtmBranchRequest) GetBranchID() string { func (x *DtmBranchRequest) GetBranchType() string { if x != nil { - return x.BranchType + return x.Op } return "" } diff --git a/dtmgrpc/dtmgimp/dtmgimp.proto b/dtmgrpc/dtmgimp/dtmgimp.proto index 21a0d8e..ef2479e 100644 --- a/dtmgrpc/dtmgimp/dtmgimp.proto +++ b/dtmgrpc/dtmgimp/dtmgimp.proto @@ -39,7 +39,7 @@ message DtmBranchRequest { string Gid = 1; string TransType = 2; string BranchID = 3; - string BranchType = 4; + string Op = 4; map Data = 5; bytes BusiPayload = 6; } diff --git a/dtmgrpc/dtmgimp/utils.go b/dtmgrpc/dtmgimp/utils.go index badcf14..397c0f7 100644 --- a/dtmgrpc/dtmgimp/utils.go +++ b/dtmgrpc/dtmgimp/utils.go @@ -37,12 +37,12 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error { const mdpre string = "dtm-" // TransInfo2Ctx add trans info to grpc context -func TransInfo2Ctx(gid, transType, branchID, branchType, dtm string) context.Context { +func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context { md := metadata.Pairs( mdpre+"gid", gid, mdpre+"trans_type", transType, mdpre+"branch_id", branchID, - mdpre+"branch_type", branchType, + mdpre+"op", op, mdpre+"dtm", dtm, ) return metadata.NewOutgoingContext(context.Background(), md) @@ -52,7 +52,7 @@ func TransInfo2Ctx(gid, transType, branchID, branchType, dtm string) context.Con func LogDtmCtx(ctx context.Context) { tb := TransBaseFromGrpc(ctx) if tb.Gid != "" { - dtmimp.Logf("gid: %s trans_type: %s branch_id: %s branch_type: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.BranchType, tb.Dtm) + dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm) } } @@ -68,6 +68,6 @@ func mdGet(md metadata.MD, key string) string { func TransBaseFromGrpc(ctx context.Context) *dtmimp.TransBase { md, _ := metadata.FromIncomingContext(ctx) tb := dtmimp.NewTransBase(mdGet(md, "gid"), mdGet(md, "trans_type"), mdGet(md, "dtm"), mdGet(md, "branch_id")) - tb.BranchType = mdGet(md, "branch_type") + tb.Op = mdGet(md, "op") return tb } diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index 16524c4..278e4cb 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -62,7 +62,6 @@ func TccFromGrpc(ctx context.Context) (*TccGrpc, error) { } // CallBranch call a tcc branch -// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL string, cancelURL string, reply interface{}) error { branchID := t.NewSubBranchID() bd, err := proto.Marshal(busiMsg) diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index db562cf..2be30ec 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -52,7 +52,7 @@ func NewXaGrpcClient(server string, mysqlConf map[string]string, notifyURL strin // HandleCallback 处理commit/rollback的回调 func (xc *XaGrpcClient) HandleCallback(ctx context.Context) (*emptypb.Empty, error) { tb := dtmgimp.TransBaseFromGrpc(ctx) - return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.BranchType) + return &emptypb.Empty{}, xc.XaClientBase.HandleCallback(tb.Gid, tb.BranchID, tb.Op) } // XaLocalTransaction start a xa local transaction diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 5bc458a..458f752 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -56,13 +56,13 @@ func svcRegisterBranch(branch *TransBranch, data map[string]string) (interface{} branches := []TransBranch{*branch, *branch} if dbt.TransType == "tcc" { for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} { - branches[i].BranchType = b + branches[i].Op = b branches[i].URL = data[b] } } else if dbt.TransType == "xa" { - branches[0].BranchType = dtmcli.BranchRollback + branches[0].Op = dtmcli.BranchRollback branches[0].URL = data["url"] - branches[1].BranchType = dtmcli.BranchCommit + branches[1].Op = dtmcli.BranchCommit branches[1].URL = data["url"] } else { return nil, fmt.Errorf("unknow trans type: %s", dbt.TransType) diff --git a/dtmsvr/dtmsvr.mysql.sql b/dtmsvr/dtmsvr.mysql.sql index 76dbcc7..39a1cc3 100644 --- a/dtmsvr/dtmsvr.mysql.sql +++ b/dtmsvr/dtmsvr.mysql.sql @@ -34,15 +34,15 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch ( `url` varchar(128) NOT NULL COMMENT '动作关联的url', `data` TEXT COMMENT '请求所携带的数据', `bin_data` BLOB COMMENT 'grpc的二进制数据', - `branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支名称', - `branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', + `branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支ID', + `op` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked', `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), - UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `branch_type`), + UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`), KEY `create_time` (`create_time`), KEY `update_time` (`update_time`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; @@ -50,7 +50,7 @@ drop table IF EXISTS dtm.trans_log; CREATE TABLE IF NOT EXISTS dtm.trans_log ( `id` bigint(22) NOT NULL AUTO_INCREMENT, `gid` varchar(128) NOT NULL COMMENT '事务全局id', - `branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支', + `branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支ID', `action` varchar(45) DEFAULT NULL COMMENT '行为', `old_status` varchar(45) NOT NULL DEFAULT '' COMMENT '旧状态', `new_status` varchar(45) NOT NULL COMMENT '新状态', diff --git a/dtmsvr/dtmsvr.postgres.sql b/dtmsvr/dtmsvr.postgres.sql index 1d21a26..30c5bd5 100644 --- a/dtmsvr/dtmsvr.postgres.sql +++ b/dtmsvr/dtmsvr.postgres.sql @@ -38,14 +38,14 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch ( data TEXT, bin_data BLOB, branch_id VARCHAR(128) NOT NULL, - branch_type varchar(45) NOT NULL, + op varchar(45) NOT NULL, status varchar(45) NOT NULL, finish_time timestamp(0) DEFAULT NULL, rollback_time timestamp(0) DEFAULT NULL, create_time timestamp(0) DEFAULT NULL, update_time timestamp(0) DEFAULT NULL, PRIMARY KEY (id), - CONSTRAINT gid_uniq UNIQUE (gid, branch_id, branch_type) + CONSTRAINT gid_uniq UNIQUE (gid, branch_id, op) ); CREATE INDEX if not EXISTS create_time ON dtm.trans_branch (create_time); CREATE INDEX if not EXISTS update_time ON dtm.trans_branch (update_time); diff --git a/dtmsvr/metrics.go b/dtmsvr/metrics.go index 2c50e20..f0016f1 100644 --- a/dtmsvr/metrics.go +++ b/dtmsvr/metrics.go @@ -2,13 +2,14 @@ package dtmsvr import ( "context" + "net/http" + "strings" + "github.com/gin-gonic/gin" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" - "net/http" - "strings" ) var ( @@ -87,9 +88,9 @@ func transactionMetrics(global *TransGlobal, status bool) { func branchMetrics(global *TransGlobal, branch *TransBranch, status bool) { if status { - branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "ok").Inc() + branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.Op, "ok").Inc() } else { - branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "fail").Inc() + branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.Op, "fail").Inc() } } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 709b998..b4dea89 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -97,7 +97,7 @@ type TransBranch struct { URL string `json:"url"` BinData []byte BranchID string `json:"branch_id"` - BranchType string + Op string Status string FinishTime *time.Time RollbackTime *time.Time @@ -211,12 +211,12 @@ func (t *TransGlobal) setNextCron(ctype cronType) []string { return []string{"next_cron_interval", "next_cron_time"} } -func (t *TransGlobal) getURLResult(url string, branchID, branchType string, branchPayload []byte) (string, error) { +func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) { if t.Protocol == "grpc" { dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url)) server, method := dtmgimp.GetServerAndMethod(url) conn := dtmgimp.MustGetGrpcConn(server, true) - ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, branchType, "") + ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "") err := conn.Invoke(ctx, method, branchPayload, []byte{}) if err == nil { return dtmcli.ResultSuccess, nil @@ -234,10 +234,10 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url)) resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). SetQueryParams(map[string]string{ - "gid": t.Gid, - "trans_type": t.TransType, - "branch_id": branchID, - "branch_type": branchType, + "gid": t.Gid, + "trans_type": t.TransType, + "branch_id": branchID, + "op": op, }). SetHeader("Content-type", "application/json"). Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url) @@ -248,13 +248,13 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran } func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) { - body, err := t.getURLResult(branch.URL, branch.BranchID, branch.BranchType, branch.BinData) + body, err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData) if err != nil { return "", err } if strings.Contains(body, dtmcli.ResultSuccess) { return dtmcli.StatusSucceed, nil - } else if strings.HasSuffix(t.TransType, "saga") && branch.BranchType == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) { + } else if strings.HasSuffix(t.TransType, "saga") && branch.Op == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) { return dtmcli.StatusFailed, nil } else if strings.Contains(body, dtmcli.ResultOngoing) { return "", dtmimp.ErrOngoing diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index ace7edf..8288faf 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -21,12 +21,12 @@ func (t *transMsgProcessor) GenBranches() []TransBranch { branches := []TransBranch{} for i, step := range t.Steps { b := &TransBranch{ - Gid: t.Gid, - BranchID: fmt.Sprintf("%02d", i+1), - BinData: t.BinPayloads[i], - URL: step[dtmcli.BranchAction], - BranchType: dtmcli.BranchAction, - Status: dtmcli.StatusPrepared, + Gid: t.Gid, + BranchID: fmt.Sprintf("%02d", i+1), + BinData: t.BinPayloads[i], + URL: step[dtmcli.BranchAction], + Op: dtmcli.BranchAction, + Status: dtmcli.StatusPrepared, } branches = append(branches, *b) } @@ -58,7 +58,7 @@ func (t *transMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { branch := &branches[current] - if branch.BranchType != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared { + if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared { continue } err := t.execBranch(db, branch) diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 0da5ae1..3760a89 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -21,14 +21,14 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { branches := []TransBranch{} for i, step := range t.Steps { branch := fmt.Sprintf("%02d", i+1) - for _, branchType := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} { + for _, op := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} { branches = append(branches, TransBranch{ - Gid: t.Gid, - BranchID: branch, - BinData: t.BinPayloads[i], - URL: step[branchType], - BranchType: branchType, - Status: dtmcli.StatusPrepared, + Gid: t.Gid, + BranchID: branch, + BinData: t.BinPayloads[i], + URL: step[op], + Op: op, + Status: dtmcli.StatusPrepared, }) } } @@ -41,10 +41,10 @@ type cSagaCustom struct { } type branchResult struct { - index int - status string - started bool - branchType string + index int + status string + started bool + op string } func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) error { @@ -64,14 +64,14 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) branchResults := make([]branchResult, n) // save the branch result for i := 0; i < n; i++ { b := branches[i] - if b.BranchType == dtmcli.BranchAction { + if b.Op == dtmcli.BranchAction { if b.Status == dtmcli.StatusPrepared { rsAToStart++ } else if b.Status == dtmcli.StatusFailed { rsAFailed++ } } - branchResults[i] = branchResult{status: branches[i].Status, branchType: branches[i].BranchType} + branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op} } isPreconditionsSucceed := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed @@ -94,7 +94,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) if x := recover(); x != nil { err = dtmimp.AsError(x) } - resultChan <- branchResult{index: i, status: branches[i].Status, branchType: branches[i].BranchType} + resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} if err != nil { dtmimp.LogRedf("exec branch error: %v", err) } @@ -105,7 +105,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) toRun := []int{} for current := 0; current < n; current++ { br := &branchResults[current] - if br.branchType == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared { + if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && br.status == dtmcli.StatusPrepared { toRun = append(toRun, current) } } @@ -115,7 +115,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) runBranches := func(toRun []int) { for _, b := range toRun { branchResults[b].started = true - if branchResults[b].branchType == dtmcli.BranchAction { + if branchResults[b].op == dtmcli.BranchAction { rsAStarted++ } go asyncExecBranch(b) @@ -127,7 +127,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) branchResults[b].status = dtmcli.StatusSucceed } for i, b := range branchResults { - if b.branchType == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && branchResults[i+1].status != dtmcli.StatusPrepared { rsCToStart++ go asyncExecBranch(i) } @@ -138,7 +138,7 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) case r := <-resultChan: br := &branchResults[r.index] br.status = r.status - if r.branchType == dtmcli.BranchAction { + if r.op == dtmcli.BranchAction { rsADone++ if r.status == dtmcli.StatusFailed { rsAFailed++ diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 8baac06..8aa607b 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -25,9 +25,9 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) e if t.Status == dtmcli.StatusPrepared && t.isTimeout() { t.changeStatus(db, dtmcli.StatusAborting) } - branchType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string) + op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string) for current := len(branches) - 1; current >= 0; current-- { - if branches[current].BranchType == branchType && branches[current].Status == dtmcli.StatusPrepared { + if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared { dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID) err := t.execBranch(db, &branches[current]) if err != nil { diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index e8106bc..a882cf4 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -27,7 +27,7 @@ func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) er } currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string) for _, branch := range branches { - if branch.BranchType == currentType && branch.Status != dtmcli.StatusSucceed { + if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed { err := t.execBranch(db, &branch) if err != nil { return err diff --git a/examples/base_types.go b/examples/base_types.go index 394962a..7ba4ba6 100644 --- a/examples/base_types.go +++ b/examples/base_types.go @@ -61,10 +61,10 @@ func reqFrom(c *gin.Context) *TransReq { func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier { info := dtmcli.BranchBarrier{ - TransType: c.Query("trans_type"), - Gid: c.Query("gid"), - BranchID: c.Query("branch_id"), - BranchType: c.Query("branch_type"), + TransType: c.Query("trans_type"), + Gid: c.Query("gid"), + BranchID: c.Query("branch_id"), + Op: c.Query("op"), } return &info } diff --git a/examples/http_xa.go b/examples/http_xa.go index c968ca2..6f66779 100644 --- a/examples/http_xa.go +++ b/examples/http_xa.go @@ -16,7 +16,7 @@ func init() { var err error XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) { app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("branch_type")) + return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("op")) })) }) dtmimp.FatalIfError(err) diff --git a/test/base_test.go b/test/base_test.go index 51ed610..e9de670 100644 --- a/test/base_test.go +++ b/test/base_test.go @@ -24,12 +24,12 @@ func TestBaseSqlDB(t *testing.T) { asserts := assert.New(t) db := common.DbGet(config.DB) barrier := &dtmcli.BranchBarrier{ - TransType: "saga", - Gid: "gid2", - BranchID: "branch_id2", - BranchType: dtmcli.BranchAction, + TransType: "saga", + Gid: "gid2", + BranchID: "branch_id2", + Op: dtmcli.BranchAction, } - db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')") + db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')") tx, err := db.ToSQLDB().Begin() asserts.Nil(err) err = barrier.Call(tx, func(db dtmcli.DB) error { diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index 71aff5f..a1f5c20 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -87,11 +87,11 @@ func TestTccBarrierDisorder(t *testing.T) { r, _ := dtmimp.RestyClient.R(). SetBody(body). SetQueryParams(map[string]string{ - "dtm": tcc.Dtm, - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "branch_type": dtmcli.BranchTry, + "dtm": tcc.Dtm, + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "op": dtmcli.BranchTry, }). Post(tryURL) assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功 @@ -115,7 +115,7 @@ func TestTccBarrierDisorder(t *testing.T) { } func TestTccBarrierPanic(t *testing.T) { - bb := &dtmcli.BranchBarrier{TransType: "saga", Gid: "gid1", BranchID: "bid1", BranchType: "action", BarrierID: 1} + bb := &dtmcli.BranchBarrier{TransType: "saga", Gid: "gid1", BranchID: "bid1", Op: "action", BarrierID: 1} var err error func() { defer dtmimp.P2E(&err)