Browse Source

saga http failed rollback reason

pull/311/head
xyctruth 4 years ago
parent
commit
707b07cf12
  1. 18
      dtmsvr/storage/trans.go
  2. 2
      dtmsvr/trans_status.go
  3. 18
      dtmsvr/trans_type_saga.go
  4. 1
      test/busi/base_types.go
  5. 6
      test/busi/busi.go
  6. 5
      test/saga_test.go

18
dtmsvr/storage/trans.go

@ -55,15 +55,15 @@ func (g *TransGlobalStore) String() string {
// TransBranchStore branch transaction
type TransBranchStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`
URL string `json:"url,omitempty"`
BinData []byte
BranchID string `json:"branch_id,omitempty"`
Op string `json:"op,omitempty"`
Status string `json:"status,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
RollbackReason string `json:"-" gorm:"-"`
Gid string `json:"gid,omitempty"`
URL string `json:"url,omitempty"`
BinData []byte
BranchID string `json:"branch_id,omitempty"`
Op string `json:"op,omitempty"`
Status string `json:"status,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Error error `json:"-" gorm:"-"`
}
// TableName TableName

2
dtmsvr/trans_status.go

@ -190,7 +190,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
if err == nil {
return dtmcli.StatusSucceed, nil
} else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) {
branch.RollbackReason = fmt.Sprintf("url:%s return failed: %s", branch.URL, err.Error())
branch.Error = fmt.Errorf("url:%s return failed: %w", branch.URL, err)
return dtmcli.StatusFailed, nil
} else if errors.Is(err, dtmcli.ErrOngoing) {
return "", dtmcli.ErrOngoing

18
dtmsvr/trans_type_saga.go

@ -45,11 +45,11 @@ type cSagaCustom struct {
}
type branchResult struct {
index int
status string
started bool
op string
rollbackReason string
index int
status string
started bool
op string
err error
}
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
@ -74,7 +74,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
// resultStats
var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int
var rollbackReason string
var failureError error
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
@ -127,7 +127,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
if x := recover(); x != nil {
err = dtmimp.AsError(x)
}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, rollbackReason: branches[i].RollbackReason}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error}
if err != nil && !errors.Is(err, dtmcli.ErrOngoing) {
logger.Errorf("exec branch %s %s %s error: %v", branches[i].BranchID, branches[i].Op, branches[i].URL, err)
}
@ -174,7 +174,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsADone++
if r.status == dtmcli.StatusFailed {
rsAFailed++
rollbackReason = r.rollbackReason
failureError = r.err
} else if r.status == dtmcli.StatusSucceed {
rsASucceed++
}
@ -223,7 +223,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
return nil
}
if t.Status == dtmcli.StatusSubmitted && rsAFailed > 0 {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(rollbackReason))
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(failureError.Error()))
}
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

1
test/busi/base_types.go

@ -142,6 +142,7 @@ type mainSwitchType struct {
QueryPreparedResult AutoEmptyString
NextResult AutoEmptyString
JrpcResult AutoEmptyString
FailureReason AutoEmptyString
}
// MainSwitch controls busi success or fail

6
test/busi/busi.go

@ -48,6 +48,12 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi
if res == "ERROR" {
return errors.New("ERROR from user")
}
if res == dtmimp.ResultFailure {
failureReason := MainSwitch.FailureReason.Fetch()
if failureReason != "" {
return fmt.Errorf("%s. %w", failureReason, dtmimp.ErrFailure)
}
}
return dtmcli.String2DtmError(res)
}

5
test/saga_test.go

@ -27,13 +27,13 @@ func TestSagaNormal(t *testing.T) {
func TestSagaRollback(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
saga.Concurrent = false
busi.MainSwitch.FailureReason.SetOnce("Insufficient balance")
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason)
assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"Insufficient balance. FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason)
}
func TestSagaOngoingSucceed(t *testing.T) {
@ -61,7 +61,6 @@ func TestSagaFailed(t *testing.T) {
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason)
}
func TestSagaAbnormal(t *testing.T) {

Loading…
Cancel
Save