mirror of https://github.com/dtm-labs/dtm.git
13 changed files with 29 additions and 387 deletions
@ -1,69 +0,0 @@ |
|||
package dtm |
|||
|
|||
import ( |
|||
"context" |
|||
"database/sql" |
|||
"fmt" |
|||
|
|||
"github.com/yedf/dtm/common" |
|||
) |
|||
|
|||
type BusiFunc func(db *sql.DB) (interface{}, error) |
|||
|
|||
type TransInfo struct { |
|||
TransType string |
|||
Gid string |
|||
BranchID string |
|||
BranchType string |
|||
} |
|||
|
|||
func (t *TransInfo) String() string { |
|||
return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) |
|||
} |
|||
|
|||
type BarrierModel struct { |
|||
common.ModelBase |
|||
TransInfo |
|||
} |
|||
|
|||
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } |
|||
|
|||
func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { |
|||
if branchType == "" { |
|||
return 0, nil |
|||
} |
|||
res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType) |
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
return res.RowsAffected() |
|||
} |
|||
|
|||
func ThroughBarrierCall(db *sql.DB, transType string, gid string, branchId string, branchType string, busiCall BusiFunc) (res interface{}, rerr error) { |
|||
tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) |
|||
if rerr != nil { |
|||
return |
|||
} |
|||
defer func() { |
|||
if x := recover(); x != nil { |
|||
tx.Rollback() |
|||
panic(x) |
|||
} else if rerr != nil { |
|||
tx.Rollback() |
|||
} else { |
|||
tx.Commit() |
|||
} |
|||
}() |
|||
|
|||
originType := map[string]string{ |
|||
"cancel": "action", |
|||
"compensate": "action", |
|||
}[branchType] |
|||
originAffected, _ := insertBarrier(tx, transType, gid, branchId, originType) |
|||
currentAffected, rerr := insertBarrier(tx, transType, gid, branchId, branchType) |
|||
if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 { |
|||
return |
|||
} |
|||
res, rerr = busiCall(db) |
|||
return |
|||
} |
|||
@ -1,70 +0,0 @@ |
|||
package dtm |
|||
|
|||
import ( |
|||
"fmt" |
|||
|
|||
jsonitor "github.com/json-iterator/go" |
|||
"github.com/sirupsen/logrus" |
|||
"github.com/yedf/dtm/common" |
|||
) |
|||
|
|||
type Msg struct { |
|||
MsgData |
|||
Server string |
|||
} |
|||
|
|||
type MsgData struct { |
|||
Gid string `json:"gid"` |
|||
TransType string `json:"trans_type"` |
|||
Steps []MsgStep `json:"steps"` |
|||
QueryPrepared string `json:"query_prepared"` |
|||
} |
|||
type MsgStep struct { |
|||
Action string `json:"action"` |
|||
Data string `json:"data"` |
|||
} |
|||
|
|||
func MsgNew(server string) *Msg { |
|||
return &Msg{ |
|||
MsgData: MsgData{ |
|||
TransType: "msg", |
|||
}, |
|||
Server: server, |
|||
} |
|||
} |
|||
func (s *Msg) Add(action string, postData interface{}) *Msg { |
|||
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) |
|||
step := MsgStep{ |
|||
Action: action, |
|||
Data: common.MustMarshalString(postData), |
|||
} |
|||
s.Steps = append(s.Steps, step) |
|||
return s |
|||
} |
|||
|
|||
func (s *Msg) Submit() error { |
|||
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) |
|||
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if resp.StatusCode() != 200 { |
|||
return fmt.Errorf("submit failed: %v", resp.Body()) |
|||
} |
|||
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() |
|||
return nil |
|||
} |
|||
|
|||
func (s *Msg) Prepare(queryPrepared string) error { |
|||
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) |
|||
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) |
|||
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if resp.StatusCode() != 200 { |
|||
return fmt.Errorf("prepare failed: %v", resp.Body()) |
|||
} |
|||
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() |
|||
return nil |
|||
} |
|||
@ -1,57 +0,0 @@ |
|||
package dtm |
|||
|
|||
import ( |
|||
"fmt" |
|||
|
|||
jsonitor "github.com/json-iterator/go" |
|||
"github.com/sirupsen/logrus" |
|||
"github.com/yedf/dtm/common" |
|||
) |
|||
|
|||
type Saga struct { |
|||
SagaData |
|||
Server string |
|||
} |
|||
|
|||
type SagaData struct { |
|||
Gid string `json:"gid"` |
|||
TransType string `json:"trans_type"` |
|||
Steps []SagaStep `json:"steps"` |
|||
} |
|||
type SagaStep struct { |
|||
Action string `json:"action"` |
|||
Compensate string `json:"compensate"` |
|||
Data string `json:"data"` |
|||
} |
|||
|
|||
func SagaNew(server string) *Saga { |
|||
return &Saga{ |
|||
SagaData: SagaData{ |
|||
TransType: "saga", |
|||
}, |
|||
Server: server, |
|||
} |
|||
} |
|||
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { |
|||
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) |
|||
step := SagaStep{ |
|||
Action: action, |
|||
Compensate: compensate, |
|||
Data: common.MustMarshalString(postData), |
|||
} |
|||
s.Steps = append(s.Steps, step) |
|||
return s |
|||
} |
|||
|
|||
func (s *Saga) Submit() error { |
|||
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) |
|||
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if resp.StatusCode() != 200 { |
|||
return fmt.Errorf("submit failed: %v", resp.Body()) |
|||
} |
|||
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() |
|||
return nil |
|||
} |
|||
@ -1,59 +0,0 @@ |
|||
package dtm |
|||
|
|||
import ( |
|||
"fmt" |
|||
|
|||
jsonitor "github.com/json-iterator/go" |
|||
"github.com/sirupsen/logrus" |
|||
"github.com/yedf/dtm/common" |
|||
) |
|||
|
|||
type Tcc struct { |
|||
TccData |
|||
Server string |
|||
} |
|||
|
|||
type TccData struct { |
|||
Gid string `json:"gid"` |
|||
TransType string `json:"trans_type"` |
|||
Steps []TccStep `json:"steps"` |
|||
} |
|||
type TccStep struct { |
|||
Try string `json:"try"` |
|||
Confirm string `json:"confirm"` |
|||
Cancel string `json:"cancel"` |
|||
Data string `json:"data"` |
|||
} |
|||
|
|||
func TccNew(server string) *Tcc { |
|||
return &Tcc{ |
|||
TccData: TccData{ |
|||
TransType: "tcc", |
|||
}, |
|||
Server: server, |
|||
} |
|||
} |
|||
func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc { |
|||
logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) |
|||
step := TccStep{ |
|||
Try: try, |
|||
Confirm: confirm, |
|||
Cancel: cancel, |
|||
Data: common.MustMarshalString(data), |
|||
} |
|||
s.Steps = append(s.Steps, step) |
|||
return s |
|||
} |
|||
|
|||
func (s *Tcc) Submit() error { |
|||
logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) |
|||
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if resp.StatusCode() != 200 { |
|||
return fmt.Errorf("submit failed: %v", resp.Body()) |
|||
} |
|||
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() |
|||
return nil |
|||
} |
|||
@ -1,103 +0,0 @@ |
|||
package dtm |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/url" |
|||
"strings" |
|||
|
|||
"github.com/gin-gonic/gin" |
|||
"github.com/yedf/dtm/common" |
|||
) |
|||
|
|||
type M = map[string]interface{} |
|||
|
|||
var e2p = common.E2P |
|||
|
|||
type XaGlobalFunc func() error |
|||
|
|||
type XaLocalFunc func(db *common.DB) error |
|||
|
|||
type XaClient struct { |
|||
Server string |
|||
Conf map[string]string |
|||
CallbackUrl string |
|||
} |
|||
|
|||
func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient { |
|||
xa := &XaClient{ |
|||
Server: server, |
|||
Conf: mysqlConf, |
|||
CallbackUrl: callbackUrl, |
|||
} |
|||
u, err := url.Parse(callbackUrl) |
|||
e2p(err) |
|||
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { |
|||
type CallbackReq struct { |
|||
Gid string `json:"gid"` |
|||
Branch string `json:"branch"` |
|||
Action string `json:"action"` |
|||
} |
|||
req := CallbackReq{} |
|||
b, err := c.GetRawData() |
|||
e2p(err) |
|||
common.MustUnmarshal(b, &req) |
|||
tx, my := common.DbAlone(xa.Conf) |
|||
defer my.Close() |
|||
if req.Action == "commit" { |
|||
tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch)) |
|||
} else if req.Action == "rollback" { |
|||
tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.Branch)) |
|||
} else { |
|||
panic(fmt.Errorf("unknown action: %s", req.Action)) |
|||
} |
|||
return M{"result": "SUCCESS"}, nil |
|||
})) |
|||
return xa |
|||
} |
|||
|
|||
func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { |
|||
defer common.P2E(&rerr) |
|||
branch := common.GenGid() |
|||
tx, my := common.DbAlone(xa.Conf) |
|||
defer func() { my.Close() }() |
|||
tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch)) |
|||
err := transFunc(tx) |
|||
e2p(err) |
|||
resp, err := common.RestyClient.R(). |
|||
SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). |
|||
Post(xa.Server + "/branch") |
|||
e2p(err) |
|||
if !strings.Contains(resp.String(), "SUCCESS") { |
|||
e2p(fmt.Errorf("unknown server response: %s", resp.String())) |
|||
} |
|||
tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch)) |
|||
tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch)) |
|||
return nil |
|||
} |
|||
|
|||
func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rerr error) { |
|||
data := &M{ |
|||
"gid": gid, |
|||
"trans_type": "xa", |
|||
} |
|||
defer func() { |
|||
x := recover() |
|||
if x != nil { |
|||
_, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort") |
|||
rerr = x.(error) |
|||
} |
|||
}() |
|||
resp, err := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare") |
|||
e2p(err) |
|||
if !strings.Contains(resp.String(), "SUCCESS") { |
|||
panic(fmt.Errorf("unexpected result: %s", resp.String())) |
|||
} |
|||
err = transFunc() |
|||
e2p(err) |
|||
resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit") |
|||
e2p(err) |
|||
if !strings.Contains(resp.String(), "SUCCESS") { |
|||
panic(fmt.Errorf("unexpected result: %s", resp.String())) |
|||
} |
|||
return nil |
|||
} |
|||
Loading…
Reference in new issue