mirror of https://github.com/dtm-labs/dtm.git
14 changed files with 304 additions and 193 deletions
@ -0,0 +1,28 @@ |
|||
package dtmpb |
|||
|
|||
import ( |
|||
"github.com/yedf/dtm/dtmcli" |
|||
grpc "google.golang.org/grpc" |
|||
) |
|||
|
|||
var clients = map[string]DtmClient{} |
|||
|
|||
// GetDtmClient 1
|
|||
func GetDtmClient(grpcServer string) (cli DtmClient, rerr error) { |
|||
if clients[grpcServer] == nil { |
|||
conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(dtmcli.GrpcClientLog)) |
|||
if err == nil { |
|||
clients[grpcServer] = NewDtmClient(conn) |
|||
dtmcli.Logf("dtm client inited for %s", grpcServer) |
|||
} |
|||
} |
|||
cli = clients[grpcServer] |
|||
return |
|||
} |
|||
|
|||
// MustGetDtmClient 1
|
|||
func MustGetDtmClient(grpcServer string) DtmClient { |
|||
cli, err := GetDtmClient(grpcServer) |
|||
dtmcli.E2P(err) |
|||
return cli |
|||
} |
|||
@ -1,123 +1,15 @@ |
|||
package dtmsvr |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
import "fmt" |
|||
|
|||
"github.com/gin-gonic/gin" |
|||
"github.com/yedf/dtm/common" |
|||
"github.com/yedf/dtm/dtmcli" |
|||
"gorm.io/gorm" |
|||
"gorm.io/gorm/clause" |
|||
) |
|||
|
|||
func addRoute(engine *gin.Engine) { |
|||
engine.POST("/api/dtmsvr/prepare", common.WrapHandler(prepare)) |
|||
engine.POST("/api/dtmsvr/submit", common.WrapHandler(submit)) |
|||
engine.POST("/api/dtmsvr/registerXaBranch", common.WrapHandler(registerXaBranch)) |
|||
engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(registerTccBranch)) |
|||
engine.POST("/api/dtmsvr/abort", common.WrapHandler(abort)) |
|||
engine.GET("/api/dtmsvr/query", common.WrapHandler(query)) |
|||
engine.GET("/api/dtmsvr/newGid", common.WrapHandler(newGid)) |
|||
} |
|||
|
|||
func newGid(c *gin.Context) (interface{}, error) { |
|||
return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil |
|||
} |
|||
|
|||
func prepare(c *gin.Context) (interface{}, error) { |
|||
t := TransFromContext(c) |
|||
t.Status = "prepared" |
|||
t.saveNew(dbGet()) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func submit(c *gin.Context) (interface{}, error) { |
|||
func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) { |
|||
db := dbGet() |
|||
t := TransFromContext(c) |
|||
dbt := TransFromDb(db, t.Gid) |
|||
if dbt != nil && dbt.Status != "prepared" && dbt.Status != "submitted" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status %s, cannot sumbmit", dbt.Status)}, nil |
|||
} |
|||
t.Status = "submitted" |
|||
t.saveNew(db) |
|||
return t.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil |
|||
} |
|||
|
|||
func abort(c *gin.Context) (interface{}, error) { |
|||
db := dbGet() |
|||
t := TransFromContext(c) |
|||
dbt := TransFromDb(db, t.Gid) |
|||
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil |
|||
} |
|||
return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil |
|||
} |
|||
|
|||
func registerXaBranch(c *gin.Context) (interface{}, error) { |
|||
branch := TransBranch{} |
|||
err := c.BindJSON(&branch) |
|||
e2p(err) |
|||
db := dbGet() |
|||
dbt := TransFromDb(db, branch.Gid) |
|||
if dbt.Status != "prepared" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil |
|||
} |
|||
branches := []TransBranch{branch, branch} |
|||
branches[0].BranchType = "rollback" |
|||
branches[1].BranchType = "commit" |
|||
db.Must().Clauses(clause.OnConflict{ |
|||
DoNothing: true, |
|||
}).Create(branches) |
|||
e2p(err) |
|||
global := TransGlobal{Gid: branch.Gid} |
|||
global.touch(db, config.TransCronInterval) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func registerTccBranch(c *gin.Context) (interface{}, error) { |
|||
data := dtmcli.MS{} |
|||
err := c.BindJSON(&data) |
|||
e2p(err) |
|||
branch := TransBranch{ |
|||
Gid: data["gid"], |
|||
BranchID: data["branch_id"], |
|||
Status: data["status"], |
|||
Data: data["data"], |
|||
} |
|||
db := dbGet() |
|||
dbt := TransFromDb(db, branch.Gid) |
|||
if dbt.Status != "prepared" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil |
|||
} |
|||
return t.Process(db, waitResult), nil |
|||
|
|||
branches := []TransBranch{branch, branch, branch} |
|||
for i, b := range []string{"cancel", "confirm", "try"} { |
|||
branches[i].BranchType = b |
|||
branches[i].URL = data[b] |
|||
} |
|||
|
|||
db.Must().Clauses(clause.OnConflict{ |
|||
DoNothing: true, |
|||
}).Create(branches) |
|||
e2p(err) |
|||
global := TransGlobal{Gid: branch.Gid} |
|||
global.touch(dbGet(), config.TransCronInterval) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func query(c *gin.Context) (interface{}, error) { |
|||
gid := c.Query("gid") |
|||
if gid == "" { |
|||
return nil, errors.New("no gid specified") |
|||
} |
|||
trans := TransGlobal{} |
|||
db := dbGet() |
|||
dbr := db.Must().Where("gid", gid).First(&trans) |
|||
if dbr.Error == gorm.ErrRecordNotFound { |
|||
return M{"transaction": nil, "branches": [0]int{}}, nil |
|||
} |
|||
branches := []TransBranch{} |
|||
db.Must().Where("gid", gid).Find(&branches) |
|||
return M{"transaction": trans, "branches": branches}, nil |
|||
} |
|||
|
|||
@ -0,0 +1,115 @@ |
|||
package dtmsvr |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
|
|||
"github.com/gin-gonic/gin" |
|||
"github.com/yedf/dtm/common" |
|||
"github.com/yedf/dtm/dtmcli" |
|||
"gorm.io/gorm" |
|||
"gorm.io/gorm/clause" |
|||
) |
|||
|
|||
func addRoute(engine *gin.Engine) { |
|||
engine.POST("/api/dtmsvr/prepare", common.WrapHandler(prepare)) |
|||
engine.POST("/api/dtmsvr/submit", common.WrapHandler(submit)) |
|||
engine.POST("/api/dtmsvr/registerXaBranch", common.WrapHandler(registerXaBranch)) |
|||
engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(registerTccBranch)) |
|||
engine.POST("/api/dtmsvr/abort", common.WrapHandler(abort)) |
|||
engine.GET("/api/dtmsvr/query", common.WrapHandler(query)) |
|||
engine.GET("/api/dtmsvr/newGid", common.WrapHandler(newGid)) |
|||
} |
|||
|
|||
func newGid(c *gin.Context) (interface{}, error) { |
|||
return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil |
|||
} |
|||
|
|||
func prepare(c *gin.Context) (interface{}, error) { |
|||
t := TransFromContext(c) |
|||
t.Status = "prepared" |
|||
t.saveNew(dbGet()) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func submit(c *gin.Context) (interface{}, error) { |
|||
return svcSubmit(TransFromContext(c), c.Query("wait_result") == "1") |
|||
} |
|||
|
|||
func abort(c *gin.Context) (interface{}, error) { |
|||
db := dbGet() |
|||
t := TransFromContext(c) |
|||
dbt := TransFromDb(db, t.Gid) |
|||
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil |
|||
} |
|||
return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil |
|||
} |
|||
|
|||
func registerXaBranch(c *gin.Context) (interface{}, error) { |
|||
branch := TransBranch{} |
|||
err := c.BindJSON(&branch) |
|||
e2p(err) |
|||
db := dbGet() |
|||
dbt := TransFromDb(db, branch.Gid) |
|||
if dbt.Status != "prepared" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil |
|||
} |
|||
branches := []TransBranch{branch, branch} |
|||
branches[0].BranchType = "rollback" |
|||
branches[1].BranchType = "commit" |
|||
db.Must().Clauses(clause.OnConflict{ |
|||
DoNothing: true, |
|||
}).Create(branches) |
|||
e2p(err) |
|||
global := TransGlobal{Gid: branch.Gid} |
|||
global.touch(db, config.TransCronInterval) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func registerTccBranch(c *gin.Context) (interface{}, error) { |
|||
data := dtmcli.MS{} |
|||
err := c.BindJSON(&data) |
|||
e2p(err) |
|||
branch := TransBranch{ |
|||
Gid: data["gid"], |
|||
BranchID: data["branch_id"], |
|||
Status: data["status"], |
|||
Data: data["data"], |
|||
} |
|||
db := dbGet() |
|||
dbt := TransFromDb(db, branch.Gid) |
|||
if dbt.Status != "prepared" { |
|||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil |
|||
} |
|||
|
|||
branches := []TransBranch{branch, branch, branch} |
|||
for i, b := range []string{"cancel", "confirm", "try"} { |
|||
branches[i].BranchType = b |
|||
branches[i].URL = data[b] |
|||
} |
|||
|
|||
db.Must().Clauses(clause.OnConflict{ |
|||
DoNothing: true, |
|||
}).Create(branches) |
|||
e2p(err) |
|||
global := TransGlobal{Gid: branch.Gid} |
|||
global.touch(dbGet(), config.TransCronInterval) |
|||
return dtmcli.ResultSuccess, nil |
|||
} |
|||
|
|||
func query(c *gin.Context) (interface{}, error) { |
|||
gid := c.Query("gid") |
|||
if gid == "" { |
|||
return nil, errors.New("no gid specified") |
|||
} |
|||
trans := TransGlobal{} |
|||
db := dbGet() |
|||
dbr := db.Must().Where("gid", gid).First(&trans) |
|||
if dbr.Error == gorm.ErrRecordNotFound { |
|||
return M{"transaction": nil, "branches": [0]int{}}, nil |
|||
} |
|||
branches := []TransBranch{} |
|||
db.Must().Where("gid", gid).Find(&branches) |
|||
return M{"transaction": trans, "branches": branches}, nil |
|||
} |
|||
Loading…
Reference in new issue