Browse Source

resetCronTime add has_remaining succeed_count

pull/227/head
xyctruth 4 years ago
parent
commit
1dbf2ccaeb
  1. 7
      dtmsvr/api_http.go
  2. 18
      dtmsvr/storage/boltdb/boltdb.go
  3. 32
      dtmsvr/storage/redis/redis.go
  4. 13
      dtmsvr/storage/sql/sql.go
  5. 2
      dtmsvr/storage/store.go
  6. 13
      test/api_test.go
  7. 25
      test/store_test.go

7
dtmsvr/api_http.go

@ -89,5 +89,10 @@ func resetCronTime(c *gin.Context) interface{} {
sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10))
sLimit := dtmimp.OrString(c.Query("limit"), "100")
timeout := time.Duration(dtmimp.MustAtoi(sTimeoutSecond)) * time.Second
return GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit)))
succeedCount, hasRemaining, err := GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit)))
if err != nil {
return err
}
return map[string]interface{}{"has_remaining": hasRemaining, "succeed_count": succeedCount}
}

18
dtmsvr/storage/boltdb/boltdb.go

@ -411,15 +411,19 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
next := time.Now()
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix())
err := s.boltDb.Update(func(t *bolt.Tx) error {
err = s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
i := 0
for k, v := cursor.Seek([]byte(min)); k != nil && i < int(limit); k, v = cursor.Next() {
succeedCount = 0
for k, v := cursor.Seek([]byte(min)); k != nil && succeedCount <= limit; k, v = cursor.Next() {
if succeedCount == limit {
hasRemaining = true
break
}
trans = tGetGlobal(t, string(v))
err := t.Bucket(bucketIndex).Delete(k)
dtmimp.E2P(err)
@ -431,9 +435,9 @@ func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
trans.NextCronTime = &next
tPutGlobal(t, trans)
tPutIndex(t, next.Unix(), trans.Gid)
i++
succeedCount++
}
return nil
})
return err
return
}

32
dtmsvr/storage/redis/redis.go

@ -262,28 +262,34 @@ return gid
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
next := time.Now().Unix()
timeoutTimestamp := time.Now().Add(timeout).Unix()
args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit)
lua := `-- ResetCronTime
local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'WITHSCORES', 'LIMIT', 0, ARGV[5])
local index = 1
while(true)
do
local gid = r[index]
if gid == nil then
break
local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'LIMIT', 0, ARGV[5]+1)
local i = 0
for score,gid in pairs(r) do
if i == tonumber(ARGV[5]) then
i = i + 1
break
end
redis.call('ZADD', KEYS[3], ARGV[4], gid)
index = index + 2
i = i + 1
end
return tostring(i)
`
_, err := callLua(args, lua)
if errors.Is(err, storage.ErrNotFound) {
return nil
r := ""
r, err = callLua(args, lua)
if err != nil {
return
}
return err
succeedCount = int64(dtmimp.MustAtoi(r))
if succeedCount > limit {
hasRemaining = true
succeedCount = limit
}
return
}
// TouchCronTime updates cronTime

13
dtmsvr/storage/sql/sql.go

@ -158,7 +158,7 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
@ -176,7 +176,16 @@ func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
Updates(&storage.TransGlobalStore{
NextCronTime: dtmutil.GetNextTime(0),
})
return dbr.Error
succeedCount = dbr.RowsAffected
if succeedCount == limit {
var count int64
db.Must().Model(global).Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").Limit(1).Count(&count)
if count > 0 {
hasRemaining = true
}
}
return succeedCount, hasRemaining, dbr.Error
}
// SetDBConn sets db conn pool

2
dtmsvr/storage/store.go

@ -30,5 +30,5 @@ type Store interface {
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(timeout time.Duration, limit int64) error
ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
}

13
test/api_test.go

@ -82,14 +82,21 @@ func TestDtmMetrics(t *testing.T) {
}
func TestAPIResetCronTime(t *testing.T) {
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error {
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) (int64, bool, error) {
sTimeout := strconv.FormatInt(timeout, 10)
sLimit := strconv.FormatInt(limit, 10)
_, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{
resp, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{
"timeout": sTimeout,
"limit": sLimit,
}).Get(dtmutil.DefaultHTTPServer + "/resetCronTime")
return err
m := map[string]interface{}{}
dtmimp.MustUnmarshalString(resp.String(), &m)
hasRemaining, ok := m["has_remaining"].(bool)
assert.Equal(t, ok, true)
succeedCount, ok := m["succeed_count"].(float64)
assert.Equal(t, ok, true)
return int64(succeedCount), hasRemaining, err
})
}

25
test/store_test.go

@ -95,12 +95,12 @@ func TestStoreLockTrans(t *testing.T) {
func TestStoreResetCronTime(t *testing.T) {
s := registry.GetStore()
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error {
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) (int64, bool, error) {
return s.ResetCronTime(time.Duration(timeout)*time.Second, limit)
})
}
func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) error) {
func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) (int64, bool, error)) {
s := registry.GetStore()
var restTimeTimeout, lockExpireIn, limit, i int64
restTimeTimeout = 100 //The time that will be ResetCronTime
@ -122,7 +122,9 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(
assert.Nil(t, g)
// Rest limit-1 count
err := restCronHandler(restTimeTimeout, limit-1)
succeedCount, hasRemaining, err := restCronHandler(restTimeTimeout, limit-1)
assert.Equal(t, hasRemaining, true)
assert.Equal(t, succeedCount, limit-1)
assert.Nil(t, err)
// Fount limit-1 count
for i = 0; i < limit-1; i++ {
@ -136,8 +138,10 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(
assert.Nil(t, g)
// Rest 1 count
err1 := restCronHandler(restTimeTimeout, limit)
assert.Nil(t, err1)
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(1))
assert.Nil(t, err)
// Fount 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
@ -148,8 +152,10 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(
assert.Nil(t, g)
// reduce the restTimeTimeout, Rest 1 count
err2 := restCronHandler(restTimeTimeout-12, limit)
assert.Nil(t, err2)
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(1))
assert.Nil(t, err)
// Fount 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
@ -159,6 +165,11 @@ func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Not Fount
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(0))
assert.Nil(t, err)
}
func TestUpdateBranches(t *testing.T) {

Loading…
Cancel
Save