diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index c7e6ce3..f9233ed 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -66,20 +66,35 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) } - keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() - dtmimp.E2P(err) globals := []storage.TransGlobalStore{} - if len(keys) > 0 { - values, err := redisGet().MGet(ctx, keys...).Result() + redis := redisGet() + for { + keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() + logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys)) + dtmimp.E2P(err) - for _, v := range values { - global := storage.TransGlobalStore{} - dtmimp.MustUnmarshalString(v.(string), &global) - globals = append(globals, global) + + if len(keys) > 0 { + values, err := redis.MGet(ctx, keys...).Result() + dtmimp.E2P(err) + for _, v := range values { + global := storage.TransGlobalStore{} + dtmimp.MustUnmarshalString(v.(string), &global) + globals = append(globals, global) + if len(globals) == int(limit) { + break + } + } + } + + lid = nextCursor + if len(globals) == int(limit) || nextCursor == 0 { + break } } - if cursor > 0 { - *position = fmt.Sprintf("%d", cursor) + + if lid > 0 { + *position = fmt.Sprintf("%d", lid) } else { *position = "" } @@ -339,23 +354,34 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) } - keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() - dtmimp.E2P(err) + kvs := []storage.KVStore{} - if len(keys) > 0 { - values, err := redisGet().MGet(ctx, keys...).Result() + redis := redisGet() + for { + keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() + logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys)) dtmimp.E2P(err) - for _, v := range values { - if v == nil { - continue + if len(keys) > 0 { + values, err := redis.MGet(ctx, keys...).Result() + dtmimp.E2P(err) + for _, v := range values { + if v == nil { + continue + } + kv := storage.KVStore{} + dtmimp.MustUnmarshalString(v.(string), &kv) + kvs = append(kvs, kv) } - kv := storage.KVStore{} - dtmimp.MustUnmarshalString(v.(string), &kv) - kvs = append(kvs, kv) + } + + lid = nextCursor + if len(kvs) == int(limit) || nextCursor == 0 { + break } } - if cursor > 0 { - *position = fmt.Sprintf("%d", cursor) + + if lid > 0 { + *position = fmt.Sprintf("%d", lid) } else { *position = "" } diff --git a/test/api_test.go b/test/api_test.go index addd80c..5755e0d 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -63,6 +63,7 @@ func TestAPIAll(t *testing.T) { dtmimp.MustUnmarshalString(resp.String(), &m) nextPos := m["next_position"].(string) assert.NotEqual(t, "", nextPos) + // assert.Equal(t, 1, len(m["transactions"].([]interface{}))) resp, err = dtmcli.GetRestyClient().R().SetQueryParam("gid", dtmimp.GetFuncName()+"1").Get(dtmutil.DefaultHTTPServer + "/all") assert.Nil(t, err) @@ -79,6 +80,7 @@ func TestAPIAll(t *testing.T) { nextPos2 := m["next_position"].(string) assert.NotEqual(t, "", nextPos2) assert.NotEqual(t, nextPos, nextPos2) + // assert.Equal(t, 1, len(m["transactions"].([]interface{}))) resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{ "limit": "1000", @@ -88,6 +90,7 @@ func TestAPIAll(t *testing.T) { dtmimp.MustUnmarshalString(resp.String(), &m) nextPos3 := m["next_position"].(string) assert.Equal(t, "", nextPos3) + // assert.Equal(t, 2, len(m["transactions"].([]interface{}))) // the left 2. //fmt.Printf("pos1:%s,pos2:%s,pos3:%s", nextPos, nextPos2, nextPos3) } @@ -105,6 +108,7 @@ func TestAPIScanKV(t *testing.T) { dtmimp.MustUnmarshalString(resp.String(), &m) nextPos := m["next_position"].(string) assert.NotEqual(t, "", nextPos) + // assert.Equal(t, 1, len(m["kv"].([]interface{}))) resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{ "cat": "topics", @@ -116,6 +120,7 @@ func TestAPIScanKV(t *testing.T) { nextPos2 := m["next_position"].(string) assert.NotEqual(t, "", nextPos2) assert.NotEqual(t, nextPos, nextPos2) + // assert.Equal(t, 1, len(m["kv"].([]interface{}))) resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{ "cat": "topics", @@ -126,6 +131,7 @@ func TestAPIScanKV(t *testing.T) { dtmimp.MustUnmarshalString(resp.String(), &m) nextPos3 := m["next_position"].(string) assert.Equal(t, "", nextPos3) + // assert.Equal(t, 2, len(m["kv"].([]interface{}))) // the left 2. } func TestAPIQueryKV(t *testing.T) {