From 7c527fcc3911252a46fb76d1b70019aeee8cddec Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 10 Nov 2021 23:24:13 +0800 Subject: [PATCH] split trans.go to three files --- dtmsvr/trans_class.go | 126 +++++++++++ dtmsvr/trans_process.go | 83 ++++++++ dtmsvr/{trans.go => trans_status.go} | 212 ++----------------- dtmsvr/{trans_msg.go => trans_type_msg.go} | 0 dtmsvr/{trans_saga.go => trans_type_saga.go} | 0 dtmsvr/{trans_tcc.go => trans_type_tcc.go} | 0 dtmsvr/{trans_xa.go => trans_type_xa.go} | 0 7 files changed, 221 insertions(+), 200 deletions(-) create mode 100644 dtmsvr/trans_class.go create mode 100644 dtmsvr/trans_process.go rename dtmsvr/{trans.go => trans_status.go} (50%) rename dtmsvr/{trans_msg.go => trans_type_msg.go} (100%) rename dtmsvr/{trans_saga.go => trans_type_saga.go} (100%) rename dtmsvr/{trans_tcc.go => trans_type_tcc.go} (100%) rename dtmsvr/{trans_xa.go => trans_type_xa.go} (100%) diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go new file mode 100644 index 0000000..02f08da --- /dev/null +++ b/dtmsvr/trans_class.go @@ -0,0 +1,126 @@ +package dtmsvr + +import ( + "errors" + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmgrpc/dtmgimp" + "gorm.io/gorm" +) + +var errUniqueConflict = errors.New("unique key conflict error") + +// TransGlobal global transaction +type TransGlobal struct { + common.ModelBase + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []map[string]string `json:"steps" gorm:"-"` + Payloads []string `json:"payloads" gorm:"-"` + BinPayloads [][]byte `json:"-" gorm:"-"` + Status string `json:"status"` + QueryPrepared string `json:"query_prepared"` + Protocol string `json:"protocol"` + CommitTime *time.Time + FinishTime *time.Time + RollbackTime *time.Time + Options string + CustomData string `json:"custom_data"` + NextCronInterval int64 + NextCronTime *time.Time + dtmimp.TransOptions + lastTouched time.Time // record the start time of process +} + +// TableName TableName +func (*TransGlobal) TableName() string { + return "dtm.trans_global" +} + +// TransBranch branch transaction +type TransBranch struct { + common.ModelBase + Gid string + URL string `json:"url"` + BinData []byte + BranchID string `json:"branch_id"` + Op string + Status string + FinishTime *time.Time + RollbackTime *time.Time +} + +// TableName TableName +func (*TransBranch) TableName() string { + return "dtm.trans_branch_op" +} + +type transProcessor interface { + GenBranches() []TransBranch + ProcessOnce(db *common.DB, branches []TransBranch) error +} + +type processorCreator func(*TransGlobal) transProcessor + +var processorFac = map[string]processorCreator{} + +func registorProcessorCreator(transType string, creator processorCreator) { + processorFac[transType] = creator +} + +func (t *TransGlobal) getProcessor() transProcessor { + return processorFac[t.TransType](t) +} + +type cronType int + +const ( + cronBackoff cronType = iota + cronReset + cronKeep +) + +// TransFromContext TransFromContext +func TransFromContext(c *gin.Context) *TransGlobal { + b, err := c.GetRawData() + e2p(err) + m := TransGlobal{} + dtmimp.MustUnmarshal(b, &m) + dtmimp.Logf("creating trans in prepare") + // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal + for _, p := range m.Payloads { + m.BinPayloads = append(m.BinPayloads, []byte(p)) + } + for _, d := range m.Steps { + if d["data"] != "" { + m.BinPayloads = append(m.BinPayloads, []byte(d["data"])) + } + } + m.Protocol = "http" + return &m +} + +// TransFromDtmRequest TransFromContext +func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal { + r := TransGlobal{ + Gid: c.Gid, + TransType: c.TransType, + QueryPrepared: c.QueryPrepared, + Protocol: "grpc", + BinPayloads: c.BinPayloads, + } + if c.Steps != "" { + dtmimp.MustUnmarshalString(c.Steps, &r.Steps) + } + return &r +} + +func checkAffected(db1 *gorm.DB) { + if db1.RowsAffected == 0 { + panic(fmt.Errorf("rows affected 0, please check for abnormal trans")) + } +} diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go new file mode 100644 index 0000000..e932c2a --- /dev/null +++ b/dtmsvr/trans_process.go @@ -0,0 +1,83 @@ +package dtmsvr + +import ( + "time" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmcli/dtmimp" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// Process process global transaction once +func (t *TransGlobal) Process(db *common.DB) map[string]interface{} { + r := t.process(db) + transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess) + return r +} + +func (t *TransGlobal) process(db *common.DB) map[string]interface{} { + if t.Options != "" { + dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) + } + + if !t.WaitResult { + go t.processInner(db) + return dtmcli.MapSuccess + } + submitting := t.Status == dtmcli.StatusSubmitted + err := t.processInner(db) + if err != nil { + return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()} + } + if submitting && t.Status != dtmcli.StatusSucceed { + return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"} + } + return dtmcli.MapSuccess +} + +func (t *TransGlobal) processInner(db *common.DB) (rerr error) { + defer handlePanic(&rerr) + defer func() { + if rerr != nil { + dtmimp.LogRedf("processInner got error: %s", rerr.Error()) + } + if TransProcessedTestChan != nil { + dtmimp.Logf("processed: %s", t.Gid) + TransProcessedTestChan <- t.Gid + dtmimp.Logf("notified: %s", t.Gid) + } + }() + dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) + branches := []TransBranch{} + db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) + t.lastTouched = time.Now() + rerr = t.getProcessor().ProcessOnce(db, branches) + return +} + +func (t *TransGlobal) saveNew(db *common.DB) error { + return db.Transaction(func(db1 *gorm.DB) error { + db := &common.DB{DB: db1} + t.setNextCron(cronReset) + t.Options = dtmimp.MustMarshalString(t.TransOptions) + if t.Options == "{}" { + t.Options = "" + } + dbr := db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(t) + if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 + return errUniqueConflict + } + branches := t.getProcessor().GenBranches() + if len(branches) > 0 { + checkLocalhost(branches) + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&branches) + } + return nil + }) +} diff --git a/dtmsvr/trans.go b/dtmsvr/trans_status.go similarity index 50% rename from dtmsvr/trans.go rename to dtmsvr/trans_status.go index b0e8193..625ebff 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans_status.go @@ -1,12 +1,10 @@ package dtmsvr import ( - "errors" "fmt" "strings" "time" - "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" @@ -14,43 +12,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "gorm.io/gorm" - "gorm.io/gorm/clause" ) -var errUniqueConflict = errors.New("unique key conflict error") - -// TransGlobal global transaction -type TransGlobal struct { - common.ModelBase - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []map[string]string `json:"steps" gorm:"-"` - Payloads []string `json:"payloads" gorm:"-"` - BinPayloads [][]byte `json:"-" gorm:"-"` - Status string `json:"status"` - QueryPrepared string `json:"query_prepared"` - Protocol string `json:"protocol"` - CommitTime *time.Time - FinishTime *time.Time - RollbackTime *time.Time - Options string - CustomData string `json:"custom_data"` - NextCronInterval int64 - NextCronTime *time.Time - dtmimp.TransOptions - lastTouched time.Time // record the start time of process -} - -// TableName TableName -func (*TransGlobal) TableName() string { - return "dtm.trans_global" -} - -type transProcessor interface { - GenBranches() []TransBranch - ProcessOnce(db *common.DB, branches []TransBranch) error -} - func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB { t.lastTouched = time.Now() updates := t.setNextCron(ctype) @@ -104,113 +67,6 @@ func (t *TransGlobal) needProcess() bool { return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout() } -// TransBranch branch transaction -type TransBranch struct { - common.ModelBase - Gid string - URL string `json:"url"` - BinData []byte - BranchID string `json:"branch_id"` - Op string - Status string - FinishTime *time.Time - RollbackTime *time.Time -} - -// TableName TableName -func (*TransBranch) TableName() string { - return "dtm.trans_branch_op" -} - -func checkAffected(db1 *gorm.DB) { - if db1.RowsAffected == 0 { - panic(fmt.Errorf("rows affected 0, please check for abnormal trans")) - } -} - -type processorCreator func(*TransGlobal) transProcessor - -var processorFac = map[string]processorCreator{} - -func registorProcessorCreator(transType string, creator processorCreator) { - processorFac[transType] = creator -} - -func (t *TransGlobal) getProcessor() transProcessor { - return processorFac[t.TransType](t) -} - -// Process process global transaction once -func (t *TransGlobal) Process(db *common.DB) map[string]interface{} { - r := t.process(db) - transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess) - return r -} - -func (t *TransGlobal) process(db *common.DB) map[string]interface{} { - if t.Options != "" { - dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) - } - - if !t.WaitResult { - go t.processInner(db) - return dtmcli.MapSuccess - } - submitting := t.Status == dtmcli.StatusSubmitted - err := t.processInner(db) - if err != nil { - return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()} - } - if submitting && t.Status != dtmcli.StatusSucceed { - return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"} - } - return dtmcli.MapSuccess -} - -func (t *TransGlobal) processInner(db *common.DB) (rerr error) { - defer handlePanic(&rerr) - defer func() { - if rerr != nil { - dtmimp.LogRedf("processInner got error: %s", rerr.Error()) - } - if TransProcessedTestChan != nil { - dtmimp.Logf("processed: %s", t.Gid) - TransProcessedTestChan <- t.Gid - dtmimp.Logf("notified: %s", t.Gid) - } - }() - dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) - branches := []TransBranch{} - db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) - t.lastTouched = time.Now() - rerr = t.getProcessor().ProcessOnce(db, branches) - return -} - -type cronType int - -const ( - cronBackoff cronType = iota - cronReset - cronKeep -) - -func (t *TransGlobal) setNextCron(ctype cronType) []string { - if ctype == cronBackoff { - t.NextCronInterval = t.NextCronInterval * 2 - } else if ctype == cronKeep { - // do nothing - } else if t.RetryInterval != 0 { - t.NextCronInterval = t.RetryInterval - } else { - t.NextCronInterval = config.RetryInterval - } - - next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second) - t.NextCronTime = &next - return []string{"next_cron_interval", "next_cron_time"} -} - func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) { if t.Protocol == "grpc" { dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url)) @@ -280,62 +136,18 @@ func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error { return err } -func (t *TransGlobal) saveNew(db *common.DB) error { - return db.Transaction(func(db1 *gorm.DB) error { - db := &common.DB{DB: db1} - t.setNextCron(cronReset) - t.Options = dtmimp.MustMarshalString(t.TransOptions) - if t.Options == "{}" { - t.Options = "" - } - dbr := db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(t) - if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 - return errUniqueConflict - } - branches := t.getProcessor().GenBranches() - if len(branches) > 0 { - checkLocalhost(branches) - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&branches) - } - return nil - }) -} - -// TransFromContext TransFromContext -func TransFromContext(c *gin.Context) *TransGlobal { - b, err := c.GetRawData() - e2p(err) - m := TransGlobal{} - dtmimp.MustUnmarshal(b, &m) - dtmimp.Logf("creating trans in prepare") - // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal - for _, p := range m.Payloads { - m.BinPayloads = append(m.BinPayloads, []byte(p)) - } - for _, d := range m.Steps { - if d["data"] != "" { - m.BinPayloads = append(m.BinPayloads, []byte(d["data"])) - } +func (t *TransGlobal) setNextCron(ctype cronType) []string { + if ctype == cronBackoff { + t.NextCronInterval = t.NextCronInterval * 2 + } else if ctype == cronKeep { + // do nothing + } else if t.RetryInterval != 0 { + t.NextCronInterval = t.RetryInterval + } else { + t.NextCronInterval = config.RetryInterval } - m.Protocol = "http" - return &m -} -// TransFromDtmRequest TransFromContext -func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal { - r := TransGlobal{ - Gid: c.Gid, - TransType: c.TransType, - QueryPrepared: c.QueryPrepared, - Protocol: "grpc", - BinPayloads: c.BinPayloads, - } - if c.Steps != "" { - dtmimp.MustUnmarshalString(c.Steps, &r.Steps) - } - return &r + next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second) + t.NextCronTime = &next + return []string{"next_cron_interval", "next_cron_time"} } diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_type_msg.go similarity index 100% rename from dtmsvr/trans_msg.go rename to dtmsvr/trans_type_msg.go diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_type_saga.go similarity index 100% rename from dtmsvr/trans_saga.go rename to dtmsvr/trans_type_saga.go diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_type_tcc.go similarity index 100% rename from dtmsvr/trans_tcc.go rename to dtmsvr/trans_type_tcc.go diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_type_xa.go similarity index 100% rename from dtmsvr/trans_xa.go rename to dtmsvr/trans_type_xa.go