diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java index c3a9ff0385..1e13b1cfe9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java @@ -39,26 +39,26 @@ public class RedisOtaPackageDataCache implements OtaPackageDataCache { public byte[] get(String key, int chunkSize, int chunk) { try (RedisConnection connection = redisConnectionFactory.getConnection()) { if (chunkSize == 0) { - return connection.get(toOtaPackageCacheKey(key)); + return connection.stringCommands().get(toOtaPackageCacheKey(key)); } int startIndex = chunkSize * chunk; int endIndex = startIndex + chunkSize - 1; - return connection.getRange(toOtaPackageCacheKey(key), startIndex, endIndex); + return connection.stringCommands().getRange(toOtaPackageCacheKey(key), startIndex, endIndex); } } @Override public void put(String key, byte[] value) { try (RedisConnection connection = redisConnectionFactory.getConnection()) { - connection.set(toOtaPackageCacheKey(key), value); + connection.stringCommands().set(toOtaPackageCacheKey(key), value); } } @Override public void evict(String key) { try (RedisConnection connection = redisConnectionFactory.getConnection()) { - connection.del(toOtaPackageCacheKey(key)); + connection.keyCommands().del(toOtaPackageCacheKey(key)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java index ead9269108..fa78bbb7bb 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java @@ -33,7 +33,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore { try (var c = connectionFactory.getConnection()) { var serializedMsg = JavaSerDesUtil.encode(msg); if (serializedMsg != null) { - c.set(getKey(endpoint), serializedMsg); + c.stringCommands().set(getKey(endpoint), serializedMsg); } else { throw new RuntimeException("Problem with serialization of message: " + msg); } @@ -43,7 +43,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore { @Override public TbX509DtlsSessionInfo get(String endpoint) { try (var c = connectionFactory.getConnection()) { - var data = c.get(getKey(endpoint)); + var data = c.stringCommands().get(getKey(endpoint)); if (data != null) { return JavaSerDesUtil.decode(data); } else { @@ -55,7 +55,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore { @Override public void remove(String endpoint) { try (var c = connectionFactory.getConnection()) { - c.del(getKey(endpoint)); + c.keyCommands().del(getKey(endpoint)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java index 8301d4a448..c75a5ff8a0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java @@ -33,7 +33,7 @@ public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore private void put(OtaPackageType type, LwM2MClientOtaInfo info) { try (var connection = connectionFactory.getConnection()) { - connection.set((OTA_EP + type + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes()); + connection.stringCommands().set((OTA_EP + type + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes()); } } @@ -59,7 +59,7 @@ public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore private > T getLwM2MClientOtaInfo(OtaPackageType type, String endpoint, Class clazz) { try (var connection = connectionFactory.getConnection()) { - byte[] data = connection.get((OTA_EP + type + endpoint).getBytes()); + byte[] data = connection.stringCommands().get((OTA_EP + type + endpoint).getBytes()); return JacksonUtil.fromBytes(data, clazz); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java index 8158e82a81..06f91729e4 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java @@ -163,15 +163,15 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab lock.lock(); // add registration byte[] k = toEndpointKey(registration.getEndpoint()); - byte[] old = connection.getSet(k, serializeReg(registration)); + byte[] old = connection.stringCommands().getSet(k, serializeReg(registration)); // add registration: secondary indexes byte[] regid_idx = toRegIdKey(registration.getId()); - connection.set(regid_idx, registration.getEndpoint().getBytes(UTF_8)); + connection.stringCommands().set(regid_idx, registration.getEndpoint().getBytes(UTF_8)); byte[] addr_idx = toRegAddrKey(registration.getSocketAddress()); - connection.set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); + connection.stringCommands().set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); byte[] identity_idx = toRegIdentityKey(registration.getClientTransportData().getIdentity()); - connection.set(identity_idx, registration.getEndpoint().getBytes(UTF_8)); + connection.stringCommands().set(identity_idx, registration.getEndpoint().getBytes(UTF_8)); // Add or update expiration addOrUpdateExpiration(connection, registration); @@ -180,7 +180,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab Registration oldRegistration = deserializeReg(old); // remove old secondary index if (!registration.getId().equals(oldRegistration.getId())) - connection.del(toRegIdKey(oldRegistration.getId())); + connection.keyCommands().del(toRegIdKey(oldRegistration.getId())); if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) { removeAddrIndex(connection, oldRegistration); } @@ -209,7 +209,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab try (var connection = connectionFactory.getConnection()) { // Fetch the registration ep by registration ID index - byte[] ep = connection.get(toRegIdKey(update.getRegistrationId())); + byte[] ep = connection.stringCommands().get(toRegIdKey(update.getRegistrationId())); if (ep == null) { return null; } @@ -220,7 +220,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab lock.lock(); // Fetch the registration - byte[] data = connection.get(toEndpointKey(ep)); + byte[] data = connection.stringCommands().get(toEndpointKey(ep)); if (data == null) { return null; } @@ -230,7 +230,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab Registration updatedRegistration = update.update(r); // Store the new registration - connection.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); + connection.stringCommands().set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); // Add or update expiration addOrUpdateExpiration(connection, updatedRegistration); @@ -239,7 +239,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab * If registration is already associated to this address we don't care as we only want to keep the most * recent binding. */ byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress()); - connection.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8)); + connection.stringCommands().set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8)); if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) { removeAddrIndex(connection, r); } @@ -265,11 +265,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab } private Registration getRegistration(RedisConnection connection, String registrationId) { - byte[] ep = connection.get(toRegIdKey(registrationId)); + byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); if (ep == null) { return null; } - byte[] data = connection.get(toEndpointKey(ep)); + byte[] data = connection.stringCommands().get(toEndpointKey(ep)); if (data == null) { return null; } @@ -281,7 +281,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab public Registration getRegistrationByEndpoint(String endpoint) { Validate.notNull(endpoint); try (var connection = connectionFactory.getConnection()) { - byte[] data = connection.get(toEndpointKey(endpoint)); + byte[] data = connection.stringCommands().get(toEndpointKey(endpoint)); if (data == null) { return null; } @@ -293,11 +293,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab public Registration getRegistrationByAdress(InetSocketAddress address) { Validate.notNull(address); try (var connection = connectionFactory.getConnection()) { - byte[] ep = connection.get(toRegAddrKey(address)); + byte[] ep = connection.stringCommands().get(toRegAddrKey(address)); if (ep == null) { return null; } - byte[] data = connection.get(toEndpointKey(ep)); + byte[] data = connection.stringCommands().get(toEndpointKey(ep)); if (data == null) { return null; } @@ -309,11 +309,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab public Registration getRegistrationByIdentity(LwM2mIdentity identity) { Validate.notNull(identity); try (var connection = connectionFactory.getConnection()) { - byte[] ep = connection.get(toRegIdentityKey(identity)); + byte[] ep = connection.stringCommands().get(toRegIdentityKey(identity)); if (ep == null) { return null; } - byte[] data = connection.get(toEndpointKey(ep)); + byte[] data = connection.stringCommands().get(toEndpointKey(ep)); if (data == null) { return null; } @@ -332,12 +332,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab clusterConnection.clusterGetNodes().forEach(node -> scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(scanConnection.scan(scanOptions)); + scans.add(scanConnection.keyCommands().scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = getConnection.get(key); + byte[] element = getConnection.stringCommands().get(key); if (element != null) { list.add(deserializeReg(element)); } @@ -357,7 +357,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) { // fetch the client ep by registration ID index - byte[] ep = connection.get(toRegIdKey(registrationId)); + byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); if (ep == null) { return null; } @@ -369,16 +369,16 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab lock.lock(); // fetch the client - byte[] data = connection.get(toEndpointKey(ep)); + byte[] data = connection.stringCommands().get(toEndpointKey(ep)); if (data == null) { return null; } Registration r = deserializeReg(data); if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) { - long nbRemoved = connection.del(toRegIdKey(r.getId())); + long nbRemoved = connection.keyCommands().del(toRegIdKey(r.getId())); if (nbRemoved > 0) { - connection.del(toEndpointKey(r.getEndpoint())); + connection.keyCommands().del(toEndpointKey(r.getEndpoint())); Collection obsRemoved = unsafeRemoveAllObservations(connection, r.getId()); removeAddrIndex(connection, r); removeIdentityIndex(connection, r); @@ -407,12 +407,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab // Watch the key to remove. // connection.watch(indexKey); - byte[] epFromAddr = connection.get(indexKey); + byte[] epFromAddr = connection.stringCommands().get(indexKey); // Delete the key if needed. if (Arrays.equals(epFromAddr, endpointName.getBytes(UTF_8))) { // Try to delete the key // connection.multi(); - connection.del(indexKey); + connection.keyCommands().del(indexKey); // connection.exec(); // if transaction failed this is not an issue as the index is probably reused and we don't need to // delete it anymore. @@ -423,11 +423,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab } private void addOrUpdateExpiration(RedisConnection connection, Registration registration) { - connection.zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); + connection.zSetCommands().zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); } private void removeExpiration(RedisConnection connection, Registration registration) { - connection.zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); + connection.zSetCommands().zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); } private byte[] toRegIdKey(String registrationId) { @@ -556,7 +556,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab @Override public Observation getObservation(ObservationIdentifier observationId) { try (var connection = connectionFactory.getConnection()) { - byte[] observationValue = connection.get(toKey(OBS_TKN, observationId.getBytes())); + byte[] observationValue = connection.stringCommands().get(toKey(OBS_TKN, observationId.getBytes())); return deserializeObs(observationValue); } } @@ -570,7 +570,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab try (var connection = connectionFactory.getConnection()) { // fetch the client ep by registration ID index - byte[] ep = connection.get(toRegIdKey(registrationId)); + byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); if (ep == null) { return null; } @@ -635,7 +635,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab public Observation get(Token token) { try (var connection = connectionFactory.getConnection()) { - byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes())); + byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token.getBytes())); if (obs == null) { return null; } else { @@ -664,14 +664,14 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId); // fetch all observations by token - for (byte[] token : connection.lRange(regIdKey, 0, -1)) { - byte[] obs = connection.get(toKey(OBS_TKN, token)); + for (byte[] token : connection.listCommands().lRange(regIdKey, 0, -1)) { + byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token)); if (obs != null) { removed.add(deserializeObs(obs)); } - connection.del(toKey(OBS_TKN, token)); + connection.keyCommands().del(toKey(OBS_TKN, token)); } - connection.del(regIdKey); + connection.keyCommands().del(regIdKey); return removed; } @@ -752,7 +752,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab System.currentTimeMillis(), 0, cleanLimit); for (byte[] endpoint : endpointsExpired) { - byte[] data = connection.get(toEndpointKey(endpoint)); + byte[] data = connection.stringCommands().get(toEndpointKey(endpoint)); if (data != null && data.length > 0) { Registration r = deserializeReg(data); if (!r.isAlive(gracePeriod)) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java index b07156d0ef..1e5d480c81 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java @@ -52,7 +52,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { try (var connection = connectionFactory.getConnection()) { lock = redisLock.obtain(toLockKey(endpoint)); lock.lock(); - byte[] data = connection.get((SEC_EP + endpoint).getBytes()); + byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes()); if (data == null || data.length == 0) { return null; } else { @@ -96,11 +96,11 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { try (var connection = connectionFactory.getConnection()) { lock = redisLock.obtain(toLockKey(identity)); lock.lock(); - byte[] ep = connection.hGet(PSKID_SEC.getBytes(), identity.getBytes()); + byte[] ep = connection.hashCommands().hGet(PSKID_SEC.getBytes(), identity.getBytes()); if (ep == null) { return null; } else { - byte[] data = connection.get((SEC_EP + new String(ep)).getBytes()); + byte[] data = connection.stringCommands().get((SEC_EP + new String(ep)).getBytes()); if (data == null || data.length == 0) { return null; } else { @@ -128,17 +128,17 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { lock = redisLock.obtain(tbSecurityInfo.getEndpoint()); lock.lock(); if (info != null && info.getPskIdentity() != null) { - byte[] oldEndpointBytes = connection.hGet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes()); + byte[] oldEndpointBytes = connection.hashCommands().hGet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes()); if (oldEndpointBytes != null) { String oldEndpoint = new String(oldEndpointBytes); if (!oldEndpoint.equals(info.getEndpoint())) { throw new NonUniqueSecurityInfoException("PSK Identity " + info.getPskIdentity() + " is already used"); } - connection.hSet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes(), info.getEndpoint().getBytes()); + connection.hashCommands().hSet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes(), info.getEndpoint().getBytes()); } } - byte[] previousData = connection.getSet((SEC_EP + tbSecurityInfo.getEndpoint()).getBytes(), tbSecurityInfoSerialized); + byte[] previousData = connection.stringCommands().getSet((SEC_EP + tbSecurityInfo.getEndpoint()).getBytes(), tbSecurityInfoSerialized); // for tests: redis connect NoSec (securityInfo == null) log.info("lwm2m redis connect. Endpoint: [{}], secMode: [{}] key: [{}], tbSecurityInfoSerialized [{}]", @@ -147,7 +147,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { if (previousData != null && info != null) { String previousIdentity = ((TbLwM2MSecurityInfo) JavaSerDesUtil.decode(previousData)).getSecurityInfo().getPskIdentity(); if (previousIdentity != null && !previousIdentity.equals(info.getPskIdentity())) { - connection.hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes()); + connection.hashCommands().hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes()); } } } finally { @@ -163,7 +163,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { try (var connection = connectionFactory.getConnection()) { lock = redisLock.obtain(endpoint); lock.lock(); - byte[] data = connection.get((SEC_EP + endpoint).getBytes()); + byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes()); if (data != null && data.length > 0) { return JavaSerDesUtil.decode(data); } else { @@ -182,13 +182,13 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { try (var connection = connectionFactory.getConnection()) { lock = redisLock.obtain(endpoint); lock.lock(); - byte[] data = connection.get((SEC_EP + endpoint).getBytes()); + byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes()); if (data != null && data.length > 0) { SecurityInfo info = ((TbLwM2MSecurityInfo) JavaSerDesUtil.decode(data)).getSecurityInfo(); if (info != null && info.getPskIdentity() != null) { - connection.hDel(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes()); + connection.hashCommands().hDel(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes()); } - connection.del((SEC_EP + endpoint).getBytes()); + connection.keyCommands().del((SEC_EP + endpoint).getBytes()); } } finally { if (lock != null) { @@ -203,7 +203,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { private SecurityMode getSecurityModeByRegistration (RedisConnection connection, String endpoint) { try { - byte[] data = connection.get((REG_EP + endpoint).getBytes()); + byte[] data = connection.stringCommands().get((REG_EP + endpoint).getBytes()); JsonNode registrationNode = JacksonUtil.fromString(new String(data != null ? data : new byte[0]), JsonNode.class); String typeModeStr = registrationNode.get("transportdata").get("identity").get("type").asText(); return "unsecure".equals(typeModeStr) ? SecurityMode.NO_SEC : null; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java index 4beefb4896..73d4bd95da 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java @@ -45,7 +45,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { @Override public LwM2mClient get(String endpoint) { try (var connection = connectionFactory.getConnection()) { - byte[] data = connection.get(getKey(endpoint)); + byte[] data = connection.stringCommands().get(getKey(endpoint)); if (data == null) { return null; } else { @@ -70,12 +70,12 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { clusterConnection.clusterGetNodes().forEach(node -> scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(scanConnection.scan(scanOptions)); + scans.add(scanConnection.keyCommands().scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = getConnection.get(key); + byte[] element = getConnection.stringCommands().get(key); if (element != null) { try { clients.add(deserialize(element)); @@ -97,7 +97,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { try { byte[] clientSerialized = serialize(client); try (var connection = connectionFactory.getConnection()) { - connection.getSet(getKey(client.getEndpoint()), clientSerialized); + connection.stringCommands().getSet(getKey(client.getEndpoint()), clientSerialized); } } catch (Exception e) { log.warn("Failed to serialize client: {}", client, e); @@ -108,7 +108,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { @Override public void remove(String endpoint) { try (var connection = connectionFactory.getConnection()) { - connection.del(getKey(endpoint)); + connection.keyCommands().del(getKey(endpoint)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java index 31a78234d0..65ed90d15d 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java @@ -44,12 +44,12 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { clusterConnection.clusterGetNodes().forEach(node -> scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(scanConnection.scan(scanOptions)); + scans.add(scanConnection.keyCommands().scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = getConnection.get(key); + byte[] element = getConnection.stringCommands().get(key); if (element != null) { configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); } @@ -63,14 +63,14 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { public void put(LwM2MModelConfig modelConfig) { byte[] clientSerialized = JacksonUtil.writeValueAsBytes(modelConfig); try (var connection = connectionFactory.getConnection()) { - connection.getSet(getKey(modelConfig.getEndpoint()), clientSerialized); + connection.stringCommands().getSet(getKey(modelConfig.getEndpoint()), clientSerialized); } } @Override public void remove(String endpoint) { try (var connection = connectionFactory.getConnection()) { - connection.del(getKey(endpoint)); + connection.keyCommands().del(getKey(endpoint)); } } diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java index 9fe29c1188..fe8d66365f 100644 --- a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.lwm2m.server.store; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.data.redis.connection.RedisConnection; @@ -51,10 +52,10 @@ class TbRedisLwM2MClientStoreTest { @Mock RedisConnectionFactory connectionFactory; - @Mock + @Mock(answer = Answers.RETURNS_DEEP_STUBS) RedisConnection scanConnection; - @Mock + @Mock(answer = Answers.RETURNS_DEEP_STUBS) RedisConnection getConnection; TbRedisLwM2MClientStore store; @@ -77,8 +78,8 @@ class TbRedisLwM2MClientStoreTest { // Cursor created before thenReturn to avoid Mockito unfinished-stubbing error Cursor cursor = cursorOf(key); - when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); - when(getConnection.get(key)).thenReturn(value); + when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor); + when(getConnection.stringCommands().get(key)).thenReturn(value); Set result = store.getAll(); @@ -89,29 +90,29 @@ class TbRedisLwM2MClientStoreTest { @Test void getAll_getIsNeverCalledOnScanConnection() { Cursor cursor = cursorOf(); - when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor); store.getAll(); - verify(scanConnection, never()).get(any(byte[].class)); + verify(scanConnection.stringCommands(), never()).get(any(byte[].class)); } @Test void getAll_scanIsNeverCalledOnGetConnection() { Cursor cursor = cursorOf(); - when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor); store.getAll(); - verify(getConnection, never()).scan(any(ScanOptions.class)); + verify(getConnection.keyCommands(), never()).scan(any(ScanOptions.class)); } @Test void getAll_skipsKeyWhenValueIsNull() { byte[] key = "CLIENT#EP#gone".getBytes(); Cursor cursor = cursorOf(key); - when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); - // getConnection.get(key) returns null by default — no stubbing needed + when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor); + // getConnection.stringCommands().get(key) returns null by default — no stubbing needed Set result = store.getAll();