Browse Source

fix

pull/459/head
Makonike 3 years ago
parent
commit
56796afdbb
  1. 2
      dtmsvr/trans_class.go
  2. 12
      dtmsvr/trans_process.go
  3. 17
      dtmsvr/trans_status.go
  4. 18
      dtmsvr/trans_type_msg.go
  5. 13
      dtmsvr/trans_type_saga.go
  6. 5
      dtmsvr/trans_type_tcc.go
  7. 5
      dtmsvr/trans_type_workflow.go
  8. 5
      dtmsvr/trans_type_xa.go

2
dtmsvr/trans_class.go

@ -42,7 +42,7 @@ type TransBranch = storage.TransBranchStore
type transProcessor interface { type transProcessor interface {
GenBranches() []TransBranch GenBranches() []TransBranch
ProcessOnce(branches []TransBranch) error ProcessOnce(ctx context.Context, branches []TransBranch) error
} }
type processorCreator func(*TransGlobal) transProcessor type processorCreator func(*TransGlobal) transProcessor

12
dtmsvr/trans_process.go

@ -33,17 +33,17 @@ func (t *TransGlobal) process(branches []TransBranch) error {
dtmimp.MustUnmarshalString(t.ExtData, &t.Ext) dtmimp.MustUnmarshalString(t.ExtData, &t.Ext)
} }
if !t.WaitResult { if !t.WaitResult {
ctx := CopyContext(t.Context)
go func(ctx context.Context) { go func(ctx context.Context) {
t.Context = CopyContext(ctx) err := t.processInner(ctx, branches)
err := t.processInner(branches)
if err != nil && !errors.Is(err, dtmimp.ErrOngoing) { if err != nil && !errors.Is(err, dtmimp.ErrOngoing) {
logger.Errorf("processInner err: %v", err) logger.Errorf("processInner err: %v", err)
} }
}(t.Context) }(ctx)
return nil return nil
} }
submitting := t.Status == dtmcli.StatusSubmitted submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(branches) err := t.processInner(t.Context, branches)
if err != nil { if err != nil {
return err return err
} }
@ -57,7 +57,7 @@ func (t *TransGlobal) process(branches []TransBranch) error {
return nil return nil
} }
func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) { func (t *TransGlobal) processInner(ctx context.Context, branches []TransBranch) (rerr error) {
defer handlePanic(&rerr) defer handlePanic(&rerr)
defer func() { defer func() {
if rerr != nil && !errors.Is(rerr, dtmcli.ErrOngoing) { if rerr != nil && !errors.Is(rerr, dtmcli.ErrOngoing) {
@ -71,7 +71,7 @@ func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) {
}() }()
logger.Debugf("processing: %s status: %s", t.Gid, t.Status) logger.Debugf("processing: %s status: %s", t.Gid, t.Status)
t.lastTouched = time.Now() t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(branches) rerr = t.getProcessor().ProcessOnce(ctx, branches)
return return
} }

17
dtmsvr/trans_status.go

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"math" "math"
@ -127,7 +128,7 @@ func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout() return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
} }
func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayload []byte) error { func (t *TransGlobal) getURLResult(ctx context.Context, uri string, branchID, op string, branchPayload []byte) error {
if uri == "" { // empty url is success if uri == "" { // empty url is success
return nil return nil
} }
@ -137,7 +138,7 @@ func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayloa
} }
return t.getHTTPResult(uri, branchID, op, branchPayload) return t.getHTTPResult(uri, branchID, op, branchPayload)
} }
return t.getGrpcResult(uri, branchID, op, branchPayload) return t.getGrpcResult(ctx, uri, branchID, op, branchPayload)
} }
func (t *TransGlobal) getHTTPResult(uri string, branchID, op string, branchPayload []byte) error { func (t *TransGlobal) getHTTPResult(uri string, branchID, op string, branchPayload []byte) error {
@ -192,7 +193,7 @@ func (t *TransGlobal) getJSONRPCResult(uri string, branchID, op string, branchPa
return err return err
} }
func (t *TransGlobal) getGrpcResult(uri string, branchID, op string, branchPayload []byte) error { func (t *TransGlobal) getGrpcResult(ctx context.Context, uri string, branchID, op string, branchPayload []byte) error {
// grpc handler // grpc handler
server, method, err := dtmdriver.GetDriver().ParseServerMethod(uri) server, method, err := dtmdriver.GetDriver().ParseServerMethod(uri)
if err != nil { if err != nil {
@ -200,7 +201,7 @@ func (t *TransGlobal) getGrpcResult(uri string, branchID, op string, branchPaylo
} }
conn := dtmgimp.MustGetGrpcConn(server, true) conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Context, t.Gid, t.TransType, branchID, op, "") ctx = dtmgimp.TransInfo2Ctx(ctx, t.Gid, t.TransType, branchID, op, "")
kvs := dtmgimp.Map2Kvs(t.Ext.Headers) kvs := dtmgimp.Map2Kvs(t.Ext.Headers)
kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...) kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...)
ctx = metadata.AppendToOutgoingContext(ctx, kvs...) ctx = metadata.AppendToOutgoingContext(ctx, kvs...)
@ -212,8 +213,8 @@ func (t *TransGlobal) getGrpcResult(uri string, branchID, op string, branchPaylo
return dtmgrpc.GrpcError2DtmError(err) return dtmgrpc.GrpcError2DtmError(err)
} }
func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) { func (t *TransGlobal) getBranchResult(ctx context.Context, branch *TransBranch) (string, error) {
err := t.getURLResult(branch.URL, branch.BranchID, branch.Op, branch.BinData) err := t.getURLResult(ctx, branch.URL, branch.BranchID, branch.Op, branch.BinData)
if err == nil { if err == nil {
return dtmcli.StatusSucceed, nil return dtmcli.StatusSucceed, nil
} else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) { } else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) {
@ -225,8 +226,8 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
return "", fmt.Errorf("your http/grpc result should be specified as in:\nhttp://d.dtm.pub/practice/arch.html#proto\nunkown result will be retried: %w", err) return "", fmt.Errorf("your http/grpc result should be specified as in:\nhttp://d.dtm.pub/practice/arch.html#proto\nunkown result will be retried: %w", err)
} }
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error { func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branchPos int) error {
status, err := t.getBranchResult(branch) status, err := t.getBranchResult(ctx, branch)
if status != "" { if status != "" {
t.changeBranchStatus(branch, status, branchPos) t.changeBranchStatus(branch, status, branchPos)
} }

18
dtmsvr/trans_type_msg.go

@ -7,6 +7,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
@ -51,11 +52,11 @@ type cMsgCustom struct {
Delay uint64 //delay call branch, unit second Delay uint64 //delay call branch, unit second
} }
func (t *TransGlobal) mayQueryPrepared() { func (t *TransGlobal) mayQueryPrepared(ctx context.Context) {
if !t.needProcess() || t.Status == dtmcli.StatusSubmitted { if !t.needProcess() || t.Status == dtmcli.StatusSubmitted {
return return
} }
err := t.getURLResult(t.QueryPrepared, "00", "msg", nil) err := t.getURLResult(ctx, t.QueryPrepared, "00", "msg", nil)
if err == nil { if err == nil {
t.changeStatus(dtmcli.StatusSubmitted) t.changeStatus(dtmcli.StatusSubmitted)
} else if errors.Is(err, dtmcli.ErrFailure) { } else if errors.Is(err, dtmcli.ErrFailure) {
@ -68,8 +69,8 @@ func (t *TransGlobal) mayQueryPrepared() {
} }
} }
func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error { func (t *transMsgProcessor) ProcessOnce(ctx context.Context, branches []TransBranch) error {
t.mayQueryPrepared() t.mayQueryPrepared(ctx)
if !t.needProcess() || t.Status == dtmcli.StatusPrepared { if !t.needProcess() || t.Status == dtmcli.StatusPrepared {
return nil return nil
} }
@ -91,12 +92,13 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
continue continue
} }
if t.Concurrent { if t.Concurrent {
copyCtx := CopyContext(ctx)
started++ started++
go func(pos int) { go func(ctx context.Context, pos int) {
resultsChan <- t.execBranch(b, pos) resultsChan <- t.execBranch(ctx, b, pos)
}(i) }(copyCtx, i)
} else { } else {
err = t.execBranch(b, i) err = t.execBranch(ctx, b, i)
if err != nil { if err != nil {
break break
} }

13
dtmsvr/trans_type_saga.go

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"time" "time"
@ -52,7 +53,7 @@ type branchResult struct {
err error err error
} }
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { func (t *transSagaProcessor) ProcessOnce(ctx context.Context, branches []TransBranch) error {
// when saga tasks is fetched, it always need to process // when saga tasks is fetched, it always need to process
logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout()) logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout())
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
@ -121,7 +122,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
return true return true
} }
resultChan := make(chan branchResult, n) resultChan := make(chan branchResult, n)
asyncExecBranch := func(i int) { asyncExecBranch := func(ctx context.Context, i int) {
var err error var err error
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
@ -132,7 +133,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
logger.Errorf("exec branch %s %s %s error: %v", branches[i].BranchID, branches[i].Op, branches[i].URL, err) logger.Errorf("exec branch %s %s %s error: %v", branches[i].BranchID, branches[i].Op, branches[i].URL, err)
} }
}() }()
err = t.execBranch(&branches[i], i) err = t.execBranch(ctx, &branches[i], i)
} }
pickToRunActions := func() []int { pickToRunActions := func() []int {
toRun := []int{} toRun := []int{}
@ -162,7 +163,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
if branchResults[b].op == dtmimp.OpAction { if branchResults[b].op == dtmimp.OpAction {
rsAStarted++ rsAStarted++
} }
go asyncExecBranch(b) copyCtx := CopyContext(ctx)
go asyncExecBranch(copyCtx, b)
} }
} }
waitDoneOnce := func() { waitDoneOnce := func() {
@ -178,7 +180,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
t.RetryCount++ t.RetryCount++
logger.Infof("Retrying branch %s %s %s, t.RetryLimit: %d, t.RetryCount: %d", logger.Infof("Retrying branch %s %s %s, t.RetryLimit: %d, t.RetryCount: %d",
branches[r.index].BranchID, branches[r.index].Op, branches[r.index].URL, t.RetryLimit, t.RetryCount) branches[r.index].BranchID, branches[r.index].Op, branches[r.index].URL, t.RetryLimit, t.RetryCount)
go asyncExecBranch(r.index) copyCtx := CopyContext(ctx)
go asyncExecBranch(copyCtx, r.index)
break break
} }
// if t.RetryCount = t.RetryLimit, trans will be aborted // if t.RetryCount = t.RetryLimit, trans will be aborted

