Browse Source

cronTransOnce refactored

pull/45/head
yedf2 4 years ago
parent
commit
74479f7a20
  1. 11
      dtmsvr/cron.go
  2. 15
      dtmsvr/utils.go
  3. 2
      test/grpc_msg_test.go
  4. 4
      test/grpc_saga_test.go
  5. 2
      test/grpc_tcc_test.go
  6. 4
      test/msg_test.go
  7. 4
      test/saga_concurrent_test.go
  8. 4
      test/saga_test.go
  9. 2
      test/tcc_test.go
  10. 27
      test/types.go
  11. 2
      test/wait_saga_test.go

11
dtmsvr/cron.go

@ -16,16 +16,13 @@ var NowForwardDuration time.Duration = time.Duration(0)
var CronForwardDuration time.Duration = time.Duration(0)
// CronTransOnce cron expired trans. use expireIn as expire time
func CronTransOnce() (hasTrans bool) {
func CronTransOnce() (gid string) {
defer handlePanic(nil)
trans := lockOneTrans(CronForwardDuration)
if trans == nil {
return
}
hasTrans = true
if TransProcessedTestChan != nil {
defer WaitTransProcessed(trans.Gid)
}
gid = trans.Gid
trans.WaitResult = true
trans.Process(dbGet())
return
@ -34,8 +31,8 @@ func CronTransOnce() (hasTrans bool) {
// CronExpiredTrans cron expired trans, num == -1 indicate for ever
func CronExpiredTrans(num int) {
for i := 0; i < num || num == -1; i++ {
hasTrans := CronTransOnce()
if !hasTrans && num != 1 {
gid := CronTransOnce()
if gid == "" && num != 1 {
sleepCronTime()
}
}

15
dtmsvr/utils.go

@ -47,21 +47,6 @@ func writeTransLog(gid string, action string, status string, branch string, deta
// TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan
var TransProcessedTestChan chan string = nil
// WaitTransProcessed only for test usage. wait for transaction processed once
func WaitTransProcessed(gid string) {
dtmcli.Logf("waiting for gid %s", gid)
select {
case id := <-TransProcessedTestChan:
for id != gid {
dtmcli.LogRedf("-------id %s not match gid %s", id, gid)
id = <-TransProcessedTestChan
}
dtmcli.Logf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 3)):
dtmcli.LogFatalf("Wait Trans timeout")
}
}
var gNode *snowflake.Node = nil
func init() {

2
test/grpc_msg_test.go

@ -33,7 +33,7 @@ func grpcMsgOngoing(t *testing.T) {
examples.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
}

4
test/grpc_saga_test.go

@ -30,7 +30,7 @@ func sagaGrpcCommittedOngoing(t *testing.T) {
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}
@ -41,7 +41,7 @@ func sagaGrpcRollback(t *testing.T) {
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, dtmcli.StatusAborting, getTransStatus(saga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
}

2
test/grpc_tcc_test.go

@ -60,6 +60,6 @@ func tccGrpcRollback(t *testing.T) {
assert.Error(t, err)
WaitTransProcessed(gid)
assert.Equal(t, dtmcli.StatusAborting, getTransStatus(gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}

4
test/msg_test.go

@ -22,7 +22,7 @@ func msgNormal(t *testing.T) {
WaitTransProcessed(msg.Gid)
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
CronTransOnce()
cronTransOnce()
}
func msgOngoing(t *testing.T) {
@ -37,7 +37,7 @@ func msgOngoing(t *testing.T) {
examples.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing)
cronTransOnceForwardNow(180)
assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
err = msg.Prepare("")

4
test/saga_concurrent_test.go

@ -40,7 +40,7 @@ func csagaRollback(t *testing.T) {
assert.Nil(t, err)
WaitTransProcessed(csaga.Gid)
assert.Equal(t, dtmcli.StatusAborting, getTransStatus(csaga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(csaga.Gid))
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusFailed, dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid))
err = csaga.Submit()
@ -67,7 +67,7 @@ func csagaCommittedOngoing(t *testing.T) {
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid))
assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(csaga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(csaga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(csaga.Gid))
}

4
test/saga_test.go

@ -33,7 +33,7 @@ func sagaCommittedOngoing(t *testing.T) {
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}
@ -45,7 +45,7 @@ func sagaRollback(t *testing.T) {
assert.Nil(t, err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, dtmcli.StatusAborting, getTransStatus(saga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
err = saga.Submit()

2
test/tcc_test.go

@ -38,6 +38,6 @@ func tccRollback(t *testing.T) {
assert.Error(t, err)
WaitTransProcessed(gid)
assert.Equal(t, dtmcli.StatusAborting, getTransStatus(gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}

27
test/types.go

@ -14,11 +14,28 @@ func dbGet() *common.DB {
return common.DbGet(config.DB)
}
// WaitTransProcessed alias
var WaitTransProcessed = dtmsvr.WaitTransProcessed
// WaitTransProcessed only for test usage. wait for transaction processed once
func WaitTransProcessed(gid string) {
dtmcli.Logf("waiting for gid %s", gid)
select {
case id := <-dtmsvr.TransProcessedTestChan:
for id != gid {
dtmcli.LogRedf("-------id %s not match gid %s", id, gid)
id = <-dtmsvr.TransProcessedTestChan
}
dtmcli.Logf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 3)):
dtmcli.LogFatalf("Wait Trans timeout")
}
}
func cronTransOnce() {
gid := dtmsvr.CronTransOnce()
if dtmsvr.TransProcessedTestChan != nil && gid != "" {
WaitTransProcessed(gid)
}
}
// CronTransOnce alias
var CronTransOnce = dtmsvr.CronTransOnce
var e2p = dtmcli.E2P
// TransGlobal alias
@ -33,6 +50,6 @@ type M = dtmcli.M
func cronTransOnceForwardNow(seconds int) {
old := dtmsvr.NowForwardDuration
dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second
CronTransOnce()
cronTransOnce()
dtmsvr.NowForwardDuration = old
}

2
test/wait_saga_test.go

@ -34,7 +34,7 @@ func sagaCommittedOngoingWait(t *testing.T) {
assert.Error(t, err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce()
cronTransOnce()
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}

Loading…
Cancel
Save