Browse Source

msg concurrent refacted

pull/237/head
yedf2 4 years ago
parent
commit
7e6e2ffe6f
  1. 78
      dtmsvr/trans_type_msg.go

78
dtmsvr/trans_type_msg.go

@ -74,68 +74,34 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
t.touchCronTime(cronKeep, cmc.Delay)
return nil
}
execBranch := func(current int) (bool, error) {
branch := &branches[current]
if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared {
return true, nil
}
err := t.execBranch(branch, current)
if err != nil {
if !errors.Is(err, dtmcli.ErrOngoing) {
logger.Errorf("exec branch error: %v", err)
}
return false, err
}
if branch.Status != dtmcli.StatusSucceed {
return false, nil
}
return true, nil
}
type branchResult struct {
success bool
err error
}
waitChan := make(chan branchResult, len(branches))
consumeWork := func(i int) error {
success, err := execBranch(i)
waitChan <- branchResult{
success: success,
err: err,
var started int
resultsChan := make(chan error, len(branches))
var err error
for i, _ := range branches {
b := &branches[i]
if b.Op != dtmcli.BranchAction || b.Status != dtmcli.StatusPrepared {
continue
}
return err
}
produceWork := func() {
for i := 0; i < len(branches); i++ {
if t.Concurrent {
go func(i int) {
_ = consumeWork(i)
}(i)
continue
}
err := consumeWork(i)
if t.Concurrent {
started++
go func(pos int) {
resultsChan <- t.execBranch(b, pos)
}(i)
} else {
err = t.execBranch(b, i)
if err != nil {
return
break
}
}
}
go produceWork()
successCnt := 0
var err error
for i := 0; i < len(branches); i++ {
result := <-waitChan
if result.err != nil {
err = result.err
if !t.Concurrent {
return err
}
}
if result.success {
successCnt++
}
for i := 0; i < started && err == nil; i++ {
err = <-resultsChan
}
if successCnt == len(branches) { // msg 事务完成
t.changeStatus(dtmcli.StatusSucceed)
if err == dtmcli.ErrOngoing {
return nil
} else if err != nil {
return err
}
panic("msg go pass all branch")
t.changeStatus(dtmcli.StatusSucceed)
return nil
}

Loading…
Cancel
Save