5
dtmsvr/trans_type_tcc.go

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"fmt" "fmt"
"github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli"
@ -20,7 +21,7 @@ func (t *transTccProcessor) GenBranches() []TransBranch {
return []TransBranch{} return []TransBranch{}
} }
func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error { func (t *transTccProcessor) ProcessOnce(ctx context.Context, branches []TransBranch) error {
if !t.needProcess() { if !t.needProcess() {
return nil return nil
} }
@ -31,7 +32,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
for current := len(branches) - 1; current >= 0; current-- { for current := len(branches) - 1; current >= 0; current-- {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared { if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID) logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID)
err := t.execBranch(&branches[current], current) err := t.execBranch(ctx, &branches[current], current)
if err != nil { if err != nil {
return err return err
} }

5
dtmsvr/trans_type_workflow.go

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
@ -24,7 +25,7 @@ type cWorkflowCustom struct {
Data []byte `json:"data"` Data []byte `json:"data"`
} }
func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error { func (t *transWorkflowProcessor) ProcessOnce(ctx context.Context, branches []TransBranch) error {
if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed { if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed {
return nil return nil
} }
@ -36,5 +37,5 @@ func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error {
wd := wfpb.WorkflowData{Data: cmc.Data} wd := wfpb.WorkflowData{Data: cmc.Data}
data = dtmgimp.MustProtoMarshal(&wd) data = dtmgimp.MustProtoMarshal(&wd)
} }
return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data) return t.getURLResult(ctx, t.QueryPrepared, "00", cmc.Name, data)
} }

5
dtmsvr/trans_type_xa.go

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"context"
"fmt" "fmt"
"github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli"
@ -19,7 +20,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch {
return []TransBranch{} return []TransBranch{}
} }
func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error { func (t *transXaProcessor) ProcessOnce(ctx context.Context, branches []TransBranch) error {
if !t.needProcess() { if !t.needProcess() {
return nil return nil
} }
@ -29,7 +30,7 @@ func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error {
currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpCommit, dtmimp.OpRollback).(string) currentType := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpCommit, dtmimp.OpRollback).(string)
for i, branch := range branches { for i, branch := range branches {
if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed { if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
err := t.execBranch(&branch, i) err := t.execBranch(ctx, &branch, i)
if err != nil { if err != nil {
return err return err
} }

Loading…
Cancel
Save