Browse Source

Merge pull request #55 from yedf/alpha

fix postgres sql
pull/58/head v1.5.2
yedf2 4 years ago
committed by GitHub
parent
commit
fb40e806bb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      common/types.go
  2. 4
      common/types_test.go
  3. 4
      dtmcli/barrier.postgres.sql
  4. 2
      dtmcli/dtmimp/utils.go
  5. 65
      dtmsvr/api.go
  6. 2
      dtmsvr/api_http.go
  7. 24
      dtmsvr/dtmsvr.postgres.sql
  8. 341
      dtmsvr/trans.go
  9. 127
      dtmsvr/trans_class.go
  10. 83
      dtmsvr/trans_process.go
  11. 159
      dtmsvr/trans_status.go
  12. 0
      dtmsvr/trans_type_msg.go
  13. 3
      dtmsvr/trans_type_saga.go
  14. 0
      dtmsvr/trans_type_tcc.go
  15. 0
      dtmsvr/trans_type_xa.go
  16. 8
      dtmsvr/utils.go
  17. 1
      examples/base_http.go
  18. 4
      examples/examples.postgres.sql
  19. 6
      go.mod
  20. 9
      go.sum
  21. 8
      test/xa_test.go

3
common/types.go

@ -197,6 +197,9 @@ func checkConfig() string {
func WaitDBUp() {
sdb, err := dtmimp.StandaloneDB(DtmConfig.DB)
dtmimp.FatalIfError(err)
defer func() {
sdb.Close()
}()
for _, err := dtmimp.DBExec(sdb, "select 1"); err != nil; { // wait for mysql to start
time.Sleep(3 * time.Second)
_, err = dtmimp.DBExec(sdb, "select 1")

4
common/types_test.go

@ -19,6 +19,10 @@ func TestDb(t *testing.T) {
assert.NotEqual(t, nil, err)
}
func TestWaitDBUp(t *testing.T) {
WaitDBUp()
}
func TestDbAlone(t *testing.T) {
db, err := dtmimp.StandaloneDB(DtmConfig.DB)
assert.Nil(t, err)

4
dtmcli/barrier.postgres.sql

@ -9,8 +9,8 @@ create table if not exists dtm_barrier.barrier(
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '',
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
PRIMARY KEY(id),
CONSTRAINT uniq_barrier unique(gid, branch_id, op, barrier_id)
);

2
dtmcli/dtmimp/utils.go

@ -220,7 +220,7 @@ func GetDsn(conf map[string]string) string {
dsn := map[string]string{
"mysql": fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
conf["user"], conf["password"], host, conf["port"], conf["database"]),
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable TimeZone=Asia/Shanghai",
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable",
host, conf["user"], conf["password"], conf["database"], conf["port"]),
}[driver]
PanicIf(dsn == "", fmt.Errorf("unknow driver: %s", driver))

65
dtmsvr/api.go

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
@ -13,10 +14,11 @@ func svcSubmit(t *TransGlobal) (interface{}, error) {
err := t.saveNew(db)
if err == errUniqueConflict {
dbt := transFromDb(db, t.Gid)
dbt := transFromDb(db.DB, t.Gid, false)
if dbt.Status == dtmcli.StatusPrepared {
updates := t.setNextCron(cronReset)
db.Must().Model(t).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t)
dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t)
checkAffected(dbr)
} else if dbt.Status != dtmcli.StatusSubmitted {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil
}
@ -28,7 +30,7 @@ func svcPrepare(t *TransGlobal) (interface{}, error) {
t.Status = dtmcli.StatusPrepared
err := t.saveNew(dbGet())
if err == errUniqueConflict {
dbt := transFromDb(dbGet(), t.Gid)
dbt := transFromDb(dbGet().DB, t.Gid, false)
if dbt.Status != dtmcli.StatusPrepared {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot prepare", dbt.Status)}, nil
}
@ -38,7 +40,7 @@ func svcPrepare(t *TransGlobal) (interface{}, error) {
func svcAbort(t *TransGlobal) (interface{}, error) {
db := dbGet()
dbt := transFromDb(db, t.Gid)
dbt := transFromDb(db.DB, t.Gid, false)
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != dtmcli.StatusPrepared && dbt.Status != dtmcli.StatusAborting {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil
}
@ -46,32 +48,37 @@ func svcAbort(t *TransGlobal) (interface{}, error) {
return dbt.Process(db), nil
}
func svcRegisterBranch(branch *TransBranch, data map[string]string) (interface{}, error) {
db := dbGet()
dbt := transFromDb(db, branch.Gid)
if dbt.Status != dtmcli.StatusPrepared {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
}
func svcRegisterBranch(branch *TransBranch, data map[string]string) (ret interface{}, rerr error) {
err := dbGet().Transaction(func(db *gorm.DB) error {
dbt := transFromDb(db, branch.Gid, true)
if dbt.Status != dtmcli.StatusPrepared {
ret = map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}
return nil
}
branches := []TransBranch{*branch, *branch}
if dbt.TransType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
branches[i].Op = b
branches[i].URL = data[b]
branches := []TransBranch{*branch, *branch}
if dbt.TransType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
branches[i].Op = b
branches[i].URL = data[b]
}
} else if dbt.TransType == "xa" {
branches[0].Op = dtmcli.BranchRollback
branches[0].URL = data["url"]
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
rerr = fmt.Errorf("unknow trans type: %s", dbt.TransType)
return nil
}
} else if dbt.TransType == "xa" {
branches[0].Op = dtmcli.BranchRollback
branches[0].URL = data["url"]
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
return nil, fmt.Errorf("unknow trans type: %s", dbt.TransType)
}
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
global := TransGlobal{Gid: branch.Gid}
global.touch(dbGet(), cronKeep)
return dtmcli.MapSuccess, nil
dbr := db.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
checkAffected(dbr)
ret = dtmcli.MapSuccess
return nil
})
e2p(err)
return
}

2
dtmsvr/api_http.go

@ -57,7 +57,7 @@ func query(c *gin.Context) (interface{}, error) {
return nil, errors.New("no gid specified")
}
db := dbGet()
trans := transFromDb(db, gid)
trans := transFromDb(db.DB, gid, false)
branches := []TransBranch{}
db.Must().Where("gid", gid).Find(&branches)
return map[string]interface{}{"transaction": trans, "branches": branches}, nil

24
dtmsvr/dtmsvr.postgres.sql

@ -11,15 +11,15 @@ CREATE TABLE if not EXISTS dtm.trans_global (
status varchar(45) NOT NULL,
query_prepared varchar(128) NOT NULL,
protocol varchar(45) not null,
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
commit_time timestamp(0) DEFAULT NULL,
finish_time timestamp(0) DEFAULT NULL,
rollback_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
commit_time timestamp(0) with time zone DEFAULT NULL,
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
options varchar(256) DEFAULT '',
custom_data varchar(256) DEFAULT '',
next_cron_interval int default null,
next_cron_time timestamp(0) default null,
next_cron_time timestamp(0) with time zone default null,
owner varchar(128) not null default '',
PRIMARY KEY (id),
CONSTRAINT gid UNIQUE (gid)
@ -34,14 +34,14 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
gid varchar(128) NOT NULL,
url varchar(128) NOT NULL,
data TEXT,
bin_data BLOB,
bin_data bytea,
branch_id VARCHAR(128) NOT NULL,
op varchar(45) NOT NULL,
status varchar(45) NOT NULL,
finish_time timestamp(0) DEFAULT NULL,
rollback_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT gid_uniq UNIQUE (gid, branch_id, op)
CONSTRAINT gid_branch_uniq UNIQUE (gid, branch_id, op)
);

341
dtmsvr/trans.go

@ -1,341 +0,0 @@
package dtmsvr
import (
"errors"
"fmt"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var errUniqueConflict = errors.New("unique key conflict error")
// TransGlobal global transaction
type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Steps []map[string]string `json:"steps" gorm:"-"`
Payloads []string `json:"payloads" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
Protocol string `json:"protocol"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
Options string
CustomData string `json:"custom_data"`
NextCronInterval int64
NextCronTime *time.Time
dtmimp.TransOptions
lastTouched time.Time // record the start time of process
}
// TableName TableName
func (*TransGlobal) TableName() string {
return "dtm.trans_global"
}
type transProcessor interface {
GenBranches() []TransBranch
ProcessOnce(db *common.DB, branches []TransBranch) error
}
func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB {
t.lastTouched = time.Now()
updates := t.setNextCron(ctype)
return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Select(updates).Updates(t)
}
func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
old := t.Status
t.Status = status
updates := t.setNextCron(cronReset)
updates = append(updates, "status")
now := time.Now()
if status == dtmcli.StatusSucceed {
t.FinishTime = &now
updates = append(updates, "finish_time")
} else if status == dtmcli.StatusFailed {
t.RollbackTime = &now
updates = append(updates, "rollback_time")
}
dbr := db.Must().Model(t).Where("status=?", old).Select(updates).Updates(t)
checkAffected(dbr)
return dbr
}
func (t *TransGlobal) isTimeout() bool {
timeout := t.TimeoutToFail
if t.TimeoutToFail == 0 && t.TransType != "saga" {
timeout = config.TimeoutToFail
}
if timeout == 0 {
return false
}
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
// TransBranch branch transaction
type TransBranch struct {
common.ModelBase
Gid string
URL string `json:"url"`
BinData []byte
BranchID string `json:"branch_id"`
Op string
Status string
FinishTime *time.Time
RollbackTime *time.Time
}
// TableName TableName
func (*TransBranch) TableName() string {
return "dtm.trans_branch_op"
}
func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB {
if common.DtmConfig.UpdateBranchSync > 0 {
dbr := db.Must().Model(t).Updates(map[string]interface{}{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: t.ID, status: status}
}
t.Status = status
return db.DB
}
func checkAffected(db1 *gorm.DB) {
if db1.RowsAffected == 0 {
panic(fmt.Errorf("rows affected 0, please check for abnormal trans"))
}
}
type processorCreator func(*TransGlobal) transProcessor
var processorFac = map[string]processorCreator{}
func registorProcessorCreator(transType string, creator processorCreator) {
processorFac[transType] = creator
}
func (t *TransGlobal) getProcessor() transProcessor {
return processorFac[t.TransType](t)
}
// Process process global transaction once
func (t *TransGlobal) Process(db *common.DB) map[string]interface{} {
r := t.process(db)
transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess)
return r
}
func (t *TransGlobal) process(db *common.DB) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner(db)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
if submitting && t.Status != dtmcli.StatusSucceed {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
branches := []TransBranch{}
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(db, branches)
return
}
type cronType int
const (
cronBackoff cronType = iota
cronReset
cronKeep
)
func (t *TransGlobal) setNextCron(ctype cronType) []string {
if ctype == cronBackoff {
t.NextCronInterval = t.NextCronInterval * 2
} else if ctype == cronKeep {
// do nothing
} else if t.RetryInterval != 0 {
t.NextCronInterval = t.RetryInterval
} else {
t.NextCronInterval = config.RetryInterval
}
next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second)
t.NextCronTime = &next
return []string{"next_cron_interval", "next_cron_time"}
}
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) {
if t.Protocol == "grpc" {
dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
server, method := dtmgimp.GetServerAndMethod(url)
conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
err := conn.Invoke(ctx, method, branchPayload, []byte{})
if err == nil {
return dtmcli.ResultSuccess, nil
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.Aborted {
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ResultOngoing, nil
} else if st.Message() == dtmcli.ResultFailure {
return dtmcli.ResultFailure, nil
}
}
return "", err
}
dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url))
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"op": op,
}).
SetHeader("Content-type", "application/json").
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url)
if err != nil {
return "", err
}
return resp.String(), nil
}
func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
body, err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData)
if err != nil {
return "", err
}
if strings.Contains(body, dtmcli.ResultSuccess) {
return dtmcli.StatusSucceed, nil
} else if strings.HasSuffix(t.TransType, "saga") && branch.Op == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
return dtmcli.StatusFailed, nil
} else if strings.Contains(body, dtmcli.ResultOngoing) {
return "", dtmimp.ErrOngoing
}
return "", fmt.Errorf("http result should contains SUCCESS|FAILURE|ONGOING. grpc error should return nil|Aborted with message(FAILURE|ONGOING). \nrefer to: https://dtm.pub/summary/arch.html#http\nunkown result will be retried: %s", body)
}
func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error {
status, err := t.getBranchResult(branch)
if status != "" {
branch.changeStatus(db, status)
}
branchMetrics(t, branch, status == dtmcli.StatusSucceed)
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > config.RetryInterval && t.NextCronInterval > t.RetryInterval {
t.touch(db, cronReset)
} else if err == dtmimp.ErrOngoing {
t.touch(db, cronKeep)
} else if err != nil {
t.touch(db, cronBackoff)
}
return err
}
func (t *TransGlobal) saveNew(db *common.DB) error {
return db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
t.setNextCron(cronReset)
t.Options = dtmimp.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""
}
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(t)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return errUniqueConflict
}
branches := t.getProcessor().GenBranches()
if len(branches) > 0 {
checkLocalhost(branches)
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&branches)
}
return nil
})
}
// TransFromContext TransFromContext
func TransFromContext(c *gin.Context) *TransGlobal {
b, err := c.GetRawData()
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
}
m.Protocol = "http"
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal {
r := TransGlobal{
Gid: c.Gid,
TransType: c.TransType,
QueryPrepared: c.QueryPrepared,
Protocol: "grpc",
BinPayloads: c.BinPayloads,
}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
return &r
}

