Browse Source

change implementation of CompensateErrorBranch

pull/331/head
yedf2 4 years ago
parent
commit
0975ae5f32
  1. 10
      dtmgrpc/workflow/imp.go
  2. 7
      dtmgrpc/workflow/workflow.go

10
dtmgrpc/workflow/imp.go

@ -18,7 +18,6 @@ type workflowImp struct {
currentActionAdded bool //nolint currentActionAdded bool //nolint
currentCommitAdded bool //nolint currentCommitAdded bool //nolint
currentRollbackAdded bool //nolint currentRollbackAdded bool //nolint
currentRollbackItem *workflowPhase2Item // nolint
progresses map[string]*stepResult //nolint progresses map[string]*stepResult //nolint
currentOp string currentOp string
succeededOps []workflowPhase2Item succeededOps []workflowPhase2Item
@ -161,10 +160,13 @@ func (wf *Workflow) callPhase2(branchID string, fn WfPhase2Func) error {
func (wf *Workflow) recordedDo(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult { func (wf *Workflow) recordedDo(fn func(bb *dtmcli.BranchBarrier) *stepResult) *stepResult {
sr := wf.recordedDoInner(fn) sr := wf.recordedDoInner(fn)
if wf.currentRollbackItem != nil && (sr.Status == dtmcli.StatusSucceed || sr.Status == dtmcli.StatusFailed && wf.Options.CompensateErrorBranch) { // donot compensate the failed branch if !CompensateErrorBranch
wf.failedOps = append(wf.failedOps, *wf.currentRollbackItem) if !wf.Options.CompensateErrorBranch && sr.Status == dtmcli.StatusFailed {
lastFailed := len(wf.failedOps) - 1
if lastFailed >= 0 && wf.failedOps[lastFailed].branchID == wf.currentBranch {
wf.failedOps = wf.failedOps[:lastFailed]
}
} }
wf.currentRollbackItem = nil
return sr return sr
} }

7
dtmgrpc/workflow/workflow.go

@ -122,12 +122,11 @@ func (wf *Workflow) OnRollback(compensate WfPhase2Func) *Workflow {
branchID := wf.currentBranch branchID := wf.currentBranch
dtmimp.PanicIf(wf.currentRollbackAdded, fmt.Errorf("one branch can only add one rollback callback")) dtmimp.PanicIf(wf.currentRollbackAdded, fmt.Errorf("one branch can only add one rollback callback"))
wf.currentRollbackAdded = true wf.currentRollbackAdded = true
item := workflowPhase2Item{ wf.failedOps = append(wf.failedOps, workflowPhase2Item{
branchID: branchID, branchID: branchID,
op: dtmimp.OpRollback, op: dtmimp.OpCommit,
fn: compensate, fn: compensate,
} })
wf.currentRollbackItem = &item
return wf return wf
} }

Loading…
Cancel
Save