Browse Source

Modernized Redis access in LwM2M transport and OTA cache

pull/15663/head
Oleksandra Matviienko 1 month ago
parent
commit
0cb985cb92
  1. 8
      common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java
  2. 6
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java
  3. 4
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java
  4. 68
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java
  5. 24
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java
  6. 10
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java
  7. 8
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java
  8. 21
      common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java

8
common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java

@ -39,26 +39,26 @@ public class RedisOtaPackageDataCache implements OtaPackageDataCache {
public byte[] get(String key, int chunkSize, int chunk) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
if (chunkSize == 0) {
return connection.get(toOtaPackageCacheKey(key));
return connection.stringCommands().get(toOtaPackageCacheKey(key));
}
int startIndex = chunkSize * chunk;
int endIndex = startIndex + chunkSize - 1;
return connection.getRange(toOtaPackageCacheKey(key), startIndex, endIndex);
return connection.stringCommands().getRange(toOtaPackageCacheKey(key), startIndex, endIndex);
}
}
@Override
public void put(String key, byte[] value) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
connection.set(toOtaPackageCacheKey(key), value);
connection.stringCommands().set(toOtaPackageCacheKey(key), value);
}
}
@Override
public void evict(String key) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
connection.del(toOtaPackageCacheKey(key));
connection.keyCommands().del(toOtaPackageCacheKey(key));
}
}

6
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MDtlsSessionRedisStore.java