127
dtmsvr/trans_class.go

@ -0,0 +1,127 @@
package dtmsvr
import (
"errors"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"gorm.io/gorm"
)
var errUniqueConflict = errors.New("unique key conflict error")
// TransGlobal global transaction
type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Steps []map[string]string `json:"steps" gorm:"-"`
Payloads []string `json:"payloads" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
Protocol string `json:"protocol"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
Options string
CustomData string `json:"custom_data"`
NextCronInterval int64
NextCronTime *time.Time
dtmimp.TransOptions
lastTouched time.Time // record the start time of process
updateBranchSync bool
}
// TableName TableName
func (*TransGlobal) TableName() string {
return "dtm.trans_global"
}
// TransBranch branch transaction
type TransBranch struct {
common.ModelBase
Gid string
URL string `json:"url"`
BinData []byte
BranchID string `json:"branch_id"`
Op string
Status string
FinishTime *time.Time
RollbackTime *time.Time
}
// TableName TableName
func (*TransBranch) TableName() string {
return "dtm.trans_branch_op"
}
type transProcessor interface {
GenBranches() []TransBranch
ProcessOnce(db *common.DB, branches []TransBranch) error
}
type processorCreator func(*TransGlobal) transProcessor
var processorFac = map[string]processorCreator{}
func registorProcessorCreator(transType string, creator processorCreator) {
processorFac[transType] = creator
}
func (t *TransGlobal) getProcessor() transProcessor {
return processorFac[t.TransType](t)
}
type cronType int
const (
cronBackoff cronType = iota
cronReset
cronKeep
)
// TransFromContext TransFromContext
func TransFromContext(c *gin.Context) *TransGlobal {
b, err := c.GetRawData()
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
}
m.Protocol = "http"
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal {
r := TransGlobal{
Gid: c.Gid,
TransType: c.TransType,
QueryPrepared: c.QueryPrepared,
Protocol: "grpc",
BinPayloads: c.BinPayloads,
}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
return &r
}
func checkAffected(db1 *gorm.DB) {
if db1.RowsAffected == 0 {
panic(fmt.Errorf("rows affected 0, please check for abnormal trans"))
}
}

