Browse Source

feat:msg support concurrent

pull/234/head
wuweishuo 4 years ago
parent
commit
e73e0d1aeb
  1. 1
      dtmcli/dtmimp/trans_base.go
  2. 13
      dtmcli/saga.go
  3. 2
      dtmgrpc/saga.go
  4. 59
      dtmsvr/trans_type_msg.go
  5. 10
      test/msg_options_test.go
  6. 2
      test/saga_concurrent_test.go

1
dtmcli/dtmimp/trans_base.go

@ -48,6 +48,7 @@ type TransOptions struct {
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"`
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"`
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
} }
// TransBase base for all trans // TransBase base for all trans

13
dtmcli/saga.go

@ -13,8 +13,7 @@ import (
// Saga struct of saga // Saga struct of saga
type Saga struct { type Saga struct {
dtmimp.TransBase dtmimp.TransBase
orders map[int][]int orders map[int][]int
concurrent bool
} }
// NewSaga create a saga // NewSaga create a saga
@ -35,9 +34,9 @@ func (s *Saga) AddBranchOrder(branch int, preBranches []int) *Saga {
return s return s
} }
// EnableConcurrent enable the concurrent exec of sub trans // SetConcurrent enable the concurrent exec of sub trans
func (s *Saga) EnableConcurrent() *Saga { func (s *Saga) SetConcurrent() *Saga {
s.concurrent = true s.Concurrent = true
return s return s
} }
@ -49,7 +48,7 @@ func (s *Saga) Submit() error {
// BuildCustomOptions add custom options to the request context // BuildCustomOptions add custom options to the request context
func (s *Saga) BuildCustomOptions() { func (s *Saga) BuildCustomOptions() {
if s.concurrent { if s.Concurrent {
s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent}) s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.Concurrent})
} }
} }

2
dtmgrpc/saga.go

@ -37,7 +37,7 @@ func (s *SagaGrpc) AddBranchOrder(branch int, preBranches []int) *SagaGrpc {
// EnableConcurrent enable the concurrent exec of sub trans // EnableConcurrent enable the concurrent exec of sub trans
func (s *SagaGrpc) EnableConcurrent() *SagaGrpc { func (s *SagaGrpc) EnableConcurrent() *SagaGrpc {
s.Saga.EnableConcurrent() s.Saga.SetConcurrent()
return s return s
} }

59
dtmsvr/trans_type_msg.go

@ -65,7 +65,6 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
if !t.needProcess() || t.Status == dtmcli.StatusPrepared { if !t.needProcess() || t.Status == dtmcli.StatusPrepared {
return nil return nil
} }
cmc := cMsgCustom{Delay: 0} cmc := cMsgCustom{Delay: 0}
if t.CustomData != "" { if t.CustomData != "" {
dtmimp.MustUnmarshalString(t.CustomData, &cmc) dtmimp.MustUnmarshalString(t.CustomData, &cmc)
@ -75,22 +74,66 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
t.touchCronTime(cronKeep, cmc.Delay) t.touchCronTime(cronKeep, cmc.Delay)
return nil return nil
} }
execBranch := func(current int) (bool, error) {
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current] branch := &branches[current]
if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared { if branch.Op != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared {
continue return true, nil
} }
err := t.execBranch(branch, current) err := t.execBranch(branch, current)
if err != nil { if err != nil {
return err if !errors.Is(err, dtmcli.ErrOngoing) {
logger.Errorf("exec branch error: %v", err)
}
return false, err
} }
if branch.Status != dtmcli.StatusSucceed { if branch.Status != dtmcli.StatusSucceed {
break return false, nil
}
return true, nil
}
type branchResult struct {
success bool
err error
}
waitChan := make(chan branchResult, len(branches))
consumeWork := func(i int) error {
success, err := execBranch(i)
waitChan <- branchResult{
success: success,
err: err,
}
return err
}
produceWork := func() {
for i := 0; i < len(branches); i++ {
if t.Concurrent {
go func(i int) {
_ = consumeWork(i)
}(i)
continue
}
err := consumeWork(i)
if err != nil {
return
}
}
}
go produceWork()
successCnt := 0
var err error
for i := 0; i < len(branches); i++ {
result := <-waitChan
if result.err != nil {
err = result.err
if !t.Concurrent {
return err
}
}
if result.success {
successCnt++
} }
} }
if current == len(branches) { // msg 事务完成 if successCnt == len(branches) { // msg 事务完成
t.changeStatus(dtmcli.StatusSucceed) t.changeStatus(dtmcli.StatusSucceed)
return nil return nil
} }

10
test/msg_options_test.go

@ -51,3 +51,13 @@ func TestMsgOptionsTimeoutFailed(t *testing.T) {
cronTransOnceForwardNow(t, gid, 180) cronTransOnceForwardNow(t, gid, 180)
assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid))
} }
func TestMsgConcurrent(t *testing.T) {
msg := genMsg(dtmimp.GetFuncName())
msg.Concurrent = true
msg.Submit()
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}

2
test/saga_concurrent_test.go

@ -16,7 +16,7 @@ import (
) )
func genSagaCon(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { func genSagaCon(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
return genSaga(gid, outFailed, inFailed).EnableConcurrent() return genSaga(gid, outFailed, inFailed).SetConcurrent()
} }
func TestSagaConNormal(t *testing.T) { func TestSagaConNormal(t *testing.T) {

Loading…
Cancel
Save