Browse Source

Merge pull request #425 from wooln/bugfix-redis-scan

Bugfix redis store paging query bug with SCAN command
pull/426/head
yedf2 3 years ago
committed by GitHub
parent
commit
90daddc723
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      dtmsvr/storage/redis/redis.go
  2. 6
      test/api_test.go

70
dtmsvr/storage/redis/redis.go

@ -66,20 +66,35 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position))
}
keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result()
dtmimp.E2P(err)
globals := []storage.TransGlobalStore{}
if len(keys) > 0 {
values, err := redisGet().MGet(ctx, keys...).Result()
redis := redisGet()
for {
keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result()
logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys))
dtmimp.E2P(err)
for _, v := range values {
global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global)
globals = append(globals, global)
if len(keys) > 0 {
values, err := redis.MGet(ctx, keys...).Result()
dtmimp.E2P(err)
for _, v := range values {
global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global)
globals = append(globals, global)
if len(globals) == int(limit) {
break
}
}
}
lid = nextCursor
if len(globals) == int(limit) || nextCursor == 0 {
break
}
}
if cursor > 0 {
*position = fmt.Sprintf("%d", cursor)
if lid > 0 {
*position = fmt.Sprintf("%d", lid)
} else {
*position = ""
}
@ -339,23 +354,34 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt
if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position))
}
keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result()
dtmimp.E2P(err)
kvs := []storage.KVStore{}
if len(keys) > 0 {
values, err := redisGet().MGet(ctx, keys...).Result()
redis := redisGet()
for {
keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result()
logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys))
dtmimp.E2P(err)
for _, v := range values {
if v == nil {
continue
if len(keys) > 0 {
values, err := redis.MGet(ctx, keys...).Result()
dtmimp.E2P(err)
for _, v := range values {
if v == nil {
continue
}
kv := storage.KVStore{}
dtmimp.MustUnmarshalString(v.(string), &kv)
kvs = append(kvs, kv)
}
kv := storage.KVStore{}
dtmimp.MustUnmarshalString(v.(string), &kv)
kvs = append(kvs, kv)
}
lid = nextCursor
if len(kvs) == int(limit) || nextCursor == 0 {
break
}
}
if cursor > 0 {
*position = fmt.Sprintf("%d", cursor)
if lid > 0 {
*position = fmt.Sprintf("%d", lid)
} else {
*position = ""
}

6
test/api_test.go

@ -63,6 +63,7 @@ func TestAPIAll(t *testing.T) {
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos := m["next_position"].(string)
assert.NotEqual(t, "", nextPos)
// assert.Equal(t, 1, len(m["transactions"].([]interface{})))
resp, err = dtmcli.GetRestyClient().R().SetQueryParam("gid", dtmimp.GetFuncName()+"1").Get(dtmutil.DefaultHTTPServer + "/all")
assert.Nil(t, err)
@ -79,6 +80,7 @@ func TestAPIAll(t *testing.T) {
nextPos2 := m["next_position"].(string)
assert.NotEqual(t, "", nextPos2)
assert.NotEqual(t, nextPos, nextPos2)
// assert.Equal(t, 1, len(m["transactions"].([]interface{})))
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"limit": "1000",
@ -88,6 +90,7 @@ func TestAPIAll(t *testing.T) {
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos3 := m["next_position"].(string)
assert.Equal(t, "", nextPos3)
// assert.Equal(t, 2, len(m["transactions"].([]interface{}))) // the left 2.
//fmt.Printf("pos1:%s,pos2:%s,pos3:%s", nextPos, nextPos2, nextPos3)
}
@ -105,6 +108,7 @@ func TestAPIScanKV(t *testing.T) {
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos := m["next_position"].(string)
assert.NotEqual(t, "", nextPos)
// assert.Equal(t, 1, len(m["kv"].([]interface{})))
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
@ -116,6 +120,7 @@ func TestAPIScanKV(t *testing.T) {
nextPos2 := m["next_position"].(string)
assert.NotEqual(t, "", nextPos2)
assert.NotEqual(t, nextPos, nextPos2)
// assert.Equal(t, 1, len(m["kv"].([]interface{})))
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
@ -126,6 +131,7 @@ func TestAPIScanKV(t *testing.T) {
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos3 := m["next_position"].(string)
assert.Equal(t, "", nextPos3)
// assert.Equal(t, 2, len(m["kv"].([]interface{}))) // the left 2.
}
func TestAPIQueryKV(t *testing.T) {

Loading…
Cancel
Save