83
dtmsvr/trans_process.go

@ -0,0 +1,83 @@
package dtmsvr
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// Process process global transaction once
func (t *TransGlobal) Process(db *common.DB) map[string]interface{} {
r := t.process(db)
transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess)
return r
}
func (t *TransGlobal) process(db *common.DB) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner(db)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
if submitting && t.Status != dtmcli.StatusSucceed {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
branches := []TransBranch{}
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(db, branches)
return
}
func (t *TransGlobal) saveNew(db *common.DB) error {
return db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
t.setNextCron(cronReset)
t.Options = dtmimp.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""
}
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(t)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return errUniqueConflict
}
branches := t.getProcessor().GenBranches()
if len(branches) > 0 {
checkLocalhost(branches)
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&branches)
}
return nil
})
}

159
dtmsvr/trans_status.go

@ -0,0 +1,159 @@
package dtmsvr
import (
"fmt"
"strings"
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB {
t.lastTouched = time.Now()
updates := t.setNextCron(ctype)
return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Select(updates).Updates(t)
}
func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
old := t.Status
t.Status = status
updates := t.setNextCron(cronReset)
updates = append(updates, "status")
now := time.Now()
if status == dtmcli.StatusSucceed {
t.FinishTime = &now
updates = append(updates, "finish_time")
} else if status == dtmcli.StatusFailed {
t.RollbackTime = &now
updates = append(updates, "rollback_time")
}
dbr := db.Must().Model(&TransGlobal{}).Where("status=? and gid=?", old, t.Gid).Select(updates).Updates(t)
checkAffected(dbr)
return dbr
}
func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) {
if common.DtmConfig.UpdateBranchSync > 0 || t.updateBranchSync {
err := db.Transaction(func(tx *gorm.DB) error {
dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, t.Status).Find(&[]TransGlobal{})
checkAffected(dbr) // check TransGlobal is not modified
dbr = tx.Model(b).Updates(map[string]interface{}{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
return dbr.Error
})
e2p(err)
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: b.ID, status: status}
}
b.Status = status
}
func (t *TransGlobal) isTimeout() bool {
timeout := t.TimeoutToFail
if t.TimeoutToFail == 0 && t.TransType != "saga" {
timeout = config.TimeoutToFail
}
if timeout == 0 {
return false
}
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) {
if t.Protocol == "grpc" {
dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
server, method := dtmgimp.GetServerAndMethod(url)
conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
err := conn.Invoke(ctx, method, branchPayload, []byte{})
if err == nil {
return dtmcli.ResultSuccess, nil
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.Aborted {
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ResultOngoing, nil
} else if st.Message() == dtmcli.ResultFailure {
return dtmcli.ResultFailure, nil
}
}
return "", err
}
dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url))
resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)).
SetQueryParams(map[string]string{
"gid": t.Gid,
"trans_type": t.TransType,
"branch_id": branchID,
"op": op,
}).
SetHeader("Content-type", "application/json").
Execute(dtmimp.If(branchPayload != nil || t.TransType == "xa", "POST", "GET").(string), url)
if err != nil {
return "", err
}
return resp.String(), nil
}
func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
body, err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData)
if err != nil {
return "", err
}
if strings.Contains(body, dtmcli.ResultSuccess) {
return dtmcli.StatusSucceed, nil
} else if strings.HasSuffix(t.TransType, "saga") && branch.Op == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
return dtmcli.StatusFailed, nil
} else if strings.Contains(body, dtmcli.ResultOngoing) {
return "", dtmimp.ErrOngoing
}
return "", fmt.Errorf("http result should contains SUCCESS|FAILURE|ONGOING. grpc error should return nil|Aborted with message(FAILURE|ONGOING). \nrefer to: https://dtm.pub/summary/arch.html#http\nunkown result will be retried: %s", body)
}
func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error {
status, err := t.getBranchResult(branch)
if status != "" {
t.changeBranchStatus(db, branch, status)
}
branchMetrics(t, branch, status == dtmcli.StatusSucceed)
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > config.RetryInterval && t.NextCronInterval > t.RetryInterval {
t.touch(db, cronReset)
} else if err == dtmimp.ErrOngoing {
t.touch(db, cronKeep)
} else if err != nil {
t.touch(db, cronBackoff)
}
return err
}
func (t *TransGlobal) setNextCron(ctype cronType) []string {
if ctype == cronBackoff {
t.NextCronInterval = t.NextCronInterval * 2
} else if ctype == cronKeep {
// do nothing
} else if t.RetryInterval != 0 {
t.NextCronInterval = t.RetryInterval
} else {
t.NextCronInterval = config.RetryInterval
}
next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second)
t.NextCronTime = &next
return []string{"next_cron_interval", "next_cron_time"}
}

