Browse Source

split trans.go to three files

pull/55/head
yedf2 4 years ago
parent
commit
7c527fcc39
  1. 126
      dtmsvr/trans_class.go
  2. 83
      dtmsvr/trans_process.go
  3. 212
      dtmsvr/trans_status.go
  4. 0
      dtmsvr/trans_type_msg.go
  5. 0
      dtmsvr/trans_type_saga.go
  6. 0
      dtmsvr/trans_type_tcc.go
  7. 0
      dtmsvr/trans_type_xa.go

126
dtmsvr/trans_class.go

@ -0,0 +1,126 @@
package dtmsvr
import (
"errors"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"gorm.io/gorm"
)
var errUniqueConflict = errors.New("unique key conflict error")
// TransGlobal global transaction
type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Steps []map[string]string `json:"steps" gorm:"-"`
Payloads []string `json:"payloads" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
Protocol string `json:"protocol"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
Options string
CustomData string `json:"custom_data"`
NextCronInterval int64
NextCronTime *time.Time
dtmimp.TransOptions
lastTouched time.Time // record the start time of process
}
// TableName TableName
func (*TransGlobal) TableName() string {
return "dtm.trans_global"
}
// TransBranch branch transaction
type TransBranch struct {
common.ModelBase
Gid string
URL string `json:"url"`
BinData []byte
BranchID string `json:"branch_id"`
Op string
Status string
FinishTime *time.Time
RollbackTime *time.Time
}
// TableName TableName
func (*TransBranch) TableName() string {
return "dtm.trans_branch_op"
}
type transProcessor interface {
GenBranches() []TransBranch
ProcessOnce(db *common.DB, branches []TransBranch) error
}
type processorCreator func(*TransGlobal) transProcessor
var processorFac = map[string]processorCreator{}
func registorProcessorCreator(transType string, creator processorCreator) {
processorFac[transType] = creator
}
func (t *TransGlobal) getProcessor() transProcessor {
return processorFac[t.TransType](t)
}
type cronType int
const (
cronBackoff cronType = iota
cronReset
cronKeep
)
// TransFromContext TransFromContext
func TransFromContext(c *gin.Context) *TransGlobal {
b, err := c.GetRawData()
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
}
m.Protocol = "http"
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal {
r := TransGlobal{
Gid: c.Gid,
TransType: c.TransType,
QueryPrepared: c.QueryPrepared,
Protocol: "grpc",
BinPayloads: c.BinPayloads,
}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
return &r
}
func checkAffected(db1 *gorm.DB) {
if db1.RowsAffected == 0 {
panic(fmt.Errorf("rows affected 0, please check for abnormal trans"))
}
}

83
dtmsvr/trans_process.go

@ -0,0 +1,83 @@
package dtmsvr
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// Process process global transaction once
func (t *TransGlobal) Process(db *common.DB) map[string]interface{} {
r := t.process(db)
transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess)
return r
}
func (t *TransGlobal) process(db *common.DB) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner(db)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
if submitting && t.Status != dtmcli.StatusSucceed {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
branches := []TransBranch{}
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(db, branches)
return
}
func (t *TransGlobal) saveNew(db *common.DB) error {
return db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
t.setNextCron(cronReset)
t.Options = dtmimp.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""
}
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(t)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return errUniqueConflict
}
branches := t.getProcessor().GenBranches()
if len(branches) > 0 {
checkLocalhost(branches)
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&branches)
}
return nil
})
}

212
dtmsvr/trans.go → dtmsvr/trans_status.go

