|
|
|
@ -18,6 +18,7 @@ package org.thingsboard.server.transport.lwm2m.server.store; |
|
|
|
import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.eclipse.californium.core.coap.Token; |
|
|
|
import org.eclipse.californium.core.network.RandomTokenGenerator; |
|
|
|
import org.eclipse.californium.core.network.TokenGenerator; |
|
|
|
import org.eclipse.californium.core.network.serialization.UdpDataParser; |
|
|
|
import org.eclipse.californium.core.network.serialization.UdpDataSerializer; |
|
|
|
@ -53,6 +54,7 @@ import org.springframework.data.redis.core.Cursor; |
|
|
|
import org.springframework.data.redis.core.ScanOptions; |
|
|
|
import org.springframework.integration.redis.util.RedisLockRegistry; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; |
|
|
|
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider; |
|
|
|
|
|
|
|
import java.net.InetSocketAddress; |
|
|
|
@ -118,21 +120,22 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
|
|
|
|
private final RedisLockRegistry redisLock; |
|
|
|
|
|
|
|
private final TokenGenerator tokenGenerator; |
|
|
|
private final LwM2MTransportServerConfig config; |
|
|
|
private TokenGenerator tokenGenerator; |
|
|
|
|
|
|
|
private final LwM2mVersionedModelProvider modelProvider; |
|
|
|
|
|
|
|
public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator,RedisConnectionFactory connectionFactory, LwM2mVersionedModelProvider modelProvider) { |
|
|
|
this(tokenGenerator, connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT, modelProvider); // default clean period 60s
|
|
|
|
public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, LwM2mVersionedModelProvider modelProvider) { |
|
|
|
this(config, connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT, modelProvider); // default clean period 60s
|
|
|
|
} |
|
|
|
|
|
|
|
public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator, RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) { |
|
|
|
this(tokenGenerator, connectionFactory, Executors.newScheduledThreadPool(1, |
|
|
|
public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) { |
|
|
|
this(config, connectionFactory, Executors.newScheduledThreadPool(1, |
|
|
|
new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), |
|
|
|
cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider); |
|
|
|
} |
|
|
|
|
|
|
|
public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator,RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, |
|
|
|
public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, |
|
|
|
long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) { |
|
|
|
this.connectionFactory = connectionFactory; |
|
|
|
this.schedExecutor = schedExecutor; |
|
|
|
@ -140,7 +143,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
this.cleanLimit = cleanLimit; |
|
|
|
this.gracePeriod = lifetimeGracePeriodInSec; |
|
|
|
this.redisLock = new RedisLockRegistry(connectionFactory, "Registration"); |
|
|
|
this.tokenGenerator = tokenGenerator; |
|
|
|
this.config = config; |
|
|
|
this.modelProvider = modelProvider; |
|
|
|
} |
|
|
|
|
|
|
|
@ -508,7 +511,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
((CompositeObservation)observation).getPaths().forEach(path -> { |
|
|
|
if (validateObserveResource(path, registrationId)) { |
|
|
|
String serializedObs = createSerializedSingleObservation(nodeSerObs, path.toString()); |
|
|
|
SingleObservation singleObservation = createSingleObservation(registrationId, path, ct, ctx, serializedObs, tokenGenerator); |
|
|
|
SingleObservation singleObservation = createSingleObservation(registrationId, path, ct, ctx, serializedObs, getTokenGenerator()); |
|
|
|
updateSingleObservation(registrationId, singleObservation, addIfAbsent, removed, connection); |
|
|
|
// cancel existing observations for the same path and registration id.
|
|
|
|
cancelObservation (singleObservation, registrationId, removed, connection); |
|
|
|
@ -706,6 +709,13 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab |
|
|
|
|
|
|
|
/* *************** Observation utility functions **************** */ |
|
|
|
|
|
|
|
private TokenGenerator getTokenGenerator(){ |
|
|
|
if (this.tokenGenerator == null) { |
|
|
|
this.tokenGenerator = new RandomTokenGenerator(config.getCoapConfig()); |
|
|
|
} |
|
|
|
return this.tokenGenerator; |
|
|
|
} |
|
|
|
|
|
|
|
private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) { |
|
|
|
if (connection.commands().del(toKey(OBS_TKN, observationId)) > 0L) { |
|
|
|
connection.listCommands().lRem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId); |
|
|
|
|