diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index c7e6ce3..f9233ed 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -66,20 +66,35 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) } - keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() - dtmimp.E2P(err) globals := []storage.TransGlobalStore{} - if len(keys) > 0 { - values, err := redisGet().MGet(ctx, keys...).Result() + redis := redisGet() + for { + keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() + logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys)) + dtmimp.E2P(err) - for _, v := range values { - global := storage.TransGlobalStore{} - dtmimp.MustUnmarshalString(v.(string), &global) - globals = append(globals, global) + + if len(keys) > 0 { + values, err := redis.MGet(ctx, keys...).Result() + dtmimp.E2P(err) + for _, v := range values { + global := storage.TransGlobalStore{} + dtmimp.MustUnmarshalString(v.(string), &global) + globals = append(globals, global) + if len(globals) == int(limit) { + break + } + } + } + + lid = nextCursor + if len(globals) == int(limit) || nextCursor == 0 { + break } } - if cursor > 0 { - *position = fmt.Sprintf("%d", cursor) + + if lid > 0 { + *position = fmt.Sprintf("%d", lid) } else { *position = "" } @@ -339,23 +354,34 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) } - keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() - dtmimp.E2P(err) + kvs := []storage.KVStore{} - if len(keys) > 0 { - values, err := redisGet().MGet(ctx, keys...).Result() + redis := redisGet() + for { + keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() + logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys)) dtmimp.E2P(err) - for _, v := range values { - if v == nil { - continue + if len(keys) > 0 { + values, err := redis.MGet(ctx, keys...).Result() + dtmimp.E2P(err) + for _, v := range values { + if v == nil { + continue + } + kv := storage.KVStore{} + dtmimp.MustUnmarshalString(v.(string), &kv) + kvs = append(kvs, kv) } - kv := storage.KVStore{} - dtmimp.MustUnmarshalString(v.(string), &kv) - kvs = append(kvs, kv) + } + + lid = nextCursor + if len(kvs) == int(limit) || nextCursor == 0 { + break } } - if cursor > 0 { - *position = fmt.Sprintf("%d", cursor) + + if lid > 0 { + *position = fmt.Sprintf("%d", lid) } else { *position = "" }