From a9a14750f5ccfc45f0c4b2d1fc81b5005a34388e Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 10 Nov 2021 22:27:19 +0800 Subject: [PATCH 1/6] changeBranchStatus will update timeoutToFail saga sync --- dtmsvr/trans.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 70ca608..b0e8193 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -75,6 +75,20 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB { return dbr } +func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) *gorm.DB { + if common.DtmConfig.UpdateBranchSync > 0 || t.TransType == "saga" && t.TimeoutToFail > 0 { + dbr := db.Must().Model(b).Updates(map[string]interface{}{ + "status": status, + "finish_time": time.Now(), + }) + checkAffected(dbr) + } else { // 为了性能优化,把branch的status更新异步化 + updateBranchAsyncChan <- branchStatus{id: b.ID, status: status} + } + b.Status = status + return db.DB +} + func (t *TransGlobal) isTimeout() bool { timeout := t.TimeoutToFail if t.TimeoutToFail == 0 && t.TransType != "saga" { @@ -108,20 +122,6 @@ func (*TransBranch) TableName() string { return "dtm.trans_branch_op" } -func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB { - if common.DtmConfig.UpdateBranchSync > 0 { - dbr := db.Must().Model(t).Updates(map[string]interface{}{ - "status": status, - "finish_time": time.Now(), - }) - checkAffected(dbr) - } else { // 为了性能优化,把branch的status更新异步化 - updateBranchAsyncChan <- branchStatus{id: t.ID, status: status} - } - t.Status = status - return db.DB -} - func checkAffected(db1 *gorm.DB) { if db1.RowsAffected == 0 { panic(fmt.Errorf("rows affected 0, please check for abnormal trans")) @@ -265,7 +265,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) { func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error { status, err := t.getBranchResult(branch) if status != "" { - branch.changeStatus(db, status) + t.changeBranchStatus(db, branch, status) } branchMetrics(t, branch, status == dtmcli.StatusSucceed) // if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval From 7c527fcc3911252a46fb76d1b70019aeee8cddec Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 10 Nov 2021 23:24:13 +0800 Subject: [PATCH 2/6] split trans.go to three files --- dtmsvr/trans_class.go | 126 +++++++++++ dtmsvr/trans_process.go | 83 ++++++++ dtmsvr/{trans.go => trans_status.go} | 212 ++----------------- dtmsvr/{trans_msg.go => trans_type_msg.go} | 0 dtmsvr/{trans_saga.go => trans_type_saga.go} | 0 dtmsvr/{trans_tcc.go => trans_type_tcc.go} | 0 dtmsvr/{trans_xa.go => trans_type_xa.go} | 0 7 files changed, 221 insertions(+), 200 deletions(-) create mode 100644 dtmsvr/trans_class.go create mode 100644 dtmsvr/trans_process.go rename dtmsvr/{trans.go => trans_status.go} (50%) rename dtmsvr/{trans_msg.go => trans_type_msg.go} (100%) rename dtmsvr/{trans_saga.go => trans_type_saga.go} (100%) rename dtmsvr/{trans_tcc.go => trans_type_tcc.go} (100%) rename dtmsvr/{trans_xa.go => trans_type_xa.go} (100%) diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go new file mode 100644 index 0000000..02f08da --- /dev/null +++ b/dtmsvr/trans_class.go @@ -0,0 +1,126 @@ +package dtmsvr + +import ( + "errors" + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmgrpc/dtmgimp" + "gorm.io/gorm" +) + +var errUniqueConflict = errors.New("unique key conflict error") + +// TransGlobal global transaction +type TransGlobal struct { + common.ModelBase + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []map[string]string `json:"steps" gorm:"-"` + Payloads []string `json:"payloads" gorm:"-"` + BinPayloads [][]byte `json:"-" gorm:"-"` + Status string `json:"status"` + QueryPrepared string `json:"query_prepared"` + Protocol string `json:"protocol"` + CommitTime *time.Time + FinishTime *time.Time + RollbackTime *time.Time + Options string + CustomData string `json:"custom_data"` + NextCronInterval int64 + NextCronTime *time.Time + dtmimp.TransOptions + lastTouched time.Time // record the start time of process +} + +// TableName TableName +func (*TransGlobal) TableName() string { + return "dtm.trans_global" +} + +// TransBranch branch transaction +type TransBranch struct { + common.ModelBase + Gid string + URL string `json:"url"` + BinData []byte + BranchID string `json:"branch_id"` + Op string + Status string + FinishTime *time.Time + RollbackTime *time.Time +} + +// TableName TableName +func (*TransBranch) TableName() string { + return "dtm.trans_branch_op" +} + +type transProcessor interface { + GenBranches() []TransBranch + ProcessOnce(db *common.DB, branches []TransBranch) error +} + +type processorCreator func(*TransGlobal) transProcessor + +var processorFac = map[string]processorCreator{} + +func registorProcessorCreator(transType string, creator processorCreator) { + processorFac[transType] = creator +} + +func (t *TransGlobal) getProcessor() transProcessor { + return processorFac[t.TransType](t) +} + +type cronType int + +const ( + cronBackoff cronType = iota + cronReset + cronKeep +) + +// TransFromContext TransFromContext +func TransFromContext(c *gin.Context) *TransGlobal { + b, err := c.GetRawData() + e2p(err) + m := TransGlobal{} + dtmimp.MustUnmarshal(b, &m) + dtmimp.Logf("creating trans in prepare") + // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal + for _, p := range m.Payloads { + m.BinPayloads = append(m.BinPayloads, []byte(p)) + } + for _, d := range m.Steps { + if d["data"] != "" { + m.BinPayloads = append(m.BinPayloads, []byte(d["data"])) + } + } + m.Protocol = "http" + return &m +} + +// TransFromDtmRequest TransFromContext +func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal { + r := TransGlobal{ + Gid: c.Gid, + TransType: c.TransType, + QueryPrepared: c.QueryPrepared, + Protocol: "grpc", + BinPayloads: c.BinPayloads, + } + if c.Steps != "" { + dtmimp.MustUnmarshalString(c.Steps, &r.Steps) + } + return &r +} + +func checkAffected(db1 *gorm.DB) { + if db1.RowsAffected == 0 { + panic(fmt.Errorf("rows affected 0, please check for abnormal trans")) + } +} diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go new file mode 100644 index 0000000..e932c2a --- /dev/null +++ b/dtmsvr/trans_process.go @@ -0,0 +1,83 @@ +package dtmsvr + +import ( + "time" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmcli/dtmimp" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// Process process global transaction once +func (t *TransGlobal) Process(db *common.DB) map[string]interface{} { + r := t.process(db) + transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess) + return r +} + +func (t *TransGlobal) process(db *common.DB) map[string]interface{} { + if t.Options != "" { + dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) + } + + if !t.WaitResult { + go t.processInner(db) + return dtmcli.MapSuccess + } + submitting := t.Status == dtmcli.StatusSubmitted + err := t.processInner(db) + if err != nil { + return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()} + } + if submitting && t.Status != dtmcli.StatusSucceed { + return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"} + } + return dtmcli.MapSuccess +} + +func (t *TransGlobal) processInner(db *common.DB) (rerr error) { + defer handlePanic(&rerr) + defer func() { + if rerr != nil { + dtmimp.LogRedf("processInner got error: %s", rerr.Error()) + } + if TransProcessedTestChan != nil { + dtmimp.Logf("processed: %s", t.Gid) + TransProcessedTestChan <- t.Gid + dtmimp.Logf("notified: %s", t.Gid) + } + }() + dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) + branches := []TransBranch{} + db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) + t.lastTouched = time.Now() + rerr = t.getProcessor().ProcessOnce(db, branches) + return +} + +func (t *TransGlobal) saveNew(db *common.DB) error { + return db.Transaction(func(db1 *gorm.DB) error { + db := &common.DB{DB: db1} + t.setNextCron(cronReset) + t.Options = dtmimp.MustMarshalString(t.TransOptions) + if t.Options == "{}" { + t.Options = "" + } + dbr := db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(t) + if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 + return errUniqueConflict + } + branches := t.getProcessor().GenBranches() + if len(branches) > 0 { + checkLocalhost(branches) + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&branches) + } + return nil + }) +} diff --git a/dtmsvr/trans.go b/dtmsvr/trans_status.go similarity index 50% rename from dtmsvr/trans.go rename to dtmsvr/trans_status.go index b0e8193..625ebff 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans_status.go @@ -1,12 +1,10 @@ package dtmsvr import ( - "errors" "fmt" "strings" "time" - "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" @@ -14,43 +12,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "gorm.io/gorm" - "gorm.io/gorm/clause" ) -var errUniqueConflict = errors.New("unique key conflict error") - -// TransGlobal global transaction -type TransGlobal struct { - common.ModelBase - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []map[string]string `json:"steps" gorm:"-"` - Payloads []string `json:"payloads" gorm:"-"` - BinPayloads [][]byte `json:"-" gorm:"-"` - Status string `json:"status"` - QueryPrepared string `json:"query_prepared"` - Protocol string `json:"protocol"` - CommitTime *time.Time - FinishTime *time.Time - RollbackTime *time.Time - Options string - CustomData string `json:"custom_data"` - NextCronInterval int64 - NextCronTime *time.Time - dtmimp.TransOptions - lastTouched time.Time // record the start time of process -} - -// TableName TableName -func (*TransGlobal) TableName() string { - return "dtm.trans_global" -} - -type transProcessor interface { - GenBranches() []TransBranch - ProcessOnce(db *common.DB, branches []TransBranch) error -} - func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB { t.lastTouched = time.Now() updates := t.setNextCron(ctype) @@ -104,113 +67,6 @@ func (t *TransGlobal) needProcess() bool { return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout() } -// TransBranch branch transaction -type TransBranch struct { - common.ModelBase - Gid string - URL string `json:"url"` - BinData []byte - BranchID string `json:"branch_id"` - Op string - Status string - FinishTime *time.Time - RollbackTime *time.Time -} - -// TableName TableName -func (*TransBranch) TableName() string { - return "dtm.trans_branch_op" -} - -func checkAffected(db1 *gorm.DB) { - if db1.RowsAffected == 0 { - panic(fmt.Errorf("rows affected 0, please check for abnormal trans")) - } -} - -type processorCreator func(*TransGlobal) transProcessor - -var processorFac = map[string]processorCreator{} - -func registorProcessorCreator(transType string, creator processorCreator) { - processorFac[transType] = creator -} - -func (t *TransGlobal) getProcessor() transProcessor { - return processorFac[t.TransType](t) -} - -// Process process global transaction once -func (t *TransGlobal) Process(db *common.DB) map[string]interface{} { - r := t.process(db) - transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess) - return r -} - -func (t *TransGlobal) process(db *common.DB) map[string]interface{} { - if t.Options != "" { - dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) - } - - if !t.WaitResult { - go t.processInner(db) - return dtmcli.MapSuccess - } - submitting := t.Status == dtmcli.StatusSubmitted - err := t.processInner(db) - if err != nil { - return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()} - } - if submitting && t.Status != dtmcli.StatusSucceed { - return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"} - } - return dtmcli.MapSuccess -} - -func (t *TransGlobal) processInner(db *common.DB) (rerr error) { - defer handlePanic(&rerr) - defer func() { - if rerr != nil { - dtmimp.LogRedf("processInner got error: %s", rerr.Error()) - } - if TransProcessedTestChan != nil { - dtmimp.Logf("processed: %s", t.Gid) - TransProcessedTestChan <- t.Gid - dtmimp.Logf("notified: %s", t.Gid) - } - }() - dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) - branches := []TransBranch{} - db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) - t.lastTouched = time.Now() - rerr = t.getProcessor().ProcessOnce(db, branches) - return -} - -type cronType int - -const ( - cronBackoff cronType = iota - cronReset - cronKeep -) - -func (t *TransGlobal) setNextCron(ctype cronType) []string { - if ctype == cronBackoff { - t.NextCronInterval = t.NextCronInterval * 2 - } else if ctype == cronKeep { - // do nothing - } else if t.RetryInterval != 0 { - t.NextCronInterval = t.RetryInterval - } else { - t.NextCronInterval = config.RetryInterval - } - - next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second) - t.NextCronTime = &next - return []string{"next_cron_interval", "next_cron_time"} -} - func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) { if t.Protocol == "grpc" { dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url)) @@ -280,62 +136,18 @@ func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error { return err } -func (t *TransGlobal) saveNew(db *common.DB) error { - return db.Transaction(func(db1 *gorm.DB) error { - db := &common.DB{DB: db1} - t.setNextCron(cronReset) - t.Options = dtmimp.MustMarshalString(t.TransOptions) - if t.Options == "{}" { - t.Options = "" - } - dbr := db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(t) - if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 - return errUniqueConflict - } - branches := t.getProcessor().GenBranches() - if len(branches) > 0 { - checkLocalhost(branches) - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&branches) - } - return nil - }) -} - -// TransFromContext TransFromContext -func TransFromContext(c *gin.Context) *TransGlobal { - b, err := c.GetRawData() - e2p(err) - m := TransGlobal{} - dtmimp.MustUnmarshal(b, &m) - dtmimp.Logf("creating trans in prepare") - // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal - for _, p := range m.Payloads { - m.BinPayloads = append(m.BinPayloads, []byte(p)) - } - for _, d := range m.Steps { - if d["data"] != "" { - m.BinPayloads = append(m.BinPayloads, []byte(d["data"])) - } +func (t *TransGlobal) setNextCron(ctype cronType) []string { + if ctype == cronBackoff { + t.NextCronInterval = t.NextCronInterval * 2 + } else if ctype == cronKeep { + // do nothing + } else if t.RetryInterval != 0 { + t.NextCronInterval = t.RetryInterval + } else { + t.NextCronInterval = config.RetryInterval } - m.Protocol = "http" - return &m -} -// TransFromDtmRequest TransFromContext -func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal { - r := TransGlobal{ - Gid: c.Gid, - TransType: c.TransType, - QueryPrepared: c.QueryPrepared, - Protocol: "grpc", - BinPayloads: c.BinPayloads, - } - if c.Steps != "" { - dtmimp.MustUnmarshalString(c.Steps, &r.Steps) - } - return &r + next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second) + t.NextCronTime = &next + return []string{"next_cron_interval", "next_cron_time"} } diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_type_msg.go similarity index 100% rename from dtmsvr/trans_msg.go rename to dtmsvr/trans_type_msg.go diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_type_saga.go similarity index 100% rename from dtmsvr/trans_saga.go rename to dtmsvr/trans_type_saga.go diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_type_tcc.go similarity index 100% rename from dtmsvr/trans_tcc.go rename to dtmsvr/trans_type_tcc.go diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_type_xa.go similarity index 100% rename from dtmsvr/trans_xa.go rename to dtmsvr/trans_type_xa.go From 8db7e03c4d32911c0bdc7e1358eb78c1e7a196ef Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 11 Nov 2021 10:00:33 +0800 Subject: [PATCH 3/6] run change branch status within a transaction which lock transglobal --- dtmsvr/api.go | 65 +++++++++++++++++++++++------------------- dtmsvr/api_http.go | 2 +- dtmsvr/trans_status.go | 20 ++++++++----- dtmsvr/utils.go | 8 ++++-- examples/base_http.go | 1 - 5 files changed, 56 insertions(+), 40 deletions(-) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 458f752..9e93c74 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/yedf/dtm/dtmcli" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -13,10 +14,11 @@ func svcSubmit(t *TransGlobal) (interface{}, error) { err := t.saveNew(db) if err == errUniqueConflict { - dbt := transFromDb(db, t.Gid) + dbt := transFromDb(db.DB, t.Gid, false) if dbt.Status == dtmcli.StatusPrepared { updates := t.setNextCron(cronReset) - db.Must().Model(t).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t) + dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t) + checkAffected(dbr) } else if dbt.Status != dtmcli.StatusSubmitted { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil } @@ -28,7 +30,7 @@ func svcPrepare(t *TransGlobal) (interface{}, error) { t.Status = dtmcli.StatusPrepared err := t.saveNew(dbGet()) if err == errUniqueConflict { - dbt := transFromDb(dbGet(), t.Gid) + dbt := transFromDb(dbGet().DB, t.Gid, false) if dbt.Status != dtmcli.StatusPrepared { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot prepare", dbt.Status)}, nil } @@ -38,7 +40,7 @@ func svcPrepare(t *TransGlobal) (interface{}, error) { func svcAbort(t *TransGlobal) (interface{}, error) { db := dbGet() - dbt := transFromDb(db, t.Gid) + dbt := transFromDb(db.DB, t.Gid, false) if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != dtmcli.StatusPrepared && dbt.Status != dtmcli.StatusAborting { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil } @@ -46,32 +48,37 @@ func svcAbort(t *TransGlobal) (interface{}, error) { return dbt.Process(db), nil } -func svcRegisterBranch(branch *TransBranch, data map[string]string) (interface{}, error) { - db := dbGet() - dbt := transFromDb(db, branch.Gid) - if dbt.Status != dtmcli.StatusPrepared { - return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil - } +func svcRegisterBranch(branch *TransBranch, data map[string]string) (ret interface{}, rerr error) { + err := dbGet().Transaction(func(db *gorm.DB) error { + dbt := transFromDb(db, branch.Gid, true) + if dbt.Status != dtmcli.StatusPrepared { + ret = map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)} + return nil + } - branches := []TransBranch{*branch, *branch} - if dbt.TransType == "tcc" { - for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} { - branches[i].Op = b - branches[i].URL = data[b] + branches := []TransBranch{*branch, *branch} + if dbt.TransType == "tcc" { + for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} { + branches[i].Op = b + branches[i].URL = data[b] + } + } else if dbt.TransType == "xa" { + branches[0].Op = dtmcli.BranchRollback + branches[0].URL = data["url"] + branches[1].Op = dtmcli.BranchCommit + branches[1].URL = data["url"] + } else { + rerr = fmt.Errorf("unknow trans type: %s", dbt.TransType) + return nil } - } else if dbt.TransType == "xa" { - branches[0].Op = dtmcli.BranchRollback - branches[0].URL = data["url"] - branches[1].Op = dtmcli.BranchCommit - branches[1].URL = data["url"] - } else { - return nil, fmt.Errorf("unknow trans type: %s", dbt.TransType) - } - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(branches) - global := TransGlobal{Gid: branch.Gid} - global.touch(dbGet(), cronKeep) - return dtmcli.MapSuccess, nil + dbr := db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(branches) + checkAffected(dbr) + ret = dtmcli.MapSuccess + return nil + }) + e2p(err) + return } diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index b1f2e90..6cbdb9e 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -57,7 +57,7 @@ func query(c *gin.Context) (interface{}, error) { return nil, errors.New("no gid specified") } db := dbGet() - trans := transFromDb(db, gid) + trans := transFromDb(db.DB, gid, false) branches := []TransBranch{} db.Must().Where("gid", gid).Find(&branches) return map[string]interface{}{"transaction": trans, "branches": branches}, nil diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 625ebff..dcee6b1 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "gorm.io/gorm" + "gorm.io/gorm/clause" ) func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB { @@ -33,23 +34,28 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB { t.RollbackTime = &now updates = append(updates, "rollback_time") } - dbr := db.Must().Model(t).Where("status=?", old).Select(updates).Updates(t) + dbr := db.Must().Model(&TransGlobal{}).Where("status=? and gid=?", old, t.Gid).Select(updates).Updates(t) checkAffected(dbr) return dbr } -func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) *gorm.DB { +func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) { if common.DtmConfig.UpdateBranchSync > 0 || t.TransType == "saga" && t.TimeoutToFail > 0 { - dbr := db.Must().Model(b).Updates(map[string]interface{}{ - "status": status, - "finish_time": time.Now(), + err := db.Transaction(func(tx *gorm.DB) error { + dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, t.Status).Find(&[]TransGlobal{}) + checkAffected(dbr) // check TransGlobal is not modified + dbr = tx.Model(b).Updates(map[string]interface{}{ + "status": status, + "finish_time": time.Now(), + }) + checkAffected(dbr) + return dbr.Error }) - checkAffected(dbr) + e2p(err) } else { // 为了性能优化,把branch的status更新异步化 updateBranchAsyncChan <- branchStatus{id: b.ID, status: status} } b.Status = status - return db.DB } func (t *TransGlobal) isTimeout() bool { diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index 1aeb587..4ae90b5 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" "gorm.io/gorm" + "gorm.io/gorm/clause" ) type branchStatus struct { @@ -65,9 +66,12 @@ func getOneHexIP() string { } // transFromDb construct trans from db -func transFromDb(db *common.DB, gid string) *TransGlobal { +func transFromDb(db *gorm.DB, gid string, lock bool) *TransGlobal { m := TransGlobal{} - dbr := db.Must().Model(&m).Where("gid=?", gid).First(&m) + if lock { + db = db.Clauses(clause.Locking{Strength: "UPDATE"}) + } + dbr := db.Model(&m).Where("gid=?", gid).First(&m) if dbr.Error == gorm.ErrRecordNotFound { return nil } diff --git a/examples/base_http.go b/examples/base_http.go index 6a6dd4d..447fc47 100644 --- a/examples/base_http.go +++ b/examples/base_http.go @@ -70,7 +70,6 @@ func (s *AutoEmptyString) SetOnce(v string) { // Fetch fetch the stored value, then reset the value to empty func (s *AutoEmptyString) Fetch() string { - dtmimp.Logf("fetch result is: %s", s.value) v := s.value s.value = "" return v From cc3725d94a1bd73df41ba4b0e1b36867421d56ee Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 11 Nov 2021 21:50:43 +0800 Subject: [PATCH 4/6] update sync optimized --- dtmsvr/trans_class.go | 3 ++- dtmsvr/trans_status.go | 2 +- dtmsvr/trans_type_saga.go | 3 +++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index 02f08da..77d1843 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -33,7 +33,8 @@ type TransGlobal struct { NextCronInterval int64 NextCronTime *time.Time dtmimp.TransOptions - lastTouched time.Time // record the start time of process + lastTouched time.Time // record the start time of process + updateBranchSync bool } // TableName TableName diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index dcee6b1..188c260 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -40,7 +40,7 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB { } func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) { - if common.DtmConfig.UpdateBranchSync > 0 || t.TransType == "saga" && t.TimeoutToFail > 0 { + if common.DtmConfig.UpdateBranchSync > 0 || t.updateBranchSync { err := db.Transaction(func(tx *gorm.DB) error { dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, t.Status).Find(&[]TransGlobal{}) checkAffected(dbr) // check TransGlobal is not modified diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 3760a89..46ed6d3 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -59,6 +59,9 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) if t.CustomData != "" { dtmimp.MustUnmarshalString(t.CustomData, &csc) } + if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync + t.updateBranchSync = true + } // resultStats var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int branchResults := make([]branchResult, n) // save the branch result From 7d465f63562a33b169e15a62facb9a3206299ed0 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 12 Nov 2021 10:01:33 +0800 Subject: [PATCH 5/6] add TestWaitDBUp --- common/types.go | 3 +++ common/types_test.go | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/common/types.go b/common/types.go index 4f7f74a..4be83a3 100644 --- a/common/types.go +++ b/common/types.go @@ -197,6 +197,9 @@ func checkConfig() string { func WaitDBUp() { sdb, err := dtmimp.StandaloneDB(DtmConfig.DB) dtmimp.FatalIfError(err) + defer func() { + sdb.Close() + }() for _, err := dtmimp.DBExec(sdb, "select 1"); err != nil; { // wait for mysql to start time.Sleep(3 * time.Second) _, err = dtmimp.DBExec(sdb, "select 1") diff --git a/common/types_test.go b/common/types_test.go index 84d4c32..ed3c30f 100644 --- a/common/types_test.go +++ b/common/types_test.go @@ -19,6 +19,10 @@ func TestDb(t *testing.T) { assert.NotEqual(t, nil, err) } +func TestWaitDBUp(t *testing.T) { + WaitDBUp() +} + func TestDbAlone(t *testing.T) { db, err := dtmimp.StandaloneDB(DtmConfig.DB) assert.Nil(t, err) From 7d40b37334c9db437d3271e23833404dbf44a9c0 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 12 Nov 2021 12:51:13 +0800 Subject: [PATCH 6/6] postgres test passed --- dtmcli/barrier.postgres.sql | 4 ++-- dtmcli/dtmimp/utils.go | 2 +- dtmsvr/dtmsvr.postgres.sql | 24 ++++++++++++------------ examples/examples.postgres.sql | 4 ++-- go.mod | 6 ++++-- go.sum | 9 +++++++++ test/xa_test.go | 8 +++++--- 7 files changed, 35 insertions(+), 22 deletions(-) diff --git a/dtmcli/barrier.postgres.sql b/dtmcli/barrier.postgres.sql index 7add4a0..67c7bb0 100644 --- a/dtmcli/barrier.postgres.sql +++ b/dtmcli/barrier.postgres.sql @@ -9,8 +9,8 @@ create table if not exists dtm_barrier.barrier( op varchar(45) default '', barrier_id varchar(45) default '', reason varchar(45) default '', - create_time timestamp(0) DEFAULT NULL, - update_time timestamp(0) DEFAULT NULL, + create_time timestamp(0) with time zone DEFAULT NULL, + update_time timestamp(0) with time zone DEFAULT NULL, PRIMARY KEY(id), CONSTRAINT uniq_barrier unique(gid, branch_id, op, barrier_id) ); \ No newline at end of file diff --git a/dtmcli/dtmimp/utils.go b/dtmcli/dtmimp/utils.go index 531b792..7e75d20 100644 --- a/dtmcli/dtmimp/utils.go +++ b/dtmcli/dtmimp/utils.go @@ -220,7 +220,7 @@ func GetDsn(conf map[string]string) string { dsn := map[string]string{ "mysql": fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", conf["user"], conf["password"], host, conf["port"], conf["database"]), - "postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable TimeZone=Asia/Shanghai", + "postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable", host, conf["user"], conf["password"], conf["database"], conf["port"]), }[driver] PanicIf(dsn == "", fmt.Errorf("unknow driver: %s", driver)) diff --git a/dtmsvr/dtmsvr.postgres.sql b/dtmsvr/dtmsvr.postgres.sql index 97da66c..bccb8be 100644 --- a/dtmsvr/dtmsvr.postgres.sql +++ b/dtmsvr/dtmsvr.postgres.sql @@ -11,15 +11,15 @@ CREATE TABLE if not EXISTS dtm.trans_global ( status varchar(45) NOT NULL, query_prepared varchar(128) NOT NULL, protocol varchar(45) not null, - create_time timestamp(0) DEFAULT NULL, - update_time timestamp(0) DEFAULT NULL, - commit_time timestamp(0) DEFAULT NULL, - finish_time timestamp(0) DEFAULT NULL, - rollback_time timestamp(0) DEFAULT NULL, + create_time timestamp(0) with time zone DEFAULT NULL, + update_time timestamp(0) with time zone DEFAULT NULL, + commit_time timestamp(0) with time zone DEFAULT NULL, + finish_time timestamp(0) with time zone DEFAULT NULL, + rollback_time timestamp(0) with time zone DEFAULT NULL, options varchar(256) DEFAULT '', custom_data varchar(256) DEFAULT '', next_cron_interval int default null, - next_cron_time timestamp(0) default null, + next_cron_time timestamp(0) with time zone default null, owner varchar(128) not null default '', PRIMARY KEY (id), CONSTRAINT gid UNIQUE (gid) @@ -34,14 +34,14 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( gid varchar(128) NOT NULL, url varchar(128) NOT NULL, data TEXT, - bin_data BLOB, + bin_data bytea, branch_id VARCHAR(128) NOT NULL, op varchar(45) NOT NULL, status varchar(45) NOT NULL, - finish_time timestamp(0) DEFAULT NULL, - rollback_time timestamp(0) DEFAULT NULL, - create_time timestamp(0) DEFAULT NULL, - update_time timestamp(0) DEFAULT NULL, + finish_time timestamp(0) with time zone DEFAULT NULL, + rollback_time timestamp(0) with time zone DEFAULT NULL, + create_time timestamp(0) with time zone DEFAULT NULL, + update_time timestamp(0) with time zone DEFAULT NULL, PRIMARY KEY (id), - CONSTRAINT gid_uniq UNIQUE (gid, branch_id, op) + CONSTRAINT gid_branch_uniq UNIQUE (gid, branch_id, op) ); \ No newline at end of file diff --git a/examples/examples.postgres.sql b/examples/examples.postgres.sql index 1b61bdb..11167d4 100644 --- a/examples/examples.postgres.sql +++ b/examples/examples.postgres.sql @@ -9,8 +9,8 @@ create table if not exists dtm_busi.user_account( user_id int UNIQUE, balance DECIMAL(10, 2) not null default '0', trading_balance DECIMAL(10, 2) not null default '0', - create_time timestamp(0) DEFAULT now(), - update_time timestamp(0) DEFAULT now() + create_time timestamp(0) with time zone DEFAULT now(), + update_time timestamp(0) with time zone DEFAULT now() ); -- SQLINES LICENSE FOR EVALUATION USE ONLY create index if not exists create_idx on dtm_busi.user_account(create_time); diff --git a/go.mod b/go.mod index 1316cb2..d24c1d7 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,8 @@ require ( github.com/lib/pq v1.10.3 github.com/prometheus/client_golang v1.11.0 github.com/stretchr/testify v1.7.0 + github.com/yedf/dtmcli v1.5.1 + golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67 // indirect @@ -19,7 +21,7 @@ require ( google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v2 v2.3.0 gorm.io/driver/mysql v1.0.3 - gorm.io/driver/postgres v1.1.2 - gorm.io/gorm v1.21.15 + gorm.io/driver/postgres v1.2.1 + gorm.io/gorm v1.22.2 // gotest.tools v2.2.0+incompatible ) diff --git a/go.sum b/go.sum index a58d8ce..01ddbd5 100644 --- a/go.sum +++ b/go.sum @@ -243,6 +243,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/yedf/dtmcli v1.5.1 h1:KGeFpRc9nOJ382YfT06I21jX2R9urfmL7JG9SLIBERA= +github.com/yedf/dtmcli v1.5.1/go.mod h1:errG1rA5vaT70B00s6cAKo7O/tD1CLI8CGrESSaTsqw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -271,6 +273,8 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa h1:idItI2DDfCokpg0N51B2VtiLdJ4vAuXC9fnCb2gACo4= +golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -423,9 +427,14 @@ gorm.io/driver/mysql v1.0.3 h1:+JKBYPfn1tygR1/of/Fh2T8iwuVwzt+PEJmKaXzMQXg= gorm.io/driver/mysql v1.0.3/go.mod h1:twGxftLBlFgNVNakL7F+P/x9oYqoymG3YYT8cAfI9oI= gorm.io/driver/postgres v1.1.2 h1:Amy3hCvLqM+/ICzjCnQr8wKFLVJTeOTdlMT7kCP+J1Q= gorm.io/driver/postgres v1.1.2/go.mod h1:/AGV0zvqF3mt9ZtzLzQmXWQ/5vr+1V1TyHZGZVjzmwI= +gorm.io/driver/postgres v1.2.1 h1:JDQKnF7MC51dgL09Vbydc5kl83KkVDlcXfSPJ+xhh68= +gorm.io/driver/postgres v1.2.1/go.mod h1:SHRZhu+D0tLOHV5qbxZRUM6kBcf3jp/kxPz2mYMTsNY= gorm.io/gorm v1.20.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= gorm.io/gorm v1.21.15 h1:gAyaDoPw0lCyrSFWhBlahbUA1U4P5RViC1uIqoB+1Rk= gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= +gorm.io/gorm v1.22.0/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= +gorm.io/gorm v1.22.2 h1:1iKcvyJnR5bHydBhDqTwasOkoo6+o4Ms5cknSt6qP7I= +gorm.io/gorm v1.22.2/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/test/xa_test.go b/test/xa_test.go index 0ff6ef2..bcbcfc9 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -40,9 +40,11 @@ func TestXaDuplicate(t *testing.T) { assert.Nil(t, err) sdb, err := dtmimp.StandaloneDB(common.DtmConfig.DB) assert.Nil(t, err) - _, err = dtmimp.DBExec(sdb, "xa recover") - assert.Nil(t, err) - _, err = dtmimp.DBExec(sdb, fmt.Sprintf("xa commit '%s-01'", gid)) // 先把某一个事务提交,模拟重复请求 + if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql { + _, err = dtmimp.DBExec(sdb, "xa recover") + assert.Nil(t, err) + } + _, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // 先把某一个事务提交,模拟重复请求 assert.Nil(t, err) return xa.CallBranch(req, examples.Busi+"/TransInXa") })