From 3d79a396faacd395e4ad3c1a1a77e1cb1cd90a65 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 12 Jul 2022 15:47:29 +0800 Subject: [PATCH] balance checked --- client/dtmcli/barrier.go | 7 ++++--- client/workflow/imp.go | 2 +- test/workflow_grpc_test.go | 4 +++- test/workflow_http_test.go | 2 ++ test/workflow_ongoing_test.go | 4 ++++ test/workflow_xa_test.go | 4 ++++ 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/client/dtmcli/barrier.go b/client/dtmcli/barrier.go index 832b4e1..7c1224d 100644 --- a/client/dtmcli/barrier.go +++ b/client/dtmcli/barrier.go @@ -68,8 +68,9 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) return tx.Rollback() }) originOp := map[string]string{ - dtmimp.OpCancel: dtmimp.OpTry, - dtmimp.OpCompensate: dtmimp.OpAction, + dtmimp.OpCancel: dtmimp.OpTry, // tcc + dtmimp.OpCompensate: dtmimp.OpAction, // saga + dtmimp.OpRollback: dtmimp.OpAction, // workflow }[bb.Op] originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName) @@ -84,7 +85,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) rerr = oerr } - if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate) && originAffected > 0 || // null compensate + if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensate currentAffected == 0 { // repeated request or dangled request return } diff --git a/client/workflow/imp.go b/client/workflow/imp.go index 090a24d..a987807 100644 --- a/client/workflow/imp.go +++ b/client/workflow/imp.go @@ -86,7 +86,7 @@ func (wf *Workflow) initRestyClient() { "gid": wf.Gid, "trans_type": wf.TransType, "branch_id": wf.currentBranch, - "op": dtmimp.OpAction, + "op": wf.currentOp, }) err := dtmimp.BeforeRequest(c, r) return err diff --git a/test/workflow_grpc_test.go b/test/workflow_grpc_test.go index 4238c85..175535c 100644 --- a/test/workflow_grpc_test.go +++ b/test/workflow_grpc_test.go @@ -37,7 +37,7 @@ func TestWorkflowGrpcSimple(t *testing.T) { assert.Equal(t, StatusFailed, getTransStatus(gid)) } -func TestWorkflowGrpcNormal(t *testing.T) { +func TestWorkflowGrpcRollback(t *testing.T) { workflow.SetProtocolForTest(dtmimp.ProtocolGRPC) req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} gid := dtmimp.GetFuncName() @@ -59,9 +59,11 @@ func TestWorkflowGrpcNormal(t *testing.T) { _, err = busi.BusiCli.TransInBSaga(wf.Context, &req) return err }) + before := getBeforeBalances("mysql") err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err, dtmcli.ErrFailure) assert.Equal(t, StatusFailed, getTransStatus(gid)) + assertSameBalance(t, before, "mysql") } func TestWorkflowMixed(t *testing.T) { diff --git a/test/workflow_http_test.go b/test/workflow_http_test.go index 4f68a7f..d79086e 100644 --- a/test/workflow_http_test.go +++ b/test/workflow_http_test.go @@ -80,10 +80,12 @@ func TestWorkflowRollback(t *testing.T) { } return nil }) + before := getBeforeBalances("mysql") err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err, dtmcli.ErrFailure) assert.Equal(t, StatusFailed, getTransStatus(gid)) + assertSameBalance(t, before, "mysql") } func TestWorkflowError(t *testing.T) { diff --git a/test/workflow_ongoing_test.go b/test/workflow_ongoing_test.go index 971c2fe..4aa31c6 100644 --- a/test/workflow_ongoing_test.go +++ b/test/workflow_ongoing_test.go @@ -92,6 +92,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) { }, func(wf *workflow.Workflow) { wf.Options.CompensateErrorBranch = true }) + before := getBeforeBalances("mysql") req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err, dtmcli.ErrOngoing) @@ -106,6 +107,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) { assert.Equal(t, StatusPrepared, getTransStatus(gid)) cronTransOnceForwardNow(t, gid, 1000) assert.Equal(t, StatusFailed, getTransStatus(gid)) + assertSameBalance(t, before, "mysql") } func TestWorkflowXaResume(t *testing.T) { @@ -137,6 +139,7 @@ func TestWorkflowXaResume(t *testing.T) { return err }) + before := getBeforeBalances("mysql") err := workflow.Execute(gid, gid, nil) assert.Equal(t, dtmcli.ErrOngoing, err) @@ -146,4 +149,5 @@ func TestWorkflowXaResume(t *testing.T) { assert.Equal(t, StatusPrepared, getTransStatus(gid)) cronTransOnceForwardNow(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) + assertNotSameBalance(t, before, "mysql") } diff --git a/test/workflow_xa_test.go b/test/workflow_xa_test.go index 1cb7ce7..84030e6 100644 --- a/test/workflow_xa_test.go +++ b/test/workflow_xa_test.go @@ -33,9 +33,11 @@ func TestWorkflowXaAction(t *testing.T) { }) return err }) + before := getBeforeBalances("mysql") err := workflow.Execute(gid, gid, nil) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) + assertNotSameBalance(t, before, "mysql") } func TestWorkflowXaRollback(t *testing.T) { @@ -55,7 +57,9 @@ func TestWorkflowXaRollback(t *testing.T) { }) return err }) + before := getBeforeBalances("mysql") err := workflow.Execute(gid, gid, nil) assert.Equal(t, dtmcli.ErrFailure, err) assert.Equal(t, StatusFailed, getTransStatus(gid)) + assertSameBalance(t, before, "mysql") }