From 7e6e2ffe6fa9ec83669c983a2e16a8a1c1c98b31 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Mon, 28 Feb 2022 17:54:11 +0800 Subject: [PATCH] msg concurrent refacted --- dtmsvr/trans_type_msg.go | 78 ++++++++++++---------------------------- 1 file changed, 22 insertions(+), 56 deletions(-) diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index 8343753..e89d1a7 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -74,68 +74,34 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error { t.touchCronTime(cronKeep, cmc.Delay) return nil } - execBranch := func(current int) (bool, error) { - branch := &branches[current] - if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared { - return true, nil - } - err := t.execBranch(branch, current) - if err != nil { - if !errors.Is(err, dtmcli.ErrOngoing) { - logger.Errorf("exec branch error: %v", err) - } - return false, err - } - if branch.Status != dtmcli.StatusSucceed { - return false, nil - } - return true, nil - } - type branchResult struct { - success bool - err error - } - waitChan := make(chan branchResult, len(branches)) - consumeWork := func(i int) error { - success, err := execBranch(i) - waitChan <- branchResult{ - success: success, - err: err, + var started int + resultsChan := make(chan error, len(branches)) + var err error + for i, _ := range branches { + b := &branches[i] + if b.Op != dtmcli.BranchAction || b.Status != dtmcli.StatusPrepared { + continue } - return err - } - produceWork := func() { - for i := 0; i < len(branches); i++ { - if t.Concurrent { - go func(i int) { - _ = consumeWork(i) - }(i) - continue - } - err := consumeWork(i) + if t.Concurrent { + started++ + go func(pos int) { + resultsChan <- t.execBranch(b, pos) + }(i) + } else { + err = t.execBranch(b, i) if err != nil { - return + break } } } - go produceWork() - successCnt := 0 - var err error - for i := 0; i < len(branches); i++ { - result := <-waitChan - if result.err != nil { - err = result.err - if !t.Concurrent { - return err - } - } - if result.success { - successCnt++ - } + for i := 0; i < started && err == nil; i++ { + err = <-resultsChan } - if successCnt == len(branches) { // msg 事务完成 - t.changeStatus(dtmcli.StatusSucceed) + if err == dtmcli.ErrOngoing { return nil + } else if err != nil { + return err } - panic("msg go pass all branch") + t.changeStatus(dtmcli.StatusSucceed) + return nil }