@ -1,12 +1,10 @@
package dtmsvr
import (
"errors"
"fmt"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
@ -14,43 +12,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var errUniqueConflict = errors.New("unique key conflict error")
// TransGlobal global transaction
type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Steps []map[string]string `json:"steps" gorm:"-"`
Payloads []string `json:"payloads" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
Protocol string `json:"protocol"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
Options string
CustomData string `json:"custom_data"`
NextCronInterval int64
NextCronTime *time.Time
dtmimp.TransOptions
lastTouched time.Time // record the start time of process
}
// TableName TableName
func (*TransGlobal) TableName() string {
return "dtm.trans_global"
}
type transProcessor interface {
GenBranches() []TransBranch
ProcessOnce(db *common.DB, branches []TransBranch) error
}
func (t *TransGlobal) touch(db *common.DB, ctype cronType) *gorm.DB {
t.lastTouched = time.Now()
updates := t.setNextCron(ctype)
@ -104,113 +67,6 @@ func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
// TransBranch branch transaction
type TransBranch struct {
common.ModelBase
Gid string
URL string `json:"url"`
BinData []byte
BranchID string `json:"branch_id"`
Op string
Status string
FinishTime *time.Time
RollbackTime *time.Time
}
// TableName TableName
func (*TransBranch) TableName() string {
return "dtm.trans_branch_op"
}
func checkAffected(db1 *gorm.DB) {
if db1.RowsAffected == 0 {
panic(fmt.Errorf("rows affected 0, please check for abnormal trans"))
}
}
type processorCreator func(*TransGlobal) transProcessor
var processorFac = map[string]processorCreator{}
func registorProcessorCreator(transType string, creator processorCreator) {
processorFac[transType] = creator
}
func (t *TransGlobal) getProcessor() transProcessor {
return processorFac[t.TransType](t)
}
// Process process global transaction once
func (t *TransGlobal) Process(db *common.DB) map[string]interface{} {
r := t.process(db)
transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess)
return r
}
func (t *TransGlobal) process(db *common.DB) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner(db)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
if submitting && t.Status != dtmcli.StatusSucceed {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
branches := []TransBranch{}
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(db, branches)
return
}
type cronType int
const (
cronBackoff cronType = iota
cronReset
cronKeep
)
func (t *TransGlobal) setNextCron(ctype cronType) []string {
if ctype == cronBackoff {
t.NextCronInterval = t.NextCronInterval * 2
} else if ctype == cronKeep {
// do nothing
} else if t.RetryInterval != 0 {
t.NextCronInterval = t.RetryInterval
} else {
t.NextCronInterval = config.RetryInterval
}
next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second)
t.NextCronTime = &next
return []string{"next_cron_interval", "next_cron_time"}
}
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) {
if t.Protocol == "grpc" {
dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
@ -280,62 +136,18 @@ func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error {
return err
}
func (t *TransGlobal) saveNew(db *common.DB) error {
return db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
t.setNextCron(cronReset)
t.Options = dtmimp.MustMarshalString(t.TransOptions)
if t.Options == "{}" {
t.Options = ""
}
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(t)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return errUniqueConflict
}
branches := t.getProcessor().GenBranches()
if len(branches) > 0 {
checkLocalhost(branches)
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&branches)
}
return nil
})
}
// TransFromContext TransFromContext
func TransFromContext(c *gin.Context) *TransGlobal {
b, err := c.GetRawData()
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))
}
for _, d := range m.Steps {
if d["data"] != "" {
m.BinPayloads = append(m.BinPayloads, []byte(d["data"]))
}
func (t *TransGlobal) setNextCron(ctype cronType) []string {
if ctype == cronBackoff {
t.NextCronInterval = t.NextCronInterval * 2
} else if ctype == cronKeep {
// do nothing
} else if t.RetryInterval != 0 {
t.NextCronInterval = t.RetryInterval
} else {
t.NextCronInterval = config.RetryInterval
}
m.Protocol = "http"
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(c *dtmgimp.DtmRequest) *TransGlobal {
r := TransGlobal{
Gid: c.Gid,
TransType: c.TransType,
QueryPrepared: c.QueryPrepared,
Protocol: "grpc",
BinPayloads: c.BinPayloads,
}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
return &r
next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second)
t.NextCronTime = &next
return []string{"next_cron_interval", "next_cron_time"}
}

0
dtmsvr/trans_msg.go → dtmsvr/trans_type_msg.go

0
dtmsvr/trans_saga.go → dtmsvr/trans_type_saga.go

0
dtmsvr/trans_tcc.go → dtmsvr/trans_type_tcc.go

0
dtmsvr/trans_xa.go → dtmsvr/trans_type_xa.go

Loading…
Cancel
Save