From 74479f7a20f60bb8e94c74c0cfdfbd99c0c3c6e6 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 29 Oct 2021 22:34:08 +0800 Subject: [PATCH] cronTransOnce refactored --- dtmsvr/cron.go | 11 ++++------- dtmsvr/utils.go | 15 --------------- test/grpc_msg_test.go | 2 +- test/grpc_saga_test.go | 4 ++-- test/grpc_tcc_test.go | 2 +- test/msg_test.go | 4 ++-- test/saga_concurrent_test.go | 4 ++-- test/saga_test.go | 4 ++-- test/tcc_test.go | 2 +- test/types.go | 27 ++++++++++++++++++++++----- test/wait_saga_test.go | 2 +- 11 files changed, 38 insertions(+), 39 deletions(-) diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 78e0546..9cb1e64 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -16,16 +16,13 @@ var NowForwardDuration time.Duration = time.Duration(0) var CronForwardDuration time.Duration = time.Duration(0) // CronTransOnce cron expired trans. use expireIn as expire time -func CronTransOnce() (hasTrans bool) { +func CronTransOnce() (gid string) { defer handlePanic(nil) trans := lockOneTrans(CronForwardDuration) if trans == nil { return } - hasTrans = true - if TransProcessedTestChan != nil { - defer WaitTransProcessed(trans.Gid) - } + gid = trans.Gid trans.WaitResult = true trans.Process(dbGet()) return @@ -34,8 +31,8 @@ func CronTransOnce() (hasTrans bool) { // CronExpiredTrans cron expired trans, num == -1 indicate for ever func CronExpiredTrans(num int) { for i := 0; i < num || num == -1; i++ { - hasTrans := CronTransOnce() - if !hasTrans && num != 1 { + gid := CronTransOnce() + if gid == "" && num != 1 { sleepCronTime() } } diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index f498a5f..3dc08e3 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -47,21 +47,6 @@ func writeTransLog(gid string, action string, status string, branch string, deta // TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan var TransProcessedTestChan chan string = nil -// WaitTransProcessed only for test usage. wait for transaction processed once -func WaitTransProcessed(gid string) { - dtmcli.Logf("waiting for gid %s", gid) - select { - case id := <-TransProcessedTestChan: - for id != gid { - dtmcli.LogRedf("-------id %s not match gid %s", id, gid) - id = <-TransProcessedTestChan - } - dtmcli.Logf("finish for gid %s", gid) - case <-time.After(time.Duration(time.Second * 3)): - dtmcli.LogFatalf("Wait Trans timeout") - } -} - var gNode *snowflake.Node = nil func init() { diff --git a/test/grpc_msg_test.go b/test/grpc_msg_test.go index c6a127e..b6aa4c2 100644 --- a/test/grpc_msg_test.go +++ b/test/grpc_msg_test.go @@ -33,7 +33,7 @@ func grpcMsgOngoing(t *testing.T) { examples.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid)) } diff --git a/test/grpc_saga_test.go b/test/grpc_saga_test.go index 393a1bc..56c7a64 100644 --- a/test/grpc_saga_test.go +++ b/test/grpc_saga_test.go @@ -30,7 +30,7 @@ func sagaGrpcCommittedOngoing(t *testing.T) { saga.Submit() WaitTransProcessed(saga.Gid) assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) } @@ -41,7 +41,7 @@ func sagaGrpcRollback(t *testing.T) { saga.Submit() WaitTransProcessed(saga.Gid) assert.Equal(t, dtmcli.StatusAborting, getTransStatus(saga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid)) } diff --git a/test/grpc_tcc_test.go b/test/grpc_tcc_test.go index 574d0f0..946a06a 100644 --- a/test/grpc_tcc_test.go +++ b/test/grpc_tcc_test.go @@ -60,6 +60,6 @@ func tccGrpcRollback(t *testing.T) { assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, dtmcli.StatusAborting, getTransStatus(gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid)) } diff --git a/test/msg_test.go b/test/msg_test.go index cac6ecb..0745a9d 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -22,7 +22,7 @@ func msgNormal(t *testing.T) { WaitTransProcessed(msg.Gid) assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid)) - CronTransOnce() + cronTransOnce() } func msgOngoing(t *testing.T) { @@ -37,7 +37,7 @@ func msgOngoing(t *testing.T) { examples.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing) cronTransOnceForwardNow(180) assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid)) err = msg.Prepare("") diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index e71da70..bc37a0d 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -40,7 +40,7 @@ func csagaRollback(t *testing.T) { assert.Nil(t, err) WaitTransProcessed(csaga.Gid) assert.Equal(t, dtmcli.StatusAborting, getTransStatus(csaga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusFailed, getTransStatus(csaga.Gid)) assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusFailed, dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid)) err = csaga.Submit() @@ -67,7 +67,7 @@ func csagaCommittedOngoing(t *testing.T) { assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid)) assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(csaga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(csaga.Gid)) } diff --git a/test/saga_test.go b/test/saga_test.go index d4b6eb8..414fcf0 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -33,7 +33,7 @@ func sagaCommittedOngoing(t *testing.T) { saga.Submit() WaitTransProcessed(saga.Gid) assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) } @@ -45,7 +45,7 @@ func sagaRollback(t *testing.T) { assert.Nil(t, err) WaitTransProcessed(saga.Gid) assert.Equal(t, dtmcli.StatusAborting, getTransStatus(saga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid)) err = saga.Submit() diff --git a/test/tcc_test.go b/test/tcc_test.go index 2b5588d..3269f9b 100644 --- a/test/tcc_test.go +++ b/test/tcc_test.go @@ -38,6 +38,6 @@ func tccRollback(t *testing.T) { assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, dtmcli.StatusAborting, getTransStatus(gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid)) } diff --git a/test/types.go b/test/types.go index c9b84bb..dcfa774 100644 --- a/test/types.go +++ b/test/types.go @@ -14,11 +14,28 @@ func dbGet() *common.DB { return common.DbGet(config.DB) } -// WaitTransProcessed alias -var WaitTransProcessed = dtmsvr.WaitTransProcessed +// WaitTransProcessed only for test usage. wait for transaction processed once +func WaitTransProcessed(gid string) { + dtmcli.Logf("waiting for gid %s", gid) + select { + case id := <-dtmsvr.TransProcessedTestChan: + for id != gid { + dtmcli.LogRedf("-------id %s not match gid %s", id, gid) + id = <-dtmsvr.TransProcessedTestChan + } + dtmcli.Logf("finish for gid %s", gid) + case <-time.After(time.Duration(time.Second * 3)): + dtmcli.LogFatalf("Wait Trans timeout") + } +} + +func cronTransOnce() { + gid := dtmsvr.CronTransOnce() + if dtmsvr.TransProcessedTestChan != nil && gid != "" { + WaitTransProcessed(gid) + } +} -// CronTransOnce alias -var CronTransOnce = dtmsvr.CronTransOnce var e2p = dtmcli.E2P // TransGlobal alias @@ -33,6 +50,6 @@ type M = dtmcli.M func cronTransOnceForwardNow(seconds int) { old := dtmsvr.NowForwardDuration dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second - CronTransOnce() + cronTransOnce() dtmsvr.NowForwardDuration = old } diff --git a/test/wait_saga_test.go b/test/wait_saga_test.go index 9166c55..6f96464 100644 --- a/test/wait_saga_test.go +++ b/test/wait_saga_test.go @@ -34,7 +34,7 @@ func sagaCommittedOngoingWait(t *testing.T) { assert.Error(t, err) WaitTransProcessed(saga.Gid) assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid)) - CronTransOnce() + cronTransOnce() assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) }