Browse Source

Merge pull request #311 from xyctruth/main

[WIP]Support rollback_reason for saga
pull/313/head
yedf2 4 years ago
committed by GitHub
parent
commit
8b99ca7216
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dtmgrpc/type.go
  2. 1
      dtmsvr/storage/trans.go
  3. 1
      dtmsvr/trans_status.go
  4. 14
      dtmsvr/trans_type_saga.go
  5. 1
      test/busi/base_types.go
  6. 5
      test/busi/busi.go
  7. 2
      test/saga_grpc_test.go
  8. 2
      test/saga_options_test.go
  9. 4
      test/saga_test.go

4
dtmgrpc/type.go

@ -8,6 +8,7 @@ package dtmgrpc
import (
context "context"
"fmt"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -38,7 +39,8 @@ func GrpcError2DtmError(err error) error {
if st.Message() == dtmcli.ResultOngoing {
return dtmcli.ErrOngoing
}
return dtmcli.ErrFailure
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure)
} else if ok && st.Code() == codes.FailedPrecondition {
return dtmcli.ErrOngoing
}

1
dtmsvr/storage/trans.go

@ -68,6 +68,7 @@ type TransBranchStore struct {
Status string `json:"status,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Error error `json:"-" gorm:"-"`
}
// TableName TableName

1
dtmsvr/trans_status.go

@ -190,6 +190,7 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) (string, error) {
if err == nil {
return dtmcli.StatusSucceed, nil
} else if t.TransType == "saga" && branch.Op == dtmimp.OpAction && errors.Is(err, dtmcli.ErrFailure) {
branch.Error = fmt.Errorf("url:%s return failed: %w", branch.URL, err)
return dtmcli.StatusFailed, nil
} else if errors.Is(err, dtmcli.ErrOngoing) {
return "", dtmcli.ErrOngoing

14
dtmsvr/trans_type_saga.go

@ -49,13 +49,14 @@ type branchResult struct {
status string
started bool
op string
err error
}
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
// when saga tasks is fetched, it always need to process
logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout())
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting)
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
}
n := len(branches)
@ -73,6 +74,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
// resultStats
var rsAToStart, rsAStarted, rsADone, rsAFailed, rsASucceed, rsCToStart, rsCDone, rsCSucceed int
var failureError error
branchResults := make([]branchResult, n) // save the branch result
for i := 0; i < n; i++ {
b := branches[i]
@ -125,7 +127,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
if x := recover(); x != nil {
err = dtmimp.AsError(x)
}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error}
if err != nil && !errors.Is(err, dtmcli.ErrOngoing) {
logger.Errorf("exec branch %s %s %s error: %v", branches[i].BranchID, branches[i].Op, branches[i].URL, err)
}
@ -172,6 +174,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsADone++
if r.status == dtmcli.StatusFailed {
rsAFailed++
failureError = r.err
} else if r.status == dtmcli.StatusSucceed {
rsASucceed++
}
@ -219,8 +222,11 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
t.changeStatus(dtmcli.StatusSucceed)
return nil
}
if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) {
t.changeStatus(dtmcli.StatusAborting)
if t.Status == dtmcli.StatusSubmitted && rsAFailed > 0 {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(failureError.Error()))
}
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
}
if t.Status == dtmcli.StatusAborting {
prepareToCompensate()

1
test/busi/base_types.go

@ -142,6 +142,7 @@ type mainSwitchType struct {
QueryPreparedResult AutoEmptyString
NextResult AutoEmptyString
JrpcResult AutoEmptyString
FailureReason AutoEmptyString
}
// MainSwitch controls busi success or fail

5
test/busi/busi.go

@ -34,7 +34,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string
if res == dtmcli.ResultSuccess {
return nil
} else if res == dtmcli.ResultFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
return status.New(codes.Aborted, fmt.Sprintf("reason:%s", MainSwitch.FailureReason.Fetch())).Err()
} else if res == dtmcli.ResultOngoing {
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
}
@ -48,6 +48,9 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi
if res == "ERROR" {
return errors.New("ERROR from user")
}
if res == dtmimp.ResultFailure {
return fmt.Errorf("reason:%s. %w", MainSwitch.FailureReason.Fetch(), dtmimp.ErrFailure)
}
return dtmcli.String2DtmError(res)
}

2
test/saga_grpc_test.go

@ -28,6 +28,7 @@ func TestSagaGrpcNormal(t *testing.T) {
func TestSagaGrpcRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
saga := genSagaGrpc(gid, false, true)
busi.MainSwitch.FailureReason.SetOnce("Insufficient balance")
busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing)
saga.Submit()
waitTransProcessed(saga.Gid)
@ -35,6 +36,7 @@ func TestSagaGrpcRollback(t *testing.T) {
cronTransOnce(t, gid)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, "url:localhost:58081/busi.Busi/TransIn return failed: reason:Insufficient balance. FAILURE", getTrans(saga.Gid).RollbackReason)
}
func TestSagaGrpcCurrent(t *testing.T) {

2
test/saga_options_test.go

@ -56,6 +56,8 @@ func TestSagaOptionsTimeout(t *testing.T) {
cronTransOnceForwardNow(t, gid, 3600)
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid))
assert.Regexp(t, `^Timeout after \d+ seconds$`, getTrans(gid).RollbackReason)
}
func TestSagaGlobalTransWithRequestTimeout(t *testing.T) {

4
test/saga_test.go

@ -22,15 +22,18 @@ func TestSagaNormal(t *testing.T) {
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, "", getTrans(saga.Gid).RollbackReason)
}
func TestSagaRollback(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
busi.MainSwitch.FailureReason.SetOnce("Insufficient balance")
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, "url:http://localhost:8081/api/busi/TransIn return failed: {\"error\":\"reason:Insufficient balance. FAILURE\"}. FAILURE", getTrans(saga.Gid).RollbackReason)
}
func TestSagaOngoingSucceed(t *testing.T) {
@ -44,6 +47,7 @@ func TestSagaOngoingSucceed(t *testing.T) {
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, "", getTrans(saga.Gid).RollbackReason)
}
func TestSagaFailed(t *testing.T) {

Loading…
Cancel
Save