Browse Source

changeBranchStatus will update timeoutToFail saga sync

pull/55/head
yedf2 4 years ago
parent
commit
a9a14750f5
  1. 30
      dtmsvr/trans.go

30
dtmsvr/trans.go

@ -75,6 +75,20 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
return dbr
}
func (t *TransGlobal) changeBranchStatus(db *common.DB, b *TransBranch, status string) *gorm.DB {
if common.DtmConfig.UpdateBranchSync > 0 || t.TransType == "saga" && t.TimeoutToFail > 0 {
dbr := db.Must().Model(b).Updates(map[string]interface{}{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: b.ID, status: status}
}
b.Status = status
return db.DB
}
func (t *TransGlobal) isTimeout() bool {
timeout := t.TimeoutToFail
if t.TimeoutToFail == 0 && t.TransType != "saga" {
@ -108,20 +122,6 @@ func (*TransBranch) TableName() string {
return "dtm.trans_branch_op"
}
func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB {
if common.DtmConfig.UpdateBranchSync > 0 {
dbr := db.Must().Model(t).Updates(map[string]interface{}{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: t.ID, status: status}
}
t.Status = status
return db.DB
}
func checkAffected(db1 *gorm.DB) {
if db1.RowsAffected == 0 {
panic(fmt.Errorf("rows affected 0, please check for abnormal trans"))
@ -265,7 +265,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) error {
status, err := t.getBranchResult(branch)
if status != "" {
branch.changeStatus(db, status)
t.changeBranchStatus(db, branch, status)
}
branchMetrics(t, branch, status == dtmcli.StatusSucceed)
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval

Loading…
Cancel
Save