|
|
|
@ -8,33 +8,22 @@ import ( |
|
|
|
"github.com/yedf/dtm/common" |
|
|
|
) |
|
|
|
|
|
|
|
type Trans interface { |
|
|
|
GetDataBranches() []TransBranchModel |
|
|
|
ProcessOnce(db *common.MyDb, branches []TransBranchModel) error |
|
|
|
type TransProcessor interface { |
|
|
|
GenBranches() []TransBranch |
|
|
|
ProcessOnce(db *common.MyDb, branches []TransBranch) error |
|
|
|
} |
|
|
|
|
|
|
|
func GetTrans(trans *TransGlobalModel) Trans { |
|
|
|
if trans.TransType == "saga" { |
|
|
|
return &TransSaga{TransGlobalModel: trans} |
|
|
|
} else if trans.TransType == "tcc" { |
|
|
|
return &TransTcc{TransGlobalModel: trans} |
|
|
|
} else if trans.TransType == "xa" { |
|
|
|
return &TransXa{TransGlobalModel: trans} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
type TransSaga struct { |
|
|
|
*TransGlobalModel |
|
|
|
type TransSagaProcessor struct { |
|
|
|
*TransGlobal |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransSaga) GetDataBranches() []TransBranchModel { |
|
|
|
nsteps := []TransBranchModel{} |
|
|
|
func (t *TransSagaProcessor) GenBranches() []TransBranch { |
|
|
|
nsteps := []TransBranch{} |
|
|
|
steps := []M{} |
|
|
|
common.MustUnmarshalString(t.Data, &steps) |
|
|
|
for _, step := range steps { |
|
|
|
for _, branchType := range []string{"compensate", "action"} { |
|
|
|
nsteps = append(nsteps, TransBranchModel{ |
|
|
|
nsteps = append(nsteps, TransBranch{ |
|
|
|
Gid: t.Gid, |
|
|
|
Branch: fmt.Sprintf("%d", len(nsteps)+1), |
|
|
|
Data: step["data"].(string), |
|
|
|
@ -47,7 +36,7 @@ func (t *TransSaga) GetDataBranches() []TransBranchModel { |
|
|
|
return nsteps |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { |
|
|
|
func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { |
|
|
|
current := 0 // 当前正在处理的步骤
|
|
|
|
for ; current < len(branches); current++ { |
|
|
|
step := branches[current] |
|
|
|
@ -99,17 +88,17 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
type TransTcc struct { |
|
|
|
*TransGlobalModel |
|
|
|
type TransTccProcessor struct { |
|
|
|
*TransGlobal |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransTcc) GetDataBranches() []TransBranchModel { |
|
|
|
nsteps := []TransBranchModel{} |
|
|
|
func (t *TransTccProcessor) GenBranches() []TransBranch { |
|
|
|
nsteps := []TransBranch{} |
|
|
|
steps := []M{} |
|
|
|
common.MustUnmarshalString(t.Data, &steps) |
|
|
|
for _, step := range steps { |
|
|
|
for _, branchType := range []string{"rollback", "commit", "prepare"} { |
|
|
|
nsteps = append(nsteps, TransBranchModel{ |
|
|
|
nsteps = append(nsteps, TransBranch{ |
|
|
|
Gid: t.Gid, |
|
|
|
Branch: fmt.Sprintf("%d", len(nsteps)+1), |
|
|
|
Data: step["data"].(string), |
|
|
|
@ -122,7 +111,7 @@ func (t *TransTcc) GetDataBranches() []TransBranchModel { |
|
|
|
return nsteps |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { |
|
|
|
func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { |
|
|
|
gid := t.Gid |
|
|
|
current := 0 // 当前正在处理的步骤
|
|
|
|
for ; current < len(branches); current++ { |
|
|
|
@ -136,7 +125,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err |
|
|
|
return err |
|
|
|
} |
|
|
|
body := resp.String() |
|
|
|
db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
if strings.Contains(body, "SUCCESS") { |
|
|
|
writeTransLog(gid, "step finished", "finished", step.Branch, "") |
|
|
|
dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ |
|
|
|
@ -160,7 +149,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err |
|
|
|
//////////////////////////////////////////////////
|
|
|
|
if current == len(branches) { // tcc 事务完成
|
|
|
|
writeTransLog(gid, "saga finished", "finished", "", "") |
|
|
|
dbr := db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ |
|
|
|
dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ |
|
|
|
"status": "finished", |
|
|
|
"finish_time": time.Now(), |
|
|
|
}) |
|
|
|
@ -192,7 +181,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err |
|
|
|
return fmt.Errorf("saga current not -1") |
|
|
|
} |
|
|
|
writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") |
|
|
|
dbr := db.Must().Model(&TransGlobalModel{}).Where("status=? and gid=?", "committed", gid).Updates(M{ |
|
|
|
dbr := db.Must().Model(&TransGlobal{}).Where("status=? and gid=?", "committed", gid).Updates(M{ |
|
|
|
"status": "rollbacked", |
|
|
|
"rollback_time": time.Now(), |
|
|
|
}) |
|
|
|
@ -200,15 +189,15 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
type TransXa struct { |
|
|
|
*TransGlobalModel |
|
|
|
type TransXaProcessor struct { |
|
|
|
*TransGlobal |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransXa) GetDataBranches() []TransBranchModel { |
|
|
|
return []TransBranchModel{} |
|
|
|
func (t *TransXaProcessor) GenBranches() []TransBranch { |
|
|
|
return []TransBranch{} |
|
|
|
} |
|
|
|
|
|
|
|
func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { |
|
|
|
func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { |
|
|
|
gid := t.Gid |
|
|
|
if t.Status == "finished" { |
|
|
|
return nil |
|
|
|
@ -218,7 +207,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro |
|
|
|
if branch.Status == "finished" { |
|
|
|
continue |
|
|
|
} |
|
|
|
db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
resp, err := common.RestyClient.R().SetBody(M{ |
|
|
|
"branch": branch.Branch, |
|
|
|
"action": "commit", |
|
|
|
@ -238,7 +227,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro |
|
|
|
}) |
|
|
|
} |
|
|
|
writeTransLog(gid, "xa finished", "finished", "", "") |
|
|
|
db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ |
|
|
|
db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ |
|
|
|
"status": "finished", |
|
|
|
"finish_time": time.Now(), |
|
|
|
}) |
|
|
|
@ -247,7 +236,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro |
|
|
|
if branch.Status == "rollbacked" { |
|
|
|
continue |
|
|
|
} |
|
|
|
db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次
|
|
|
|
resp, err := common.RestyClient.R().SetBody(M{ |
|
|
|
"branch": branch.Branch, |
|
|
|
"action": "rollback", |
|
|
|
@ -267,7 +256,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro |
|
|
|
}) |
|
|
|
} |
|
|
|
writeTransLog(gid, "xa rollbacked", "rollbacked", "", "") |
|
|
|
db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ |
|
|
|
db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ |
|
|
|
"status": "rollbacked", |
|
|
|
"finish_time": time.Now(), |
|
|
|
}) |
|
|
|
|