From 1d72b3842f4dc950eef4299fbc0ef9c84dbd9e67 Mon Sep 17 00:00:00 2001 From: xyctruth <398041993@qq.com> Date: Thu, 24 Feb 2022 11:23:36 +0800 Subject: [PATCH 1/3] support resetCronTime --- dtmsvr/api_http.go | 14 ++++++- dtmsvr/storage/boltdb/boltdb.go | 33 ++++++++++++++- dtmsvr/storage/redis/redis.go | 25 ++++++++++++ dtmsvr/storage/sql/sql.go | 28 +++++++++++-- dtmsvr/storage/store.go | 1 + test/api_test.go | 14 +++++++ test/store_test.go | 72 +++++++++++++++++++++++++++++++++ 7 files changed, 179 insertions(+), 8 deletions(-) diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 6027100..de85586 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -8,6 +8,8 @@ package dtmsvr import ( "errors" + "strconv" + "time" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -26,6 +28,7 @@ func addRoute(engine *gin.Engine) { engine.POST("/api/dtmsvr/registerTccBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query)) engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all)) + engine.GET("/api/dtmsvr/resetCronTime", dtmutil.WrapHandler2(resetCronTime)) // add prometheus exporter h := promhttp.Handler() @@ -75,7 +78,14 @@ func query(c *gin.Context) interface{} { func all(c *gin.Context) interface{} { position := c.Query("position") - slimit := dtmimp.OrString(c.Query("limit"), "100") - globals := GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(slimit))) + sLimit := dtmimp.OrString(c.Query("limit"), "100") + globals := GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit))) return map[string]interface{}{"transactions": globals, "next_position": position} } + +func resetCronTime(c *gin.Context) interface{} { + sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) + sLimit := dtmimp.OrString(c.Query("limit"), "100") + timeout := time.Duration(dtmimp.MustAtoi(sTimeoutSecond)) * time.Second + return GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit))) +} diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index 278e898..e6fe28c 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -11,15 +11,17 @@ import ( "strings" "time" - bolt "go.etcd.io/bbolt" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmutil" + bolt "go.etcd.io/bbolt" ) +var conf = &config.Config + // Store implements storage.Store, and storage with boltdb type Store struct { boltDb *bolt.DB @@ -409,3 +411,30 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS dtmimp.E2P(err) return trans } + +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { + next := time.Now() + var trans *storage.TransGlobalStore + min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix()) + err := s.boltDb.Update(func(t *bolt.Tx) error { + + cursor := t.Bucket(bucketIndex).Cursor() + i := 0 + for k, v := cursor.Seek([]byte(min)); k != nil && i < int(limit); k, v = cursor.Next() { + trans = tGetGlobal(t, string(v)) + err := t.Bucket(bucketIndex).Delete(k) + dtmimp.E2P(err) + + if trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed { + continue + } + + trans.NextCronTime = &next + tPutGlobal(t, trans) + tPutIndex(t, next.Unix(), trans.Gid) + i++ + } + return nil + }) + return err +} diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 0f7e965..fa4b5b8 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -260,6 +260,31 @@ return gid } } +// ResetCronTime rest nextCronTime +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { + next := time.Now().Unix() + timeoutTimestamp := time.Now().Add(timeout).Unix() + args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit) + lua := `-- ResetCronTime +local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'WITHSCORES', 'LIMIT', 0, ARGV[5]) +local index = 1 +while(true) +do + local gid = r[index] + if gid == nil then + break + end + redis.call('ZADD', KEYS[3], ARGV[4], gid) + index = index + 2 +end +` + _, err := callLua(args, lua) + if errors.Is(err, storage.ErrNotFound) { + return nil + } + return err +} + // TouchCronTime updates cronTime func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) { global.UpdateTime = dtmutil.GetNextTime(0) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 6c90822..3ec146b 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -5,14 +5,13 @@ import ( "math" "time" - "github.com/lithammer/shortuuid/v3" - "gorm.io/gorm" - "gorm.io/gorm/clause" - "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmutil" + "github.com/lithammer/shortuuid/v3" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) var conf = &config.Config @@ -157,6 +156,27 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS return global } +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { + db := dbGet() + getTime := func(second int) string { + return map[string]string{ + "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), + "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), + }[conf.Store.Driver] + } + timeoutSecond := int(timeout / time.Second) + whereTime := fmt.Sprintf("next_cron_time > %s", getTime(timeoutSecond)) + global := &storage.TransGlobalStore{} + dbr := db.Must().Model(global). + Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). + Limit(int(limit)). + Select([]string{"next_cron_time"}). + Updates(&storage.TransGlobalStore{ + NextCronTime: dtmutil.GetNextTime(0), + }) + return dbr.Error +} + // SetDBConn sets db conn pool func SetDBConn(db *gorm.DB) { sqldb, _ := db.DB() diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index a03a912..55e822e 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -30,4 +30,5 @@ type Store interface { ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore + ResetCronTime(timeout time.Duration, limit int64) error } diff --git a/test/api_test.go b/test/api_test.go index c864101..5db7fc3 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -8,6 +8,7 @@ package test import ( "fmt" + "strconv" "testing" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -79,3 +80,16 @@ func TestDtmMetrics(t *testing.T) { assert.Nil(t, err) assert.Equal(t, rest.StatusCode(), 200) } + +func TestAPIResetCronTime(t *testing.T) { + testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error { + sTimeout := strconv.FormatInt(timeout, 10) + sLimit := strconv.FormatInt(limit, 10) + + _, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{ + "timeout": sTimeout, + "limit": sLimit, + }).Get(dtmutil.DefaultHTTPServer + "/resetCronTime") + return err + }) +} diff --git a/test/store_test.go b/test/store_test.go index 46869e0..bdcca7e 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -1,6 +1,7 @@ package test import ( + "fmt" "testing" "time" @@ -13,6 +14,10 @@ import ( func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) { next := time.Now().Add(10 * time.Second) + return initTransGlobalByNextCronTime(gid, next) +} + +func initTransGlobalByNextCronTime(gid string, next time.Time) (*storage.TransGlobalStore, storage.Store) { g := &storage.TransGlobalStore{Gid: gid, Status: "prepared", NextCronTime: &next} bs := []storage.TransBranchStore{ {Gid: gid, BranchID: "01"}, @@ -88,6 +93,73 @@ func TestStoreLockTrans(t *testing.T) { assert.Nil(t, g2) } +func TestStoreResetCronTime(t *testing.T) { + s := registry.GetStore() + testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error { + return s.ResetCronTime(time.Duration(timeout)*time.Second, limit) + }) +} + +func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) error) { + s := registry.GetStore() + var restTimeTimeout, lockExpireIn, limit, i int64 + restTimeTimeout = 100 //The time that will be ResetCronTime + lockExpireIn = 2 //The time that will be LockOneGlobalTrans + limit = 10 // rest limit + + // Will be reset + for i = 0; i < limit; i++ { + gid := funcName + fmt.Sprintf("%d", i) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout+10)*time.Second)) + } + + // Will not be reset + gid := funcName + fmt.Sprintf("%d", 10) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout-10)*time.Second)) + + // Not Fount + g := s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.Nil(t, g) + + // Rest limit-1 count + err := restCronHandler(restTimeTimeout, limit-1) + assert.Nil(t, err) + // Fount limit-1 count + for i = 0; i < limit-1; i++ { + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.NotNil(t, g) + s.ChangeGlobalStatus(g, "succeed", []string{}, true) + } + + // Not Fount + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.Nil(t, g) + + // Rest 1 count + err = restCronHandler(restTimeTimeout, limit) + // Fount 1 count + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.NotNil(t, g) + s.ChangeGlobalStatus(g, "succeed", []string{}, true) + + // Not Fount + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.Nil(t, g) + + // Increase the restTimeTimeout, Rest 1 count + err = restCronHandler(restTimeTimeout-12, limit) + assert.Nil(t, err) + // Fount 1 count + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.NotNil(t, g) + s.ChangeGlobalStatus(g, "succeed", []string{}, true) + + // Not Fount + g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) + assert.Nil(t, g) + +} + func TestUpdateBranches(t *testing.T) { if !conf.Store.IsDB() { _, err := registry.GetStore().UpdateBranches(nil, nil) From 2c7e53cfe74224ccafe5b0401c260e427472853a Mon Sep 17 00:00:00 2001 From: xyctruth <398041993@qq.com> Date: Thu, 24 Feb 2022 11:32:18 +0800 Subject: [PATCH 2/3] golang ci lint --- dtmsvr/api_http.go | 2 ++ dtmsvr/storage/boltdb/boltdb.go | 5 ++--- dtmsvr/storage/redis/redis.go | 1 + dtmsvr/storage/sql/sql.go | 2 ++ test/store_test.go | 9 +++++---- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index de85586..e3507b8 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -83,6 +83,8 @@ func all(c *gin.Context) interface{} { return map[string]interface{}{"transactions": globals, "next_position": position} } +// resetCronTime rest nextCronTime +// Prevent multiple backoff from causing NextCronTime to be too long func resetCronTime(c *gin.Context) interface{} { sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) sLimit := dtmimp.OrString(c.Query("limit"), "100") diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index e6fe28c..a09eb12 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -14,14 +14,11 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmutil" bolt "go.etcd.io/bbolt" ) -var conf = &config.Config - // Store implements storage.Store, and storage with boltdb type Store struct { boltDb *bolt.DB @@ -412,6 +409,8 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS return trans } +// ResetCronTime rest nextCronTime +// Prevent multiple backoff from causing NextCronTime to be too long func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { next := time.Now() var trans *storage.TransGlobalStore diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index fa4b5b8..d4200c7 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -261,6 +261,7 @@ return gid } // ResetCronTime rest nextCronTime +// Prevent multiple backoff from causing NextCronTime to be too long func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { next := time.Now().Unix() timeoutTimestamp := time.Now().Add(timeout).Unix() diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 3ec146b..d263955 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -156,6 +156,8 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS return global } +// ResetCronTime rest nextCronTime +// Prevent multiple backoff from causing NextCronTime to be too long func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { db := dbGet() getTime := func(second int) string { diff --git a/test/store_test.go b/test/store_test.go index bdcca7e..75503f1 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -136,7 +136,8 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( assert.Nil(t, g) // Rest 1 count - err = restCronHandler(restTimeTimeout, limit) + err1 := restCronHandler(restTimeTimeout, limit) + assert.Nil(t, err1) // Fount 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) @@ -146,9 +147,9 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Increase the restTimeTimeout, Rest 1 count - err = restCronHandler(restTimeTimeout-12, limit) - assert.Nil(t, err) + // reduce the restTimeTimeout, Rest 1 count + err2 := restCronHandler(restTimeTimeout-12, limit) + assert.Nil(t, err2) // Fount 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) From 1dbf2ccaeb56c3fd17e2a7bf724d232926cd5ba3 Mon Sep 17 00:00:00 2001 From: xyctruth <398041993@qq.com> Date: Thu, 24 Feb 2022 15:42:50 +0800 Subject: [PATCH 3/3] resetCronTime add has_remaining succeed_count --- dtmsvr/api_http.go | 7 ++++++- dtmsvr/storage/boltdb/boltdb.go | 18 +++++++++++------- dtmsvr/storage/redis/redis.go | 32 +++++++++++++++++++------------- dtmsvr/storage/sql/sql.go | 13 +++++++++++-- dtmsvr/storage/store.go | 2 +- test/api_test.go | 13 ++++++++++--- test/store_test.go | 25 ++++++++++++++++++------- 7 files changed, 76 insertions(+), 34 deletions(-) diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index e3507b8..2d8dea0 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -89,5 +89,10 @@ func resetCronTime(c *gin.Context) interface{} { sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) sLimit := dtmimp.OrString(c.Query("limit"), "100") timeout := time.Duration(dtmimp.MustAtoi(sTimeoutSecond)) * time.Second - return GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit))) + + succeedCount, hasRemaining, err := GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit))) + if err != nil { + return err + } + return map[string]interface{}{"has_remaining": hasRemaining, "succeed_count": succeedCount} } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index a09eb12..21abfb1 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -411,15 +411,19 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS // ResetCronTime rest nextCronTime // Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now() var trans *storage.TransGlobalStore min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix()) - err := s.boltDb.Update(func(t *bolt.Tx) error { - + err = s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() - i := 0 - for k, v := cursor.Seek([]byte(min)); k != nil && i < int(limit); k, v = cursor.Next() { + succeedCount = 0 + for k, v := cursor.Seek([]byte(min)); k != nil && succeedCount <= limit; k, v = cursor.Next() { + if succeedCount == limit { + hasRemaining = true + break + } + trans = tGetGlobal(t, string(v)) err := t.Bucket(bucketIndex).Delete(k) dtmimp.E2P(err) @@ -431,9 +435,9 @@ func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { trans.NextCronTime = &next tPutGlobal(t, trans) tPutIndex(t, next.Unix(), trans.Gid) - i++ + succeedCount++ } return nil }) - return err + return } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index d4200c7..0dcf992 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -262,28 +262,34 @@ return gid // ResetCronTime rest nextCronTime // Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now().Unix() timeoutTimestamp := time.Now().Add(timeout).Unix() args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit) lua := `-- ResetCronTime -local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'WITHSCORES', 'LIMIT', 0, ARGV[5]) -local index = 1 -while(true) -do - local gid = r[index] - if gid == nil then - break +local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'LIMIT', 0, ARGV[5]+1) +local i = 0 +for score,gid in pairs(r) do + if i == tonumber(ARGV[5]) then + i = i + 1 + break end redis.call('ZADD', KEYS[3], ARGV[4], gid) - index = index + 2 + i = i + 1 end +return tostring(i) ` - _, err := callLua(args, lua) - if errors.Is(err, storage.ErrNotFound) { - return nil + r := "" + r, err = callLua(args, lua) + if err != nil { + return } - return err + succeedCount = int64(dtmimp.MustAtoi(r)) + if succeedCount > limit { + hasRemaining = true + succeedCount = limit + } + return } // TouchCronTime updates cronTime diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index d263955..566d2f8 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -158,7 +158,7 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS // ResetCronTime rest nextCronTime // Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { +func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { db := dbGet() getTime := func(second int) string { return map[string]string{ @@ -176,7 +176,16 @@ func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error { Updates(&storage.TransGlobalStore{ NextCronTime: dtmutil.GetNextTime(0), }) - return dbr.Error + succeedCount = dbr.RowsAffected + if succeedCount == limit { + var count int64 + db.Must().Model(global).Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").Limit(1).Count(&count) + if count > 0 { + hasRemaining = true + } + } + + return succeedCount, hasRemaining, dbr.Error } // SetDBConn sets db conn pool diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 55e822e..391c4dd 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -30,5 +30,5 @@ type Store interface { ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore - ResetCronTime(timeout time.Duration, limit int64) error + ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) } diff --git a/test/api_test.go b/test/api_test.go index 5db7fc3..54e869d 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -82,14 +82,21 @@ func TestDtmMetrics(t *testing.T) { } func TestAPIResetCronTime(t *testing.T) { - testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error { + testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) (int64, bool, error) { sTimeout := strconv.FormatInt(timeout, 10) sLimit := strconv.FormatInt(limit, 10) - _, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{ + resp, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{ "timeout": sTimeout, "limit": sLimit, }).Get(dtmutil.DefaultHTTPServer + "/resetCronTime") - return err + + m := map[string]interface{}{} + dtmimp.MustUnmarshalString(resp.String(), &m) + hasRemaining, ok := m["has_remaining"].(bool) + assert.Equal(t, ok, true) + succeedCount, ok := m["succeed_count"].(float64) + assert.Equal(t, ok, true) + return int64(succeedCount), hasRemaining, err }) } diff --git a/test/store_test.go b/test/store_test.go index 75503f1..b9bb31e 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -95,12 +95,12 @@ func TestStoreLockTrans(t *testing.T) { func TestStoreResetCronTime(t *testing.T) { s := registry.GetStore() - testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error { + testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) (int64, bool, error) { return s.ResetCronTime(time.Duration(timeout)*time.Second, limit) }) } -func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) error) { +func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) (int64, bool, error)) { s := registry.GetStore() var restTimeTimeout, lockExpireIn, limit, i int64 restTimeTimeout = 100 //The time that will be ResetCronTime @@ -122,7 +122,9 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( assert.Nil(t, g) // Rest limit-1 count - err := restCronHandler(restTimeTimeout, limit-1) + succeedCount, hasRemaining, err := restCronHandler(restTimeTimeout, limit-1) + assert.Equal(t, hasRemaining, true) + assert.Equal(t, succeedCount, limit-1) assert.Nil(t, err) // Fount limit-1 count for i = 0; i < limit-1; i++ { @@ -136,8 +138,10 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( assert.Nil(t, g) // Rest 1 count - err1 := restCronHandler(restTimeTimeout, limit) - assert.Nil(t, err1) + succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout, limit) + assert.Equal(t, hasRemaining, false) + assert.Equal(t, succeedCount, int64(1)) + assert.Nil(t, err) // Fount 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) @@ -148,8 +152,10 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( assert.Nil(t, g) // reduce the restTimeTimeout, Rest 1 count - err2 := restCronHandler(restTimeTimeout-12, limit) - assert.Nil(t, err2) + succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + assert.Equal(t, hasRemaining, false) + assert.Equal(t, succeedCount, int64(1)) + assert.Nil(t, err) // Fount 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) @@ -159,6 +165,11 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func( g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) + // Not Fount + succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + assert.Equal(t, hasRemaining, false) + assert.Equal(t, succeedCount, int64(0)) + assert.Nil(t, err) } func TestUpdateBranches(t *testing.T) {