🔥 A cross-language distributed transaction manager. Supports XA, TCC, Saga, and transactional messages. 跨语言分布式事务管理器
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

93 lines
2.5 KiB

package dtmgrpc
import (
context "context"
"fmt"
"github.com/yedf/dtm/dtmcli"
)
// TccGrpc is the handle for a TCC transaction driven over gRPC.
// It embeds dtmcli.TransBase, from which the code in this file reads the
// Gid, TransType and Dtm fields and calls NewBranchID.
type TccGrpc struct {
dtmcli.TransBase
}
// TccGlobalFunc is the callback type passed to TccGlobalTransaction.
// It receives the TccGrpc handle of the global transaction; when it returns a
// non-nil error, TccGlobalTransaction aborts the transaction instead of
// submitting it.
type TccGlobalFunc func(tcc *TccGrpc) error
// TccGlobalTransaction begins a TCC global transaction and finishes it
// according to the outcome of tccFunc.
//
// Parameters:
//   - dtm: address of the DTM server.
//   - gid: global transaction id.
//   - tccFunc: the TCC transaction function; it defines the branches of the
//     global transaction.
//
// The transaction is first prepared on the DTM server; a Prepare error is
// returned as-is and tccFunc is not called. Otherwise tccFunc runs and:
//   - if it returns nil without panicking, the transaction is submitted and
//     the Submit error (if any) is returned;
//   - if it returns an error or panics, the transaction is aborted. The error
//     from tccFunc takes precedence over the Abort error, and a panic is
//     re-raised after the abort call.
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
	tcc := &TccGrpc{TransBase: *dtmcli.NewTransBase(gid, "tcc", dtm, "")}
	client := MustGetDtmClient(tcc.Dtm)
	req := &DtmRequest{
		Gid:       tcc.Gid,
		TransType: tcc.TransType,
	}
	if _, rerr = client.Prepare(context.Background(), req); rerr != nil {
		return rerr
	}
	// In rare cases Prepare succeeds on the server but the call above still
	// reports a failure because of network conditions. The deferred logic below
	// is then never registered; the transaction is marked failed and rolled
	// back after it times out, which is acceptable.
	defer func() {
		// recover must be called directly in this deferred function.
		panicked := recover()
		if panicked != nil || rerr != nil {
			_, abortErr := client.Abort(context.Background(), req)
			// Keep the business error if there is one; otherwise surface the abort error.
			if rerr == nil {
				rerr = abortErr
			}
			// Propagate the original panic to the caller once the abort was attempted.
			if panicked != nil {
				panic(panicked)
			}
			return
		}
		_, rerr = client.Submit(context.Background(), req)
	}()
	return tccFunc(tcc)
}
// TccFromRequest builds a TccGrpc handle from the transaction info carried by
// a business request.
//
// The handle is created from br.Info.Gid, br.Info.TransType, br.Dtm and
// br.Info.BranchID. An error is returned (and the handle is nil) when the
// request or its branch info is nil, or when the resulting Dtm address or Gid
// is empty.
func TccFromRequest(br *BusiRequest) (*TccGrpc, error) {
	// A request without branch info would otherwise cause a nil-pointer panic
	// on the field accesses below; report it as an error instead.
	if br == nil || br.Info == nil {
		return nil, fmt.Errorf("bad tcc info. request or branch info is nil")
	}
	tcc := &TccGrpc{
		TransBase: *dtmcli.NewTransBase(br.Info.Gid, br.Info.TransType, br.Dtm, br.Info.BranchID),
	}
	if tcc.Dtm == "" || tcc.Gid == "" {
		return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, br.Info.BranchID)
	}
	return tcc, nil
}
// CallBranch calls a TCC branch.
//
// It first registers the branch (try, confirm and cancel URLs together with
// the business data) with the DTM server under a newly generated branch id.
// If registration fails, that error is returned with a nil reply. Otherwise
// the try endpoint is invoked over gRPC and its reply and error are returned.
func (t *TccGrpc) CallBranch(busiData []byte, tryURL string, confirmURL string, cancelURL string) (*BusiReply, error) {
	bid := t.NewBranchID()

	// Register the branch before invoking try.
	registration := &DtmTccBranchRequest{
		Info: &BranchInfo{
			Gid:       t.Gid,
			TransType: t.TransType,
			BranchID:  bid,
		},
		BusiData: string(busiData),
		Try:      tryURL,
		Confirm:  confirmURL,
		Cancel:   cancelURL,
	}
	if _, err := MustGetDtmClient(t.Dtm).RegisterTccBranch(context.Background(), registration); err != nil {
		return nil, err
	}

	// Invoke the try endpoint of the business service.
	target, rpcMethod := GetServerAndMethod(tryURL)
	result := &BusiReply{}
	tryReq := &BusiRequest{
		Info: &BranchInfo{
			Gid:       t.Gid,
			TransType: t.TransType,
			BranchID:  bid,
			// NOTE(review): BranchType carries the transaction type here; confirm
			// this is intended rather than a branch-specific value.
			BranchType: t.TransType,
		},
		BusiData: busiData,
		Dtm:      t.Dtm,
	}
	err := MustGetGrpcConn(target).Invoke(context.Background(), rpcMethod, tryReq, result)
	return result, err
}