From c9a6e1bef206ca726633f11df5c0eb8d75688bbb Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 17:44:04 +0800 Subject: [PATCH] fix scan return more records --- dtmsvr/storage/redis/redis.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 6f4d732..624f6a0 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -70,6 +70,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s globals := []storage.TransGlobalStore{} redis := redisGet() for { + limit -= int64(len(globals)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys)) @@ -87,14 +88,15 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s (condition.CreateTimeEnd.IsZero() || global.CreateTime.Before(condition.CreateTimeEnd)) { globals = append(globals, global) } - if len(globals) == int(limit) { + // redis.Scan may return more records than limit + if len(globals) >= int(limit) { break } } } lid = nextCursor - if len(globals) == int(limit) || nextCursor == 0 { + if len(globals) >= int(limit) || nextCursor == 0 { break } } @@ -374,12 +376,14 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt kvs := []storage.KVStore{} redis := redisGet() for { + limit -= int64(len(kvs)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys)) dtmimp.E2P(err) if len(keys) > 0 { values, err := redis.MGet(ctx, keys...).Result() dtmimp.E2P(err) + logger.Debugf("keys: %s values: %s", dtmimp.MustMarshalString(keys), dtmimp.MustMarshalString(values)) for _, v := range values { if v == nil { continue @@ -391,7 +395,8 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt } lid = nextCursor - if len(kvs) == int(limit) || nextCursor == 0 { + // for redis, `count` in `scan` command is only a hint, may return more than `count` items + if len(kvs) >= int(limit) || nextCursor == 0 { break } }