@ -33,7 +33,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore {
try (var c = connectionFactory.getConnection()) {
var serializedMsg = JavaSerDesUtil.encode(msg);
if (serializedMsg != null) {
c.set(getKey(endpoint), serializedMsg);
c.stringCommands().set(getKey(endpoint), serializedMsg);
} else {
throw new RuntimeException("Problem with serialization of message: " + msg);
}
@ -43,7 +43,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore {
@Override
public TbX509DtlsSessionInfo get(String endpoint) {
try (var c = connectionFactory.getConnection()) {
var data = c.get(getKey(endpoint));
var data = c.stringCommands().get(getKey(endpoint));
if (data != null) {
return JavaSerDesUtil.decode(data);
} else {
@ -55,7 +55,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore {
@Override
public void remove(String endpoint) {
try (var c = connectionFactory.getConnection()) {
c.del(getKey(endpoint));
c.keyCommands().del(getKey(endpoint));
}
}

4
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisClientOtaInfoStore.java

@ -33,7 +33,7 @@ public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore
private void put(OtaPackageType type, LwM2MClientOtaInfo<?, ?, ?> info) {
try (var connection = connectionFactory.getConnection()) {
connection.set((OTA_EP + type + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes());
connection.stringCommands().set((OTA_EP + type + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes());
}
}
@ -59,7 +59,7 @@ public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore
private <T extends LwM2MClientOtaInfo<?, ?, ?>> T getLwM2MClientOtaInfo(OtaPackageType type, String endpoint, Class<T> clazz) {
try (var connection = connectionFactory.getConnection()) {
byte[] data = connection.get((OTA_EP + type + endpoint).getBytes());
byte[] data = connection.stringCommands().get((OTA_EP + type + endpoint).getBytes());
return JacksonUtil.fromBytes(data, clazz);
}
}

68
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java

@ -163,15 +163,15 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
lock.lock();
// add registration
byte[] k = toEndpointKey(registration.getEndpoint());
byte[] old = connection.getSet(k, serializeReg(registration));
byte[] old = connection.stringCommands().getSet(k, serializeReg(registration));
// add registration: secondary indexes
byte[] regid_idx = toRegIdKey(registration.getId());
connection.set(regid_idx, registration.getEndpoint().getBytes(UTF_8));
connection.stringCommands().set(regid_idx, registration.getEndpoint().getBytes(UTF_8));
byte[] addr_idx = toRegAddrKey(registration.getSocketAddress());
connection.set(addr_idx, registration.getEndpoint().getBytes(UTF_8));
connection.stringCommands().set(addr_idx, registration.getEndpoint().getBytes(UTF_8));
byte[] identity_idx = toRegIdentityKey(registration.getClientTransportData().getIdentity());
connection.set(identity_idx, registration.getEndpoint().getBytes(UTF_8));
connection.stringCommands().set(identity_idx, registration.getEndpoint().getBytes(UTF_8));
// Add or update expiration
addOrUpdateExpiration(connection, registration);
@ -180,7 +180,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
Registration oldRegistration = deserializeReg(old);
// remove old secondary index
if (!registration.getId().equals(oldRegistration.getId()))
connection.del(toRegIdKey(oldRegistration.getId()));
connection.keyCommands().del(toRegIdKey(oldRegistration.getId()));
if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) {
removeAddrIndex(connection, oldRegistration);
}
@ -209,7 +209,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
try (var connection = connectionFactory.getConnection()) {
// Fetch the registration ep by registration ID index
byte[] ep = connection.get(toRegIdKey(update.getRegistrationId()));
byte[] ep = connection.stringCommands().get(toRegIdKey(update.getRegistrationId()));
if (ep == null) {
return null;
}
@ -220,7 +220,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
lock.lock();
// Fetch the registration
byte[] data = connection.get(toEndpointKey(ep));
byte[] data = connection.stringCommands().get(toEndpointKey(ep));
if (data == null) {
return null;
}
@ -230,7 +230,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
Registration updatedRegistration = update.update(r);
// Store the new registration
connection.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration));
connection.stringCommands().set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration));
// Add or update expiration
addOrUpdateExpiration(connection, updatedRegistration);
@ -239,7 +239,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
* If registration is already associated to this address we don't care as we only want to keep the most
* recent binding. */
byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress());
connection.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8));
connection.stringCommands().set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8));
if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) {
removeAddrIndex(connection, r);
}
@ -265,11 +265,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
}
private Registration getRegistration(RedisConnection connection, String registrationId) {
byte[] ep = connection.get(toRegIdKey(registrationId));
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
byte[] data = connection.get(toEndpointKey(ep));
byte[] data = connection.stringCommands().get(toEndpointKey(ep));
if (data == null) {
return null;
}
@ -281,7 +281,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
public Registration getRegistrationByEndpoint(String endpoint) {
Validate.notNull(endpoint);
try (var connection = connectionFactory.getConnection()) {
byte[] data = connection.get(toEndpointKey(endpoint));
byte[] data = connection.stringCommands().get(toEndpointKey(endpoint));
if (data == null) {
return null;
}
@ -293,11 +293,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
public Registration getRegistrationByAdress(InetSocketAddress address) {
Validate.notNull(address);
try (var connection = connectionFactory.getConnection()) {
byte[] ep = connection.get(toRegAddrKey(address));
byte[] ep = connection.stringCommands().get(toRegAddrKey(address));
if (ep == null) {
return null;
}
byte[] data = connection.get(toEndpointKey(ep));
byte[] data = connection.stringCommands().get(toEndpointKey(ep));
if (data == null) {
return null;
}
@ -309,11 +309,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
public Registration getRegistrationByIdentity(LwM2mIdentity identity) {
Validate.notNull(identity);
try (var connection = connectionFactory.getConnection()) {
byte[] ep = connection.get(toRegIdentityKey(identity));
byte[] ep = connection.stringCommands().get(toRegIdentityKey(identity));
if (ep == null) {
return null;
}
byte[] data = connection.get(toEndpointKey(ep));
byte[] data = connection.stringCommands().get(toEndpointKey(ep));
if (data == null) {
return null;
}
@ -332,12 +332,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(scanConnection.scan(scanOptions));
scans.add(scanConnection.keyCommands().scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = getConnection.get(key);
byte[] element = getConnection.stringCommands().get(key);
if (element != null) {
list.add(deserializeReg(element));
}
@ -357,7 +357,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) {
// fetch the client ep by registration ID index
byte[] ep = connection.get(toRegIdKey(registrationId));
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
@ -369,16 +369,16 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
lock.lock();
// fetch the client
byte[] data = connection.get(toEndpointKey(ep));
byte[] data = connection.stringCommands().get(toEndpointKey(ep));
if (data == null) {
return null;
}
Registration r = deserializeReg(data);
if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) {
long nbRemoved = connection.del(toRegIdKey(r.getId()));
long nbRemoved = connection.keyCommands().del(toRegIdKey(r.getId()));
if (nbRemoved > 0) {
connection.del(toEndpointKey(r.getEndpoint()));
connection.keyCommands().del(toEndpointKey(r.getEndpoint()));
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(connection, r.getId());
removeAddrIndex(connection, r);
removeIdentityIndex(connection, r);
@ -407,12 +407,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
// Watch the key to remove.
// connection.watch(indexKey);
byte[] epFromAddr = connection.get(indexKey);
byte[] epFromAddr = connection.stringCommands().get(indexKey);
// Delete the key if needed.
if (Arrays.equals(epFromAddr, endpointName.getBytes(UTF_8))) {
// Try to delete the key
// connection.multi();
connection.del(indexKey);
connection.keyCommands().del(indexKey);
// connection.exec();
// if transaction failed this is not an issue as the index is probably reused and we don't need to
// delete it anymore.
@ -423,11 +423,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
}
private void addOrUpdateExpiration(RedisConnection connection, Registration registration) {
connection.zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8));
connection.zSetCommands().zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8));
}
private void removeExpiration(RedisConnection connection, Registration registration) {
connection.zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8));
connection.zSetCommands().zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8));
}
private byte[] toRegIdKey(String registrationId) {
@ -556,7 +556,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
@Override
public Observation getObservation(ObservationIdentifier observationId) {
try (var connection = connectionFactory.getConnection()) {
byte[] observationValue = connection.get(toKey(OBS_TKN, observationId.getBytes()));
byte[] observationValue = connection.stringCommands().get(toKey(OBS_TKN, observationId.getBytes()));
return deserializeObs(observationValue);
}
}
@ -570,7 +570,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
try (var connection = connectionFactory.getConnection()) {
// fetch the client ep by registration ID index
byte[] ep = connection.get(toRegIdKey(registrationId));
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
@ -635,7 +635,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
public Observation get(Token token) {
try (var connection = connectionFactory.getConnection()) {
byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes()));
byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token.getBytes()));
if (obs == null) {
return null;
} else {
@ -664,14 +664,14 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId);
// fetch all observations by token
for (byte[] token : connection.lRange(regIdKey, 0, -1)) {
byte[] obs = connection.get(toKey(OBS_TKN, token));
for (byte[] token : connection.listCommands().lRange(regIdKey, 0, -1)) {
byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token));
if (obs != null) {
removed.add(deserializeObs(obs));
}
connection.del(toKey(OBS_TKN, token));
connection.keyCommands().del(toKey(OBS_TKN, token));
}
connection.del(regIdKey);
connection.keyCommands().del(regIdKey);
return removed;
}
@ -752,7 +752,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
System.currentTimeMillis(), 0, cleanLimit);
for (byte[] endpoint : endpointsExpired) {
byte[] data = connection.get(toEndpointKey(endpoint));
byte[] data = connection.stringCommands().get(toEndpointKey(endpoint));
if (data != null && data.length > 0) {
Registration r = deserializeReg(data);
if (!r.isAlive(gracePeriod)) {

24
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java

@ -52,7 +52,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
try (var connection = connectionFactory.getConnection()) {
lock = redisLock.obtain(toLockKey(endpoint));
lock.lock();
byte[] data = connection.get((SEC_EP + endpoint).getBytes());
byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes());
if (data == null || data.length == 0) {
return null;
} else {
@ -96,11 +96,11 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
try (var connection = connectionFactory.getConnection()) {
lock = redisLock.obtain(toLockKey(identity));
lock.lock();
byte[] ep = connection.hGet(PSKID_SEC.getBytes(), identity.getBytes());
byte[] ep = connection.hashCommands().hGet(PSKID_SEC.getBytes(), identity.getBytes());
if (ep == null) {
return null;
} else {
byte[] data = connection.get((SEC_EP + new String(ep)).getBytes());
byte[] data = connection.stringCommands().get((SEC_EP + new String(ep)).getBytes());
if (data == null || data.length == 0) {
return null;
} else {
@ -128,17 +128,17 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
lock = redisLock.obtain(tbSecurityInfo.getEndpoint());
lock.lock();
if (info != null && info.getPskIdentity() != null) {
byte[] oldEndpointBytes = connection.hGet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes());
byte[] oldEndpointBytes = connection.hashCommands().hGet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes());
if (oldEndpointBytes != null) {
String oldEndpoint = new String(oldEndpointBytes);
if (!oldEndpoint.equals(info.getEndpoint())) {
throw new NonUniqueSecurityInfoException("PSK Identity " + info.getPskIdentity() + " is already used");
}
connection.hSet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes(), info.getEndpoint().getBytes());
connection.hashCommands().hSet(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes(), info.getEndpoint().getBytes());
}
}
byte[] previousData = connection.getSet((SEC_EP + tbSecurityInfo.getEndpoint()).getBytes(), tbSecurityInfoSerialized);
byte[] previousData = connection.stringCommands().getSet((SEC_EP + tbSecurityInfo.getEndpoint()).getBytes(), tbSecurityInfoSerialized);
// for tests: redis connect NoSec (securityInfo == null)
log.info("lwm2m redis connect. Endpoint: [{}], secMode: [{}] key: [{}], tbSecurityInfoSerialized [{}]",
@ -147,7 +147,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
if (previousData != null && info != null) {
String previousIdentity = ((TbLwM2MSecurityInfo) JavaSerDesUtil.decode(previousData)).getSecurityInfo().getPskIdentity();
if (previousIdentity != null && !previousIdentity.equals(info.getPskIdentity())) {
connection.hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes());
connection.hashCommands().hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes());
}
}
} finally {
@ -163,7 +163,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
try (var connection = connectionFactory.getConnection()) {
lock = redisLock.obtain(endpoint);
lock.lock();
byte[] data = connection.get((SEC_EP + endpoint).getBytes());
byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes());
if (data != null && data.length > 0) {
return JavaSerDesUtil.decode(data);
} else {
@ -182,13 +182,13 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
try (var connection = connectionFactory.getConnection()) {
lock = redisLock.obtain(endpoint);
lock.lock();
byte[] data = connection.get((SEC_EP + endpoint).getBytes());
byte[] data = connection.stringCommands().get((SEC_EP + endpoint).getBytes());
if (data != null && data.length > 0) {
SecurityInfo info = ((TbLwM2MSecurityInfo) JavaSerDesUtil.decode(data)).getSecurityInfo();
if (info != null && info.getPskIdentity() != null) {
connection.hDel(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes());
connection.hashCommands().hDel(PSKID_SEC.getBytes(), info.getPskIdentity().getBytes());
}
connection.del((SEC_EP + endpoint).getBytes());
connection.keyCommands().del((SEC_EP + endpoint).getBytes());
}
} finally {
if (lock != null) {
@ -203,7 +203,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
private SecurityMode getSecurityModeByRegistration (RedisConnection connection, String endpoint) {
try {
byte[] data = connection.get((REG_EP + endpoint).getBytes());
byte[] data = connection.stringCommands().get((REG_EP + endpoint).getBytes());
JsonNode registrationNode = JacksonUtil.fromString(new String(data != null ? data : new byte[0]), JsonNode.class);
String typeModeStr = registrationNode.get("transportdata").get("identity").get("type").asText();
return "unsecure".equals(typeModeStr) ? SecurityMode.NO_SEC : null;

10
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java

@ -45,7 +45,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
@Override
public LwM2mClient get(String endpoint) {
try (var connection = connectionFactory.getConnection()) {
byte[] data = connection.get(getKey(endpoint));
byte[] data = connection.stringCommands().get(getKey(endpoint));
if (data == null) {
return null;
} else {
@ -70,12 +70,12 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(scanConnection.scan(scanOptions));
scans.add(scanConnection.keyCommands().scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = getConnection.get(key);
byte[] element = getConnection.stringCommands().get(key);
if (element != null) {
try {
clients.add(deserialize(element));
@ -97,7 +97,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
try {
byte[] clientSerialized = serialize(client);
try (var connection = connectionFactory.getConnection()) {
connection.getSet(getKey(client.getEndpoint()), clientSerialized);
connection.stringCommands().getSet(getKey(client.getEndpoint()), clientSerialized);
}
} catch (Exception e) {
log.warn("Failed to serialize client: {}", client, e);
@ -108,7 +108,7 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
@Override
public void remove(String endpoint) {
try (var connection = connectionFactory.getConnection()) {
connection.del(getKey(endpoint));
connection.keyCommands().del(getKey(endpoint));
}
}

8
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java

@ -44,12 +44,12 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore {
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(scanConnection.scan(scanOptions));
scans.add(scanConnection.keyCommands().scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = getConnection.get(key);
byte[] element = getConnection.stringCommands().get(key);
if (element != null) {
configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class));
}
@ -63,14 +63,14 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore {
public void put(LwM2MModelConfig modelConfig) {
byte[] clientSerialized = JacksonUtil.writeValueAsBytes(modelConfig);
try (var connection = connectionFactory.getConnection()) {
connection.getSet(getKey(modelConfig.getEndpoint()), clientSerialized);
connection.stringCommands().getSet(getKey(modelConfig.getEndpoint()), clientSerialized);
}
}
@Override
public void remove(String endpoint) {
try (var connection = connectionFactory.getConnection()) {
connection.del(getKey(endpoint));
connection.keyCommands().del(getKey(endpoint));
}
}

21
common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.lwm2m.server.store;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.redis.connection.RedisConnection;
@ -51,10 +52,10 @@ class TbRedisLwM2MClientStoreTest {
@Mock
RedisConnectionFactory connectionFactory;
@Mock
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
RedisConnection scanConnection;
@Mock
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
RedisConnection getConnection;
TbRedisLwM2MClientStore store;
@ -77,8 +78,8 @@ class TbRedisLwM2MClientStoreTest {
// Cursor created before thenReturn to avoid Mockito unfinished-stubbing error
Cursor<byte[]> cursor = cursorOf(key);
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
when(getConnection.get(key)).thenReturn(value);
when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor);
when(getConnection.stringCommands().get(key)).thenReturn(value);
Set<LwM2mClient> result = store.getAll();
@ -89,29 +90,29 @@ class TbRedisLwM2MClientStoreTest {
@Test
void getAll_getIsNeverCalledOnScanConnection() {
Cursor<byte[]> cursor = cursorOf();
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor);
store.getAll();
verify(scanConnection, never()).get(any(byte[].class));
verify(scanConnection.stringCommands(), never()).get(any(byte[].class));
}
@Test
void getAll_scanIsNeverCalledOnGetConnection() {
Cursor<byte[]> cursor = cursorOf();
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor);
store.getAll();
verify(getConnection, never()).scan(any(ScanOptions.class));
verify(getConnection.keyCommands(), never()).scan(any(ScanOptions.class));
}
@Test
void getAll_skipsKeyWhenValueIsNull() {
byte[] key = "CLIENT#EP#gone".getBytes();
Cursor<byte[]> cursor = cursorOf(key);
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
// getConnection.get(key) returns null by default — no stubbing needed
when(scanConnection.keyCommands().scan(any(ScanOptions.class))).thenReturn(cursor);
// getConnection.stringCommands().get(key) returns null by default — no stubbing needed
Set<LwM2mClient> result = store.getAll();

Loading…
Cancel
Save