0
dtmsvr/trans_msg.go → dtmsvr/trans_type_msg.go

3
dtmsvr/trans_saga.go → dtmsvr/trans_type_saga.go

@ -59,6 +59,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
if t.CustomData != "" {
dtmimp.MustUnmarshalString(t.CustomData, &csc)
}
if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync
t.updateBranchSync = true
}
// resultStats
var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int
branchResults := make([]branchResult, n) // save the branch result

0
dtmsvr/trans_tcc.go → dtmsvr/trans_type_tcc.go

0
dtmsvr/trans_xa.go → dtmsvr/trans_type_xa.go

8
dtmsvr/utils.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type branchStatus struct {
@ -65,9 +66,12 @@ func getOneHexIP() string {
}
// transFromDb construct trans from db
func transFromDb(db *common.DB, gid string) *TransGlobal {
func transFromDb(db *gorm.DB, gid string, lock bool) *TransGlobal {
m := TransGlobal{}
dbr := db.Must().Model(&m).Where("gid=?", gid).First(&m)
if lock {
db = db.Clauses(clause.Locking{Strength: "UPDATE"})
}
dbr := db.Model(&m).Where("gid=?", gid).First(&m)
if dbr.Error == gorm.ErrRecordNotFound {
return nil
}

1
examples/base_http.go

@ -70,7 +70,6 @@ func (s *AutoEmptyString) SetOnce(v string) {
// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {
dtmimp.Logf("fetch result is: %s", s.value)
v := s.value
s.value = ""
return v

4
examples/examples.postgres.sql

@ -9,8 +9,8 @@ create table if not exists dtm_busi.user_account(
user_id int UNIQUE,
balance DECIMAL(10, 2) not null default '0',
trading_balance DECIMAL(10, 2) not null default '0',
create_time timestamp(0) DEFAULT now(),
update_time timestamp(0) DEFAULT now()
create_time timestamp(0) with time zone DEFAULT now(),
update_time timestamp(0) with time zone DEFAULT now()
);
-- SQLINES LICENSE FOR EVALUATION USE ONLY
create index if not exists create_idx on dtm_busi.user_account(create_time);

6
go.mod

@ -12,6 +12,8 @@ require (
github.com/lib/pq v1.10.3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
github.com/yedf/dtmcli v1.5.1
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67 // indirect
@ -19,7 +21,7 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.3.0
gorm.io/driver/mysql v1.0.3
gorm.io/driver/postgres v1.1.2
gorm.io/gorm v1.21.15
gorm.io/driver/postgres v1.2.1
gorm.io/gorm v1.22.2
// gotest.tools v2.2.0+incompatible
)

9
go.sum

@ -243,6 +243,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/yedf/dtmcli v1.5.1 h1:KGeFpRc9nOJ382YfT06I21jX2R9urfmL7JG9SLIBERA=
github.com/yedf/dtmcli v1.5.1/go.mod h1:errG1rA5vaT70B00s6cAKo7O/tD1CLI8CGrESSaTsqw=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
@ -271,6 +273,8 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa h1:idItI2DDfCokpg0N51B2VtiLdJ4vAuXC9fnCb2gACo4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -423,9 +427,14 @@ gorm.io/driver/mysql v1.0.3 h1:+JKBYPfn1tygR1/of/Fh2T8iwuVwzt+PEJmKaXzMQXg=
gorm.io/driver/mysql v1.0.3/go.mod h1:twGxftLBlFgNVNakL7F+P/x9oYqoymG3YYT8cAfI9oI=
gorm.io/driver/postgres v1.1.2 h1:Amy3hCvLqM+/ICzjCnQr8wKFLVJTeOTdlMT7kCP+J1Q=
gorm.io/driver/postgres v1.1.2/go.mod h1:/AGV0zvqF3mt9ZtzLzQmXWQ/5vr+1V1TyHZGZVjzmwI=
gorm.io/driver/postgres v1.2.1 h1:JDQKnF7MC51dgL09Vbydc5kl83KkVDlcXfSPJ+xhh68=
gorm.io/driver/postgres v1.2.1/go.mod h1:SHRZhu+D0tLOHV5qbxZRUM6kBcf3jp/kxPz2mYMTsNY=
gorm.io/gorm v1.20.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/gorm v1.21.15 h1:gAyaDoPw0lCyrSFWhBlahbUA1U4P5RViC1uIqoB+1Rk=
gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
gorm.io/gorm v1.22.0/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
gorm.io/gorm v1.22.2 h1:1iKcvyJnR5bHydBhDqTwasOkoo6+o4Ms5cknSt6qP7I=
gorm.io/gorm v1.22.2/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

8
test/xa_test.go

@ -40,9 +40,11 @@ func TestXaDuplicate(t *testing.T) {
assert.Nil(t, err)
sdb, err := dtmimp.StandaloneDB(common.DtmConfig.DB)
assert.Nil(t, err)
_, err = dtmimp.DBExec(sdb, "xa recover")
assert.Nil(t, err)
_, err = dtmimp.DBExec(sdb, fmt.Sprintf("xa commit '%s-01'", gid)) // 先把某一个事务提交,模拟重复请求
if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql {
_, err = dtmimp.DBExec(sdb, "xa recover")
assert.Nil(t, err)
}
_, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // 先把某一个事务提交,模拟重复请求
assert.Nil(t, err)
return xa.CallBranch(req, examples.Busi+"/TransInXa")
})

Loading…
Cancel
Save