diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index f9c4e6f5c0..da2c62016a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Predicate; @@ -85,14 +86,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.rpcPendingMap = new HashMap<>(); - refreshAttributes(); + initAttributes(); } - private void refreshAttributes() { + private void initAttributes() { this.deviceAttributes = new DeviceAttributes(fetchAttributes(DataConstants.CLIENT_SCOPE), fetchAttributes(DataConstants.SERVER_SCOPE), fetchAttributes(DataConstants.SHARED_SCOPE)); } + private void refreshAttributes(DeviceAttributesEventNotificationMsg msg) { + if (this.deviceAttributes != null) { + if (msg.isDeleted()) { + msg.getDeletedKeys().forEach(key -> deviceAttributes.remove(key)); + } else { + deviceAttributes.update(msg.getScope(), msg.getValues()); + } + } + } + void processRpcRequest(ActorContext context, ToDeviceRpcRequestPluginMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); ToDeviceRpcRequestBody body = request.getBody(); @@ -196,8 +207,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) { //TODO: improve this procedure to fetch only changed attributes and support attributes deletion - refreshAttributes(); - Set keys = msg.getKeys(); + refreshAttributes(msg); + Set keys = msg.getDeletedKeys(); if (attributeSubscriptions.size() > 0) { ToDeviceMsg notification = null; if (msg.isDeleted()) { @@ -359,8 +370,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private List fetchAttributes(String attributeType) { - return systemContext.getAttributesService().findAll(this.deviceId, attributeType); + private List fetchAttributes(String scope) { + try { + //TODO: replace this with async operation. Happens only during actor creation, but is still criticla for performance, + return systemContext.getAttributesService().findAll(this.deviceId, scope).get(); + } catch (InterruptedException | ExecutionException e) { + logger.warning("[{}] Failed to fetch attributes for scope: {}", deviceId, scope); + throw new RuntimeException(e); + } } public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index b102226921..bea51dbedb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -302,9 +302,8 @@ public final class PluginProcessingContext implements PluginContext { @Override public void getDevice(DeviceId deviceId, PluginCallback callback) { - //TODO: add caching here with async api. - Device device = pluginCtx.deviceService.findDeviceById(deviceId); - pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, device), ActorRef.noSender()); + ListenableFuture deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId); + Futures.addCallback(deviceFuture, getCallback(callback, v -> v)); } @Override diff --git a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java index f6cb4e31da..2940a62a93 100644 --- a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import java.util.*; +import com.google.common.util.concurrent.Futures; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.id.*; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -226,7 +227,9 @@ public class DefaultActorServiceTest { when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo); when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName()); - when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Collections.emptyList()); + when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); + when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); + when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); initActorSystem(); Thread.sleep(1000); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java index 262d15d1a3..fd50f4d2ef 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java @@ -68,7 +68,7 @@ public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey)); log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey); return Futures.transform(executeAsyncRead(select), (Function>) input -> - Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one())) + Optional.ofNullable(convertResultToAttributesKvEntry(attributeKey, input.one())) , readResultsProcessingExecutor); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 42fede47f8..19a260f9e0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -133,8 +133,8 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao } private ListenableFuture> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) { - long minPartition = query.getStartTs(); - long maxPartition = query.getEndTs(); + long minPartition = toPartitionTs(query.getStartTs()); + long maxPartition = toPartitionTs(query.getEndTs()); ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java index 8a43e32b5f..8628d0c16c 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.extensions.api.device; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import java.util.*; @@ -65,4 +67,28 @@ public class DeviceAttributes { public Optional getServerPublicAttribute(String attribute) { return Optional.ofNullable(serverPublicAttributesMap.get(attribute)); } + + public void remove(AttributeKey key) { + Map map = getMapByScope(key.getScope()); + if (map != null) { + map.remove(key); + } + } + + public void update(String scope, List values) { + Map map = getMapByScope(scope); + values.forEach(v -> map.put(v.getKey(), v)); + } + + private Map getMapByScope(String scope) { + Map map = null; + if (scope.equalsIgnoreCase(DataConstants.CLIENT_SCOPE)) { + map = clientSideAttributesMap; + } else if (scope.equalsIgnoreCase(DataConstants.SHARED_SCOPE)) { + map = serverPublicAttributesMap; + } else if (scope.equalsIgnoreCase(DataConstants.SERVER_SCOPE)) { + map = serverPrivateAttributesMap; + } + return map; + } } diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java index 4ff72ee045..f25f4f82a3 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java @@ -15,37 +15,43 @@ */ package org.thingsboard.server.extensions.api.device; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKey; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import java.util.List; import java.util.Set; /** * @author Andrew Shvayka */ @ToString +@AllArgsConstructor public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotificationMsg { - @Getter private final TenantId tenantId; - @Getter private final DeviceId deviceId; - @Getter private final Set keys; - @Getter private final boolean deleted; + @Getter + private final TenantId tenantId; + @Getter + private final DeviceId deviceId; + @Getter + private final Set deletedKeys; + @Getter + private final String scope; + @Getter + private final List values; + @Getter + private final boolean deleted; - public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, Set keys) { - return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, false); + public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, String scope, List values) { + return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false); } public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set keys) { - return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, true); + return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true); } - private DeviceAttributesEventNotificationMsg(TenantId tenantId, DeviceId deviceId, Set keys, boolean deleted) { - this.tenantId = tenantId; - this.deviceId = deviceId; - this.keys = keys; - this.deleted = deleted; - } } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 2bd17aab87..78fa4ad873 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -93,7 +93,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); List keys = Arrays.asList(keysStr.split(",")); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)).collect(Collectors.toList()); ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { @Override public void onSuccess(PluginContext ctx, List data) { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index fbfacd3983..4bc7ae0df5 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -48,6 +48,8 @@ import java.util.stream.Collectors; public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { private static final int UNKNOWN_SUBSCRIPTION_ID = 0; + public static final int DEFAULT_LIMIT = 100; + public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; private final SubscriptionManager subscriptionManager; @@ -187,13 +189,11 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { if (keysOptional.isPresent()) { long startTs; if (cmd.getTimeWindow() > 0) { - List data = new ArrayList<>(); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId()); long endTs = System.currentTimeMillis(); startTs = endTs - cmd.getTimeWindow(); - - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); @@ -277,7 +277,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { } DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId()); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { @Override public void onSuccess(PluginContext ctx, List data) { @@ -299,6 +299,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { }); } + private static Aggregation getAggregation(String agg) { + return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg); + } + + private int getLimit(int limit) { + return limit == 0 ? DEFAULT_LIMIT : limit; + } + private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); if (sessionMD == null) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 2badd3ab79..854ad8f1e9 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -91,7 +91,9 @@ public class GatewaySessionCtx { public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { String deviceName = checkDeviceName(getDeviceName(msg)); GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); - deviceSessionCtx.setClosed(true); + if (deviceSessionCtx != null) { + deviceSessionCtx.setClosed(true); + } ack(msg); }