|
|
|
@ -163,15 +163,15 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
lock.lock(); |
|
|
|
// add registration
|
|
|
|
byte[] k = toEndpointKey(registration.getEndpoint()); |
|
|
|
byte[] old = connection.getSet(k, serializeReg(registration)); |
|
|
|
byte[] old = connection.stringCommands().getSet(k, serializeReg(registration)); |
|
|
|
|
|
|
|
// add registration: secondary indexes
|
|
|
|
byte[] regid_idx = toRegIdKey(registration.getId()); |
|
|
|
connection.set(regid_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.stringCommands().set(regid_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
byte[] addr_idx = toRegAddrKey(registration.getSocketAddress()); |
|
|
|
connection.set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.stringCommands().set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
byte[] identity_idx = toRegIdentityKey(registration.getClientTransportData().getIdentity()); |
|
|
|
connection.set(identity_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.stringCommands().set(identity_idx, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
|
|
|
|
// Add or update expiration
|
|
|
|
addOrUpdateExpiration(connection, registration); |
|
|
|
@ -180,7 +180,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
Registration oldRegistration = deserializeReg(old); |
|
|
|
// remove old secondary index
|
|
|
|
if (!registration.getId().equals(oldRegistration.getId())) |
|
|
|
connection.del(toRegIdKey(oldRegistration.getId())); |
|
|
|
connection.keyCommands().del(toRegIdKey(oldRegistration.getId())); |
|
|
|
if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) { |
|
|
|
removeAddrIndex(connection, oldRegistration); |
|
|
|
} |
|
|
|
@ -209,7 +209,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
|
|
|
|
// Fetch the registration ep by registration ID index
|
|
|
|
byte[] ep = connection.get(toRegIdKey(update.getRegistrationId())); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegIdKey(update.getRegistrationId())); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -220,7 +220,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
lock.lock(); |
|
|
|
|
|
|
|
// Fetch the registration
|
|
|
|
byte[] data = connection.get(toEndpointKey(ep)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(ep)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -230,7 +230,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
Registration updatedRegistration = update.update(r); |
|
|
|
|
|
|
|
// Store the new registration
|
|
|
|
connection.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); |
|
|
|
connection.stringCommands().set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); |
|
|
|
|
|
|
|
// Add or update expiration
|
|
|
|
addOrUpdateExpiration(connection, updatedRegistration); |
|
|
|
@ -239,7 +239,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
* If registration is already associated to this address we don't care as we only want to keep the most |
|
|
|
* recent binding. */ |
|
|
|
byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress()); |
|
|
|
connection.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.stringCommands().set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8)); |
|
|
|
if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) { |
|
|
|
removeAddrIndex(connection, r); |
|
|
|
} |
|
|
|
@ -265,11 +265,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
} |
|
|
|
|
|
|
|
private Registration getRegistration(RedisConnection connection, String registrationId) { |
|
|
|
byte[] ep = connection.get(toRegIdKey(registrationId)); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
byte[] data = connection.get(toEndpointKey(ep)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(ep)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -281,7 +281,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
public Registration getRegistrationByEndpoint(String endpoint) { |
|
|
|
Validate.notNull(endpoint); |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
byte[] data = connection.get(toEndpointKey(endpoint)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(endpoint)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -293,11 +293,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
public Registration getRegistrationByAdress(InetSocketAddress address) { |
|
|
|
Validate.notNull(address); |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
byte[] ep = connection.get(toRegAddrKey(address)); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegAddrKey(address)); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
byte[] data = connection.get(toEndpointKey(ep)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(ep)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -309,11 +309,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
public Registration getRegistrationByIdentity(LwM2mIdentity identity) { |
|
|
|
Validate.notNull(identity); |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
byte[] ep = connection.get(toRegIdentityKey(identity)); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegIdentityKey(identity)); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
byte[] data = connection.get(toEndpointKey(ep)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(ep)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -332,12 +332,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
clusterConnection.clusterGetNodes().forEach(node -> |
|
|
|
scans.add(clusterConnection.scan(node, scanOptions))); |
|
|
|
} else { |
|
|
|
scans.add(scanConnection.scan(scanOptions)); |
|
|
|
scans.add(scanConnection.keyCommands().scan(scanOptions)); |
|
|
|
} |
|
|
|
|
|
|
|
scans.forEach(scan -> { |
|
|
|
scan.forEachRemaining(key -> { |
|
|
|
byte[] element = getConnection.get(key); |
|
|
|
byte[] element = getConnection.stringCommands().get(key); |
|
|
|
if (element != null) { |
|
|
|
list.add(deserializeReg(element)); |
|
|
|
} |
|
|
|
@ -357,7 +357,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
|
|
|
|
private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) { |
|
|
|
// fetch the client ep by registration ID index
|
|
|
|
byte[] ep = connection.get(toRegIdKey(registrationId)); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -369,16 +369,16 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
lock.lock(); |
|
|
|
|
|
|
|
// fetch the client
|
|
|
|
byte[] data = connection.get(toEndpointKey(ep)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(ep)); |
|
|
|
if (data == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
Registration r = deserializeReg(data); |
|
|
|
|
|
|
|
if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) { |
|
|
|
long nbRemoved = connection.del(toRegIdKey(r.getId())); |
|
|
|
long nbRemoved = connection.keyCommands().del(toRegIdKey(r.getId())); |
|
|
|
if (nbRemoved > 0) { |
|
|
|
connection.del(toEndpointKey(r.getEndpoint())); |
|
|
|
connection.keyCommands().del(toEndpointKey(r.getEndpoint())); |
|
|
|
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(connection, r.getId()); |
|
|
|
removeAddrIndex(connection, r); |
|
|
|
removeIdentityIndex(connection, r); |
|
|
|
@ -407,12 +407,12 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
// Watch the key to remove.
|
|
|
|
// connection.watch(indexKey);
|
|
|
|
|
|
|
|
byte[] epFromAddr = connection.get(indexKey); |
|
|
|
byte[] epFromAddr = connection.stringCommands().get(indexKey); |
|
|
|
// Delete the key if needed.
|
|
|
|
if (Arrays.equals(epFromAddr, endpointName.getBytes(UTF_8))) { |
|
|
|
// Try to delete the key
|
|
|
|
// connection.multi();
|
|
|
|
connection.del(indexKey); |
|
|
|
connection.keyCommands().del(indexKey); |
|
|
|
// connection.exec();
|
|
|
|
// if transaction failed this is not an issue as the index is probably reused and we don't need to
|
|
|
|
// delete it anymore.
|
|
|
|
@ -423,11 +423,11 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
} |
|
|
|
|
|
|
|
private void addOrUpdateExpiration(RedisConnection connection, Registration registration) { |
|
|
|
connection.zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.zSetCommands().zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
} |
|
|
|
|
|
|
|
private void removeExpiration(RedisConnection connection, Registration registration) { |
|
|
|
connection.zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
connection.zSetCommands().zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); |
|
|
|
} |
|
|
|
|
|
|
|
private byte[] toRegIdKey(String registrationId) { |
|
|
|
@ -556,7 +556,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
@Override |
|
|
|
public Observation getObservation(ObservationIdentifier observationId) { |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
byte[] observationValue = connection.get(toKey(OBS_TKN, observationId.getBytes())); |
|
|
|
byte[] observationValue = connection.stringCommands().get(toKey(OBS_TKN, observationId.getBytes())); |
|
|
|
return deserializeObs(observationValue); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -570,7 +570,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
|
|
|
|
// fetch the client ep by registration ID index
|
|
|
|
byte[] ep = connection.get(toRegIdKey(registrationId)); |
|
|
|
byte[] ep = connection.stringCommands().get(toRegIdKey(registrationId)); |
|
|
|
if (ep == null) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
@ -635,7 +635,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
|
|
|
|
public Observation get(Token token) { |
|
|
|
try (var connection = connectionFactory.getConnection()) { |
|
|
|
byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes())); |
|
|
|
byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token.getBytes())); |
|
|
|
if (obs == null) { |
|
|
|
return null; |
|
|
|
} else { |
|
|
|
@ -664,14 +664,14 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId); |
|
|
|
|
|
|
|
// fetch all observations by token
|
|
|
|
for (byte[] token : connection.lRange(regIdKey, 0, -1)) { |
|
|
|
byte[] obs = connection.get(toKey(OBS_TKN, token)); |
|
|
|
for (byte[] token : connection.listCommands().lRange(regIdKey, 0, -1)) { |
|
|
|
byte[] obs = connection.stringCommands().get(toKey(OBS_TKN, token)); |
|
|
|
if (obs != null) { |
|
|
|
removed.add(deserializeObs(obs)); |
|
|
|
} |
|
|
|
connection.del(toKey(OBS_TKN, token)); |
|
|
|
connection.keyCommands().del(toKey(OBS_TKN, token)); |
|
|
|
} |
|
|
|
connection.del(regIdKey); |
|
|
|
connection.keyCommands().del(regIdKey); |
|
|
|
|
|
|
|
return removed; |
|
|
|
} |
|
|
|
@ -752,7 +752,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
System.currentTimeMillis(), 0, cleanLimit); |
|
|
|
|
|
|
|
for (byte[] endpoint : endpointsExpired) { |
|
|
|
byte[] data = connection.get(toEndpointKey(endpoint)); |
|
|
|
byte[] data = connection.stringCommands().get(toEndpointKey(endpoint)); |
|
|
|
if (data != null && data.length > 0) { |
|
|
|
Registration r = deserializeReg(data); |
|
|
|
if (!r.isAlive(gracePeriod)) { |
|
|
|
|