|
|
|
@ -3,40 +3,64 @@ const env = require("../environment") |
|
|
|
const Redis = env.isTest() ? require("ioredis-mock") : require("ioredis") |
|
|
|
const { addDbPrefix, removeDbPrefix, getRedisOptions } = require("./utils") |
|
|
|
|
|
|
|
const RETRY_PERIOD_MS = 2000 |
|
|
|
const MAX_RETRIES = 20 |
|
|
|
const CLUSTERED = false |
|
|
|
|
|
|
|
// for testing just generate the client once
|
|
|
|
let CONNECTED = false |
|
|
|
let CLIENT = env.isTest() ? new Redis(getRedisOptions()) : null |
|
|
|
|
|
|
|
function retryConnection() { |
|
|
|
setTimeout(init, RETRY_PERIOD_MS) |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Inits the system, will error if unable to connect to redis cluster (may take up to 10 seconds) otherwise |
|
|
|
* will return the ioredis client which will be ready to use. |
|
|
|
* @return {Promise<object>} The ioredis client. |
|
|
|
*/ |
|
|
|
function init() { |
|
|
|
return new Promise((resolve, reject) => { |
|
|
|
// testing uses a single in memory client
|
|
|
|
if (env.isTest() || (CLIENT && CONNECTED)) { |
|
|
|
return resolve(CLIENT) |
|
|
|
} |
|
|
|
const { opts, host, port } = getRedisOptions(CLUSTERED) |
|
|
|
if (CLUSTERED) { |
|
|
|
CLIENT = new Redis.Cluster([{ host, port }], opts) |
|
|
|
} else { |
|
|
|
CLIENT = new Redis(opts) |
|
|
|
function errorOccurred(err) { |
|
|
|
CONNECTED = false; |
|
|
|
console.error("Redis connection failed - " + err) |
|
|
|
setTimeout(() => { |
|
|
|
init() |
|
|
|
}, RETRY_PERIOD_MS) |
|
|
|
} |
|
|
|
// testing uses a single in memory client
|
|
|
|
if (env.isTest() || (CLIENT && CONNECTED)) { |
|
|
|
return |
|
|
|
} |
|
|
|
if (CLIENT) { |
|
|
|
CLIENT.disconnect() |
|
|
|
} |
|
|
|
const { opts, host, port } = getRedisOptions(CLUSTERED) |
|
|
|
if (CLUSTERED) { |
|
|
|
CLIENT = new Redis.Cluster([{ host, port }], opts) |
|
|
|
} else { |
|
|
|
CLIENT = new Redis(opts) |
|
|
|
} |
|
|
|
CLIENT.on("end", err => { |
|
|
|
errorOccurred(err) |
|
|
|
}) |
|
|
|
CLIENT.on("error", err => { |
|
|
|
errorOccurred(err) |
|
|
|
}) |
|
|
|
CLIENT.on("connect", () => { |
|
|
|
CONNECTED = true |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
function waitForConnection() { |
|
|
|
return new Promise(resolve => { |
|
|
|
if (CLIENT == null) { |
|
|
|
init() |
|
|
|
} else if (CONNECTED) { |
|
|
|
resolve() |
|
|
|
return |
|
|
|
} |
|
|
|
CLIENT.on("end", err => { |
|
|
|
reject(err) |
|
|
|
CONNECTED = false |
|
|
|
}) |
|
|
|
CLIENT.on("error", err => { |
|
|
|
reject(err) |
|
|
|
CONNECTED = false |
|
|
|
}) |
|
|
|
CLIENT.on("connect", () => { |
|
|
|
resolve(CLIENT) |
|
|
|
CONNECTED = true |
|
|
|
resolve() |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
@ -85,31 +109,30 @@ class RedisWrapper { |
|
|
|
} |
|
|
|
|
|
|
|
async init() { |
|
|
|
this._client = await init() |
|
|
|
init() |
|
|
|
await waitForConnection() |
|
|
|
return this |
|
|
|
} |
|
|
|
|
|
|
|
async finish() { |
|
|
|
this._client.disconnect() |
|
|
|
CLIENT.disconnect() |
|
|
|
} |
|
|
|
|
|
|
|
async scan() { |
|
|
|
const db = this._db, |
|
|
|
client = this._client |
|
|
|
const db = this._db |
|
|
|
let stream |
|
|
|
if (CLUSTERED) { |
|
|
|
let node = client.nodes("master") |
|
|
|
let node = CLIENT.nodes("master") |
|
|
|
stream = node[0].scanStream({ match: db + "-*", count: 100 }) |
|
|
|
} else { |
|
|
|
stream = client.scanStream({ match: db + "-*", count: 100 }) |
|
|
|
stream = CLIENT.scanStream({ match: db + "-*", count: 100 }) |
|
|
|
} |
|
|
|
return promisifyStream(stream) |
|
|
|
} |
|
|
|
|
|
|
|
async get(key) { |
|
|
|
const db = this._db, |
|
|
|
client = this._client |
|
|
|
let response = await client.get(addDbPrefix(db, key)) |
|
|
|
const db = this._db |
|
|
|
let response = await CLIENT.get(addDbPrefix(db, key)) |
|
|
|
// overwrite the prefixed key
|
|
|
|
if (response != null && response.key) { |
|
|
|
response.key = key |
|
|
|
@ -123,22 +146,20 @@ class RedisWrapper { |
|
|
|
} |
|
|
|
|
|
|
|
async store(key, value, expirySeconds = null) { |
|
|
|
const db = this._db, |
|
|
|
client = this._client |
|
|
|
const db = this._db |
|
|
|
if (typeof value === "object") { |
|
|
|
value = JSON.stringify(value) |
|
|
|
} |
|
|
|
const prefixedKey = addDbPrefix(db, key) |
|
|
|
await client.set(prefixedKey, value) |
|
|
|
await CLIENT.set(prefixedKey, value) |
|
|
|
if (expirySeconds) { |
|
|
|
await client.expire(prefixedKey, expirySeconds) |
|
|
|
await CLIENT.expire(prefixedKey, expirySeconds) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
async delete(key) { |
|
|
|
const db = this._db, |
|
|
|
client = this._client |
|
|
|
await client.del(addDbPrefix(db, key)) |
|
|
|
const db = this._db |
|
|
|
await CLIENT.del(addDbPrefix(db, key)) |
|
|
|
} |
|
|
|
|
|
|
|
async clear() { |
|
|
|
|