From 694b6bf85169ca7e1dbe87a5a282c8ee9bfb2721 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 19 Oct 2023 19:04:35 +0300 Subject: [PATCH] Refactoring: new data structures and message flow Local Subscription Service refactoring Refactoring of the proto structures Bug fixing after refactoring Minor improvements and bug fixes Improvements to avoid additional DB calls Bug fixes after refactoring Cluster mode improvements --- .../InMemoryHouseKeeperServiceService.java | 3 +- .../partition/TbCoreStartupService.java | 52 ++ .../queue/DefaultTbClusterService.java | 14 + .../queue/DefaultTbCoreConsumerService.java | 66 +- .../DefaultSubscriptionManagerService.java | 640 ++++++------------ ...efaultTbEntityDataSubscriptionService.java | 2 +- .../DefaultTbLocalSubscriptionService.java | 514 ++++++++++---- .../SubscriptionManagerService.java | 9 +- .../SubscriptionSchedulerComponent.java | 52 ++ .../subscription/TbAbstractDataSubCtx.java | 8 +- .../subscription/TbAbstractSubCtx.java | 4 + .../subscription/TbAttributeSubscription.java | 4 +- .../subscription/TbEntityLocalSubsInfo.java | 245 +++++++ .../subscription/TbEntityRemoteSubsInfo.java | 79 +++ .../subscription/TbEntitySubEvent.java | 42 ++ .../subscription/TbEntityUpdatesInfo.java | 30 + .../TbLocalSubscriptionService.java | 36 +- .../service/subscription/TbSubscription.java | 2 +- .../subscription/TbSubscriptionUtils.java | 358 +++++----- .../subscription/TbSubscriptionsInfo.java | 56 ++ ...ion.java => TbTimeSeriesSubscription.java} | 9 +- .../service/ws/DefaultWebSocketService.java | 36 +- .../DefaultNotificationCommandsHandler.java | 7 +- .../sub/NotificationsCountSubscription.java | 2 +- .../sub/NotificationsSubscription.java | 2 +- .../sub/AlarmSubscriptionUpdate.java | 23 +- .../sub/TelemetrySubscriptionUpdate.java | 14 +- .../src/test/resources/logback-test.xml | 2 + .../server/cluster/TbClusterService.java | 4 + common/cluster-api/src/main/proto/queue.proto | 105 ++- .../queue/discovery/HashPartitionService.java | 5 + .../queue/discovery/PartitionService.java | 2 + .../queue/discovery/ZkDiscoveryService.java | 12 +- .../event/OtherServiceShutdownEvent.java | 41 ++ .../server/queue/util/AfterStartUp.java | 1 + .../discovery/ZkDiscoveryServiceTest.java | 5 +- .../query/DefaultEntityQueryRepository.java | 1 - 37 files changed, 1668 insertions(+), 819 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionSchedulerComponent.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java rename application/src/main/java/org/thingsboard/server/service/subscription/{TbTimeseriesSubscription.java => TbTimeSeriesSubscription.java} (84%) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java index 4e83aa0060..9edc81f9fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -91,7 +90,7 @@ public class InMemoryHouseKeeperServiceService implements HouseKeeperService { } @Override - public void onFailure(@NotNull Throwable throwable) { + public void onFailure(Throwable throwable) { queueSize.decrementAndGet(); totalProcessedCounter.incrementAndGet(); log.error("[{}][{}] unassignDeletedUserAlarms failed, pending queue size: {}, total processed count: {}", diff --git a/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java b/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java new file mode 100644 index 0000000000..3139ff256e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.partition; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.util.AfterStartUp; +import org.thingsboard.server.queue.util.TbCoreComponent; + +@Slf4j +@TbCoreComponent +@Service +@RequiredArgsConstructor +public class TbCoreStartupService { + + private final PartitionService partitionService; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbClusterService clusterService; + + @AfterStartUp(order = AfterStartUp.STARTUP_SERVICE) + public void onApplicationEvent(ApplicationReadyEvent event) { + var myPartitions = partitionService.getMyPartitions(new QueueKey(ServiceType.TB_CORE)); + if (myPartitions != null && !myPartitions.isEmpty()) { + TransportProtos.ToCoreNotificationMsg toCoreMsg = TransportProtos.ToCoreNotificationMsg.newBuilder() + .setCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder() + .setServiceId(serviceInfoProvider.getServiceId()).addAllPartitions(myPartitions).build()).build(); + clusterService.broadcastToCore(toCoreMsg); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 53ef38d96a..308d3b2f43 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -77,6 +77,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -138,6 +139,18 @@ public class DefaultTbClusterService implements TbClusterService { toCoreMsgs.incrementAndGet(); } + @Override + public void broadcastToCore(ToCoreNotificationMsg toCoreMsg) { + UUID msgId = UUID.randomUUID(); + TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); + Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); + for (String serviceId : tbCoreServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msgId, toCoreMsg), null); + toCoreNfs.incrementAndGet(); + } + } + @Override public void pushMsgToVersionControl(TenantId tenantId, TransportProtos.ToVersionControlServiceMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId); @@ -203,6 +216,7 @@ public class DefaultTbClusterService implements TbClusterService { } return null; } + private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) { if (ruleEngineProfile != null) { RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fd5a252cc5..3494125d5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -58,7 +58,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -66,6 +65,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -93,7 +93,6 @@ import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -343,10 +342,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService Do NOTHING. + callback.onSuccess(); } else { throwNotHandled(msg, callback); } } + private void forwardCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg, TbCallback callback) { + log.info("[{}] Processing core startup with partitions: {}", coreStartupMsg.getServiceId(), coreStartupMsg.getPartitionsList()); + localSubscriptionService.onCoreStartupMsg(coreStartupMsg); + callback.onSuccess(); + } + private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) { - if (msg.hasAttributeSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback); + if (msg.hasSubEvent()) { + TbEntitySubEventProto subEvent = msg.getSubEvent(); + subscriptionManagerService.onSubEvent(subEvent.getServiceId(), TbSubscriptionUtils.fromProto(subEvent), callback); } else if (msg.hasTelemetrySub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getTelemetrySub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasAlarmSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAlarmSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasNotificationsSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getNotificationsSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasNotificationsCountSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getNotificationsCountSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasSubClose()) { - TbSubscriptionCloseProto closeProto = msg.getSubClose(); - subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasTsUpdate()) { TbTimeSeriesUpdateProto proto = msg.getTsUpdate(); long tenantIdMSB = proto.getTenantIdMSB(); @@ -576,7 +586,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService implements SubscriptionManagerService { - private final AttributesService attrService; - private final TimeseriesService tsService; private final NotificationsTopicService notificationsTopicService; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; @@ -97,126 +78,89 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene private final TbLocalSubscriptionService localSubscriptionService; private final DeviceStateService deviceStateService; private final TbClusterService clusterService; + private final SubscriptionSchedulerComponent scheduler; - private final Map> subscriptionsByEntityId = new ConcurrentHashMap<>(); - private final Map> subscriptionsByWsSessionId = new ConcurrentHashMap<>(); - private final ConcurrentMap> partitionedSubscriptions = new ConcurrentHashMap<>(); - private final Set currentPartitions = ConcurrentHashMap.newKeySet(); + private final Lock subsLock = new ReentrantLock(); + private final ConcurrentMap entitySubscriptions = new ConcurrentHashMap<>(); + + private final ConcurrentMap entityUpdates = new ConcurrentHashMap<>(); - private ExecutorService tsCallBackExecutor; private String serviceId; private TbQueueProducer> toCoreNotificationsProducer; + private long initTs; + @PostConstruct public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); serviceId = serviceInfoProvider.getServiceId(); + initTs = System.currentTimeMillis(); toCoreNotificationsProducer = producerProvider.getTbCoreNotificationsMsgProducer(); - } - - @PreDestroy - public void shutdownExecutor() { - if (tsCallBackExecutor != null) { - tsCallBackExecutor.shutdownNow(); - } + scheduler.scheduleWithFixedDelay(this::cleanupEntityUpdates, 1, 1, TimeUnit.HOURS); } @Override - public void addSubscription(TbSubscription subscription, TbCallback callback) { - log.trace("[{}][{}][{}] Registering subscription for entity [{}]", - subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - partitionedSubscriptions.computeIfAbsent(tpi, k -> ConcurrentHashMap.newKeySet()).add(subscription); + public void onSubEvent(String serviceId, TbEntitySubEvent event, TbCallback callback) { + var tenantId = event.getTenantId(); + var entityId = event.getEntityId(); + log.trace("[{}][{}][{}] Processing subscription event {}", tenantId, entityId, serviceId, event); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (tpi.isMyPartition()) { + subsLock.lock(); + try { + var entitySubs = entitySubscriptions.computeIfAbsent(entityId, id -> new TbEntityRemoteSubsInfo(tenantId, entityId)); + boolean empty = entitySubs.updateAndCheckIsEmpty(serviceId, event); + if (empty) { + entitySubscriptions.remove(entityId); + } + } finally { + subsLock.unlock(); + } callback.onSuccess(); + if (event.hasTsOrAttrSub()) { + sendSubEventCallback(serviceId, entityId, event.getSeqNumber()); + } } else { - log.warn("[{}][{}] Entity belongs to external partition. Probably rebalancing is in progress. Topic: {}" - , subscription.getTenantId(), subscription.getEntityId(), tpi.getFullTopicName()); + log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}" + , tenantId, entityId, serviceId, tpi.getFullTopicName()); callback.onFailure(new RuntimeException("Entity belongs to external partition " + tpi.getFullTopicName() + "!")); } - boolean newSubscription = subscriptionsByEntityId - .computeIfAbsent(subscription.getEntityId(), k -> ConcurrentHashMap.newKeySet()).add(subscription); - subscriptionsByWsSessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()).put(subscription.getSubscriptionId(), subscription); - if (newSubscription) { - switch (subscription.getType()) { - case TIMESERIES: - handleNewTelemetrySubscription((TbTimeseriesSubscription) subscription); - break; - case ATTRIBUTES: - handleNewAttributeSubscription((TbAttributeSubscription) subscription); - break; - case ALARMS: - handleNewAlarmsSubscription((TbAlarmsSubscription) subscription); - break; - } - } } @Override - public void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback) { - log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); - Map sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId); - if (sessionSubscriptions != null) { - TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); - if (subscription != null) { - removeSubscriptionFromEntityMap(subscription); - removeSubscriptionFromPartitionMap(subscription); - if (sessionSubscriptions.isEmpty()) { - subscriptionsByWsSessionId.remove(sessionId); - } - } else { - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); + @EventListener(OtherServiceShutdownEvent.class) + public void onApplicationEvent(OtherServiceShutdownEvent event) { + if (event.getServiceTypes() != null && event.getServiceTypes().contains(ServiceType.TB_CORE)) { + subsLock.lock(); + try { + int sizeBeforeCleanup = entitySubscriptions.size(); + entitySubscriptions.entrySet().removeIf(kv -> kv.getValue().removeAndCheckIsEmpty(event.getServiceId())); + log.info("[{}][{}] Removed {} entity subscription records due to server shutdown.", serviceId, event.getServiceId(), entitySubscriptions.size() - sizeBeforeCleanup); + } finally { + subsLock.unlock(); } + } + } + + private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) { + var update = getEntityUpdatesInfo(entityId); + if (serviceId.equals(targetId)) { + localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY); } else { - log.debug("[{}] No session subscriptions found!", sessionId); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update)); } - callback.onSuccess(); } @Override protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { - Set removedPartitions = new HashSet<>(currentPartitions); - removedPartitions.removeAll(partitionChangeEvent.getPartitions()); - - currentPartitions.clear(); - currentPartitions.addAll(partitionChangeEvent.getPartitions()); - - // We no longer manage current partition of devices; - removedPartitions.forEach(partition -> { - Set subs = partitionedSubscriptions.remove(partition); - if (subs != null) { - subs.forEach(sub -> { - if (!serviceId.equals(sub.getServiceId())) { - removeSubscriptionFromEntityMap(sub); - } - }); - } - }); + entitySubscriptions.values().removeIf(sub -> + !partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition()); } } @Override public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { - return (TbTimeseriesSubscription) s; - } else { - return null; - } - }, s -> true, s -> { - List subscriptionUpdate = null; - for (TsKvEntry kv : ts) { - if ((s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(kv); - } - } - return subscriptionUpdate; - }, true); + onTimeSeriesUpdate(entityId, ts); if (entityId.getEntityType() == EntityType.DEVICE) { updateDeviceInactivityTimeout(tenantId, entityId, ts); } @@ -224,168 +168,68 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } @Override - public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback) { - onAttributesUpdate(tenantId, entityId, scope, attributes, true, callback); - } - - @Override - public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { - return (TbAttributeSubscription) s; - } else { - return null; - } - }, - s -> (TbAttributeSubscriptionScope.ANY_SCOPE.equals(s.getScope()) || scope.equals(s.getScope().name())), - s -> { - List subscriptionUpdate = null; - for (AttributeKvEntry kv : attributes) { - if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv)); - } - } - return subscriptionUpdate; - }, true); + public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback) { + onTimeSeriesUpdate(entityId, + keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); if (entityId.getEntityType() == EntityType.DEVICE) { - if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { - updateDeviceInactivityTimeout(tenantId, entityId, attributes); - } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { - clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, - new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) - , null); - } + deleteDeviceInactivityTimeout(tenantId, entityId, keys); } callback.onSuccess(); } - private void updateDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List kvEntries) { - for (KvEntry kvEntry : kvEntries) { - if (kvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { - deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), getLongValue(kvEntry)); - } + public void onTimeSeriesUpdate(EntityId entityId, List update) { + getEntityUpdatesInfo(entityId).timeSeriesUpdateTs = System.currentTimeMillis(); + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}] Handling time-series update: {}", entityId, update); + subInfo.getSubs().forEach((serviceId, sub) -> { + if (sub.tsAllKeys) { + onTimeSeriesUpdate(serviceId, entityId, update); + } else if (sub.tsKeys != null) { + List tmp = getSubList(update, sub.tsKeys); + if (tmp != null) { + onTimeSeriesUpdate(serviceId, entityId, tmp); + } + } + }); + } else { + log.trace("[{}] No time-series subscriptions for entity.", entityId); } } - private void deleteDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List keys) { - for (String key : keys) { - if (key.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { - deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), 0); - } + private void onTimeSeriesUpdate(String targetId, EntityId entityId, List update) { + if (serviceId.equals(targetId)) { + localSubscriptionService.onTimeSeriesUpdate(entityId, update, TbCallback.EMPTY); + } else { + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(true, entityId, update)); } } @Override - public void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { - onLocalAlarmSubUpdate(entityId, - s -> { - if (TbSubscriptionType.ALARMS.equals(s.getType())) { - return (TbAlarmsSubscription) s; - } else { - return null; - } - }, - s -> alarm.getCreatedTime() >= s.getTs() || alarm.getAssignTs() >= s.getTs(), - alarm, false - ); - callback.onSuccess(); - } - - @Override - public void onAlarmDeleted(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { - onLocalAlarmSubUpdate(entityId, - s -> { - if (TbSubscriptionType.ALARMS.equals(s.getType())) { - return (TbAlarmsSubscription) s; - } else { - return null; - } - }, - s -> alarm.getCreatedTime() >= s.getTs(), - alarm, true - ); - callback.onSuccess(); - } - - @Override - public void onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate notificationUpdate, TbCallback callback) { - Set subscriptions = subscriptionsByEntityId.get(recipientId); - if (subscriptions != null) { - NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationUpdate); - log.trace("Handling notificationUpdate for user {}: {}", recipientId, notificationUpdate); - subscriptions.stream() - .filter(subscription -> subscription.getType() == TbSubscriptionType.NOTIFICATIONS - || subscription.getType() == TbSubscriptionType.NOTIFICATIONS_COUNT) - .forEach(subscription -> onNotificationsSubUpdate(subscriptionUpdate, subscription)); - } - callback.onSuccess(); + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback) { + onAttributesUpdate(tenantId, entityId, scope, attributes, true, callback); } @Override - public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate notificationRequestUpdate, TbCallback callback) { - NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationRequestUpdate); - subscriptionsByEntityId.forEach((entityId, subscriptions) -> { - if (entityId.getEntityType() != EntityType.USER) { - return; + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, TbCallback callback) { + getEntityUpdatesInfo(entityId).attributesUpdateTs = System.currentTimeMillis(); + processAttributesUpdate(entityId, attributes); + if (entityId.getEntityType() == EntityType.DEVICE) { + if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { + updateDeviceInactivityTimeout(tenantId, entityId, attributes); + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { + clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, + new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) + , null); } - log.trace("Handling notificationRequestUpdate for user {}: {}", entityId, notificationRequestUpdate); - subscriptions.forEach(subscription -> { - if (subscription.getType() != TbSubscriptionType.NOTIFICATIONS && - subscription.getType() != TbSubscriptionType.NOTIFICATIONS_COUNT) { - return; - } - if (!subscription.getTenantId().equals(tenantId)) { - return; - } - onNotificationsSubUpdate(subscriptionUpdate, subscription); - }); - }); - callback.onSuccess(); - } - - private void onNotificationsSubUpdate(NotificationsSubscriptionUpdate subscriptionUpdate, TbSubscription subscription) { - if (serviceId.equals(subscription.getServiceId())) { - log.trace("[{}][{}][{}] Subscription session is managed by current service, forwarding to localSubscriptionService (update: {})", - subscription.getServiceId(), subscription.getEntityId(), subscription.getSessionId(), subscriptionUpdate); - localSubscriptionService.onSubscriptionUpdate(subscription.getSessionId(), - subscription.getSubscriptionId(), subscriptionUpdate, TbCallback.EMPTY); - } else { - log.trace("[{}][{}][{}] Subscription session is not managed by current service (update: {})", - subscription.getServiceId(), subscription.getEntityId(), subscription.getSessionId(), subscriptionUpdate); - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - ToCoreNotificationMsg updateProto = TbSubscriptionUtils.notificationsSubUpdateToProto(subscription, subscriptionUpdate); - TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(subscription.getEntityId().getId(), updateProto); - toCoreNotificationsProducer.send(tpi, queueMsg, null); } + callback.onSuccess(); } @Override public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { - return (TbAttributeSubscription) s; - } else { - return null; - } - }, - s -> (TbAttributeSubscriptionScope.ANY_SCOPE.equals(s.getScope()) || scope.equals(s.getScope().name())), - s -> { - List subscriptionUpdate = null; - for (String key : keys) { - if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, ""))); - } - } - return subscriptionUpdate; - }, false); + processAttributesUpdate(entityId, + keys.stream().map(key -> new BaseAttributeKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); if (entityId.getEntityType() == EntityType.DEVICE) { if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope) || TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) { @@ -398,222 +242,117 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene callback.onSuccess(); } - @Override - public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { - return (TbTimeseriesSubscription) s; - } else { - return null; - } - }, s -> true, s -> { - List subscriptionUpdate = null; - for (String key : keys) { - if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, ""))); - } - } - return subscriptionUpdate; - }, false); - if (entityId.getEntityType() == EntityType.DEVICE) { - deleteDeviceInactivityTimeout(tenantId, entityId, keys); - } - callback.onSuccess(); - } - - private void onLocalTelemetrySubUpdate(EntityId entityId, - Function castFunction, - Predicate filterFunction, - Function> processFunction, - boolean ignoreEmptyUpdates) { - Set entitySubscriptions = subscriptionsByEntityId.get(entityId); - if (entitySubscriptions != null) { - entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { - List subscriptionUpdate = processFunction.apply(s); - if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) { - if (serviceId.equals(s.getServiceId())) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate); - localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); - } else { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate, ignoreEmptyUpdates), null); + public void processAttributesUpdate(EntityId entityId, List update) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}] Handling attributes update: {}", entityId, update); + subInfo.getSubs().forEach((serviceId, sub) -> { + if (sub.attrAllKeys) { + processAttributesUpdate(serviceId, entityId, update); + } else if (sub.attrKeys != null) { + List tmp = getSubList(update, sub.attrKeys); + if (tmp != null) { + processAttributesUpdate(serviceId, entityId, tmp); } } }); } else { - log.debug("[{}] No device subscriptions to process!", entityId); + log.trace("[{}] No attributes subscriptions for entity.", entityId); } } - private void onLocalAlarmSubUpdate(EntityId entityId, - Function castFunction, - Predicate filterFunction, - AlarmInfo alarm, boolean deleted) { - Set entitySubscriptions = subscriptionsByEntityId.get(entityId); - if (alarm == null) { - log.warn("[{}] empty alarm update!", entityId); - return; - } - if (entitySubscriptions != null) { - entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { - if (serviceId.equals(s.getServiceId())) { - AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(s.getSubscriptionId(), alarm, deleted); - localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); - } else { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(s, alarm, deleted), null); - } - }); + private void processAttributesUpdate(String targetId, EntityId entityId, List update) { + List tsKvEntryList = update.stream().map(attr -> new BasicTsKvEntry(attr.getLastUpdateTs(), attr)).collect(Collectors.toList()); + if (serviceId.equals(targetId)) { + localSubscriptionService.onAttributesUpdate(entityId, tsKvEntryList, TbCallback.EMPTY); } else { - log.debug("[{}] No device subscriptions to process!", entityId); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(false, entityId, tsKvEntryList)); } } - private void removeSubscriptionFromEntityMap(TbSubscription sub) { - Set entitySubSet = subscriptionsByEntityId.get(sub.getEntityId()); - if (entitySubSet != null) { - entitySubSet.remove(sub); - if (entitySubSet.isEmpty()) { - subscriptionsByEntityId.remove(sub.getEntityId()); + private void updateDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List kvEntries) { + for (KvEntry kvEntry : kvEntries) { + if (kvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { + deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), getLongValue(kvEntry)); } } } - private void removeSubscriptionFromPartitionMap(TbSubscription sub) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()); - Set subs = partitionedSubscriptions.get(tpi); - if (subs != null) { - subs.remove(sub); + private void deleteDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List keys) { + for (String key : keys) { + if (key.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { + deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), 0); + } } } - private void handleNewAttributeSubscription(TbAttributeSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote attribute subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - - final Map keyStates = subscription.getKeyStates(); - DonAsynchron.withCallback(attrService.find(subscription.getTenantId(), subscription.getEntityId(), DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> { - List missedUpdates = new ArrayList<>(); - values.forEach(latestEntry -> { - if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { - missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); - } - }); - if (!missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); + @Override + public void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { + onAlarmSubUpdate(tenantId, entityId, alarm, false, callback); } - private void handleNewAlarmsSubscription(TbAlarmsSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote alarm subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - //TODO: @dlandiak search all new alarms for this entity. + @Override + public void onAlarmDeleted(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { + onAlarmSubUpdate(tenantId, entityId, alarm, true, callback); } - private void handleNewTelemetrySubscription(TbTimeseriesSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote telemetry subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - - long curTs = System.currentTimeMillis(); - - if (subscription.isLatestValues()) { - DonAsynchron.withCallback(tsService.findLatest(subscription.getTenantId(), subscription.getEntityId(), subscription.getKeyStates().keySet()), - missedUpdates -> { - if (missedUpdates != null && !missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), - tsCallBackExecutor); - } else { - List queries = new ArrayList<>(); - subscription.getKeyStates().forEach((key, value) -> { - if (curTs > value) { - long startTs = subscription.getStartTime() > 0 ? Math.max(subscription.getStartTime(), value + 1L) : (value + 1L); - long endTs = subscription.getEndTime() > 0 ? Math.min(subscription.getEndTime(), curTs) : curTs; - queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0, 1000, Aggregation.NONE)); + private void onAlarmSubUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}][{}] Handling alarm update {}: {}", tenantId, entityId, alarm, deleted); + for (Map.Entry entry : subInfo.getSubs().entrySet()) { + if (entry.getValue().notifications) { + onAlarmSubUpdate(entry.getKey(), entityId, alarm, deleted); } - }); - if (!queries.isEmpty()) { - DonAsynchron.withCallback(tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries), - missedUpdates -> { - if (missedUpdates != null && !missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), - tsCallBackExecutor); } } + callback.onSuccess(); } - private TbProtoQueueMsg toProto(TbSubscription subscription, List updates) { - return toProto(subscription, updates, true); + private void onAlarmSubUpdate(String targetServiceId, EntityId entityId, AlarmInfo alarm, boolean deleted) { + if (alarm == null) { + log.warn("[{}] empty alarm update!", entityId); + return; + } + if (serviceId.equals(targetServiceId)) { + log.trace("[{}] Forwarding to local service: {} deleted: {}", entityId, alarm, deleted); + localSubscriptionService.onAlarmUpdate(entityId, alarm, deleted, TbCallback.EMPTY); + } else { + sendCoreNotification(targetServiceId, entityId, + TbSubscriptionUtils.toAlarmSubUpdateToProto(entityId, alarm, deleted)); + } } - private TbProtoQueueMsg toProto(TbSubscription subscription, List updates, boolean ignoreEmptyUpdates) { - TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder(); - - builder.setSessionId(subscription.getSessionId()); - builder.setSubscriptionId(subscription.getSubscriptionId()); - - Map> data = new TreeMap<>(); - for (TsKvEntry tsEntry : updates) { - List values = data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>()); - Object[] value = new Object[2]; - value[0] = tsEntry.getTs(); - value[1] = tsEntry.getValueAsString(); - values.add(value); - } + private void sendCoreNotification(String targetServiceId, EntityId entityId, ToCoreNotificationMsg msg) { + log.trace("[{}] Forwarding to remote service [{}]: {}", entityId, targetServiceId, msg); + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId); + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(entityId.getId(), msg); + toCoreNotificationsProducer.send(tpi, queueMsg, null); + } - data.forEach((key, value) -> { - TbSubscriptionUpdateValueListProto.Builder dataBuilder = TbSubscriptionUpdateValueListProto.newBuilder(); - dataBuilder.setKey(key); - boolean hasData = false; - for (Object v : value) { - Object[] array = (Object[]) v; - TbSubscriptionUpdateTsValue.Builder tsValueBuilder = TbSubscriptionUpdateTsValue.newBuilder(); - tsValueBuilder.setTs((long) array[0]); - String strVal = (String) array[1]; - if (strVal != null) { - hasData = true; - tsValueBuilder.setValue(strVal); + @Override + public void onNotificationUpdate(TenantId tenantId, UserId entityId, NotificationUpdate notificationUpdate, TbCallback callback) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationUpdate); + log.trace("[{}][{}] Handling notificationUpdate for user {}", tenantId, entityId, notificationUpdate); + for (Map.Entry entry : subInfo.getSubs().entrySet()) { + if (entry.getValue().notifications) { + onNotificationsSubUpdate(entry.getKey(), entityId, subscriptionUpdate); } - dataBuilder.addTsValue(tsValueBuilder.build()); - } - if (!ignoreEmptyUpdates || hasData) { - builder.addData(dataBuilder.build()); } - }); - - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg( - LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build()) - .build(); - return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); + } + callback.onSuccess(); } - private TbProtoQueueMsg toProto(TbSubscription subscription, AlarmInfo alarm, boolean deleted) { - TbAlarmSubscriptionUpdateProto.Builder builder = TbAlarmSubscriptionUpdateProto.newBuilder(); - - builder.setSessionId(subscription.getSessionId()); - builder.setSubscriptionId(subscription.getSubscriptionId()); - builder.setAlarm(JacksonUtil.toString(alarm)); - builder.setDeleted(deleted); - - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg( - LocalSubscriptionServiceMsgProto.newBuilder() - .setAlarmSubUpdate(builder.build()).build()) - .build(); - return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); + private void onNotificationsSubUpdate(String targetServiceId, EntityId entityId, NotificationsSubscriptionUpdate subscriptionUpdate) { + if (serviceId.equals(targetServiceId)) { + log.trace("[{}] Forwarding to local service: {}", entityId, subscriptionUpdate); + localSubscriptionService.onNotificationUpdate(entityId, subscriptionUpdate, TbCallback.EMPTY); + } else { + sendCoreNotification(targetServiceId, entityId, + TbSubscriptionUtils.notificationsSubUpdateToProto(entityId, subscriptionUpdate)); + } } private static long getLongValue(KvEntry kve) { @@ -639,4 +378,31 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } } + private static List getSubList(List ts, Set keys) { + List update = null; + for (T entry : ts) { + if (keys.contains(entry.getKey())) { + if (update == null) { + update = new ArrayList<>(ts.size()); + } + update.add(entry); + } + } + return update; + } + + private TbEntityUpdatesInfo getEntityUpdatesInfo(EntityId entityId) { + return entityUpdates.computeIfAbsent(entityId, id -> new TbEntityUpdatesInfo(initTs)); + } + + private void cleanupEntityUpdates() { + initTs = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); + int sizeBeforeCleanup = entityUpdates.size(); + entityUpdates.entrySet().removeIf(kv -> { + var v = kv.getValue(); + return initTs > v.attributesUpdateTs && initTs > v.timeSeriesUpdateTs; + }); + log.info("Removed {} old entity update records.", entityUpdates.size() - sizeBeforeCleanup); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 4b37ba26b5..b45450cec8 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -641,7 +641,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); - //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. + //Fetch the latest values for telemetry keys in case they are not copied from NoSQL to SQL DB in hybrid mode. if (!tsInSqlDB) { log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); List allTsKeys = latestCmd.getKeys().stream() diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 64f1de8662..f15a25fe4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -16,85 +16,97 @@ package org.thingsboard.server.service.subscription; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.TbApplicationEventListener; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; @Slf4j @TbCoreComponent @Service public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionService { - private final Set currentPartitions = ConcurrentHashMap.newKeySet(); - private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final ConcurrentMap>> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final ConcurrentMap subscriptionsByEntityId = new ConcurrentHashMap<>(); + private final ConcurrentMap entityUpdates = new ConcurrentHashMap<>(); - @Autowired - private PartitionService partitionService; + private final AttributesService attrService; + private final TimeseriesService tsService; + private final TbServiceInfoProvider serviceInfoProvider; + private final PartitionService partitionService; + private final TbClusterService clusterService; + private final SubscriptionManagerService subscriptionManagerService; - @Autowired - private TbClusterService clusterService; + private ExecutorService tsCallBackExecutor; - @Autowired - @Lazy - private SubscriptionManagerService subscriptionManagerService; + public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, + PartitionService partitionService, TbClusterService clusterService, + @Lazy SubscriptionManagerService subscriptionManagerService) { + this.attrService = attrService; + this.tsService = tsService; + this.serviceInfoProvider = serviceInfoProvider; + this.partitionService = partitionService; + this.clusterService = clusterService; + this.subscriptionManagerService = subscriptionManagerService; + } + private String serviceId; private ExecutorService subscriptionUpdateExecutor; - private TbApplicationEventListener partitionChangeListener = new TbApplicationEventListener<>() { - @Override - protected void onTbApplicationEvent(PartitionChangeEvent event) { - if (ServiceType.TB_CORE.equals(event.getServiceType())) { - currentPartitions.clear(); - currentPartitions.addAll(event.getPartitions()); - } - } - }; - - private TbApplicationEventListener clusterTopologyChangeListener = new TbApplicationEventListener<>() { - @Override - protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) { - if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) { - /* - * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. - * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. - * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically - * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService. - * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache - * Since number of subscriptions is usually much less then number of devices that are pushing data. - */ - subscriptionsBySessionId.values().forEach(map -> map.values() - .forEach(sub -> pushSubscriptionToManagerService(sub, true))); - } - } - }; + private final Lock subsLock = new ReentrantLock(); @PostConstruct public void initExecutor() { subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); + serviceId = serviceInfoProvider.getServiceId(); } @PreDestroy @@ -102,79 +114,78 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (subscriptionUpdateExecutor != null) { subscriptionUpdateExecutor.shutdownNow(); } - } - - @Override - @EventListener(PartitionChangeEvent.class) - public void onApplicationEvent(PartitionChangeEvent event) { - partitionChangeListener.onApplicationEvent(event); + if (tsCallBackExecutor != null) { + tsCallBackExecutor.shutdownNow(); + } } @Override @EventListener(ClusterTopologyChangeEvent.class) public void onApplicationEvent(ClusterTopologyChangeEvent event) { - clusterTopologyChangeListener.onApplicationEvent(event); + if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) { + /* + * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. + * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. + * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically + * It is also cheaper than caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService. + * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache + * Since number of subscriptions is usually much less than number of devices that are pushing data. + */ + subscriptionsByEntityId.values().forEach(sub -> pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED))); + } } - //TODO 3.1: replace null callbacks with callbacks from websocket service. @Override - public void addSubscription(TbSubscription subscription) { - pushSubscriptionToManagerService(subscription, true); - registerSubscription(subscription); + public void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg) { + subscriptionUpdateExecutor.submit(() -> { + Set partitions = new HashSet<>(coreStartupMsg.getPartitionsList()); + AtomicInteger counter = new AtomicInteger(); + subscriptionsByEntityId.values().forEach(sub -> { + var tpi = partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()); + if (!tpi.isMyPartition() && partitions.contains(tpi.getPartition().orElse(Integer.MAX_VALUE))) { + pushToQueue(sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED), tpi); + counter.incrementAndGet(); + } + }); + log.info("[{}] Pushed {} subscriptions to [{}]", serviceId, counter.get(), coreStartupMsg.getServiceId()); + }); } - private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - // Subscription is managed on the same server; - if (pushToLocalService) { - subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY); - } - } else { - // Push to the queue; - TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription); - clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); - } + @Override + public void addSubscription(TbSubscription subscription) { + TenantId tenantId = subscription.getTenantId(); + EntityId entityId = subscription.getEntityId(); + log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); + Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); + sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); + modifySubscription(tenantId, entityId, subscription, true); } @Override - @SuppressWarnings("unchecked") - public void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId - .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); - if (subscription != null) { - switch (subscription.getType()) { - case TIMESERIES: - TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription; - update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value)); - break; - case ATTRIBUTES: - TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription; - update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value)); - break; - } - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); - } - callback.onSuccess(); + public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { + UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); + onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); } @Override - @SuppressWarnings("unchecked") - public void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId - .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); - if (subscription != null && subscription.getType() == TbSubscriptionType.ALARMS) { - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); - } - callback.onSuccess(); + public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback); } - @Override - public void onSubscriptionUpdate(String sessionId, int subscriptionId, NotificationsSubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId.getOrDefault(sessionId, Collections.emptyMap()).get(subscriptionId); - if (subscription != null && (subscription.getType() == TbSubscriptionType.NOTIFICATIONS - || subscription.getType() == TbSubscriptionType.NOTIFICATIONS_COUNT)) { - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); + public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + entityUpdates.put(entityId, entityUpdatesInfo); + Set> pendingSubs = null; + subsLock.lock(); + try { + TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); + if (entitySubs != null) { + pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber); + } + } finally { + subsLock.unlock(); + } + if (pendingSubs != null) { + pendingSubs.forEach(this::checkMissedUpdates); } callback.onSuccess(); } @@ -182,22 +193,14 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelSubscription(String sessionId, int subscriptionId) { log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); - Map sessionSubscriptions = subscriptionsBySessionId.get(sessionId); + Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); if (sessionSubscriptions != null) { - TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); + TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - // Subscription is managed on the same server; - subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY); - } else { - // Push to the queue; - TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription); - clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); - } + modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); } else { log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); } @@ -208,16 +211,301 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelAllSessionSubscriptions(String sessionId) { - Map subscriptions = subscriptionsBySessionId.get(sessionId); - if (subscriptions != null) { - Set toRemove = new HashSet<>(subscriptions.keySet()); - toRemove.forEach(id -> cancelSubscription(sessionId, id)); + log.debug("[{}] Going to remove session subscriptions.", sessionId); + Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); + if (sessionSubscriptions != null) { + for (TbSubscription subscription : sessionSubscriptions.values()) { + modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); + } + } else { + log.debug("[{}] No session subscriptions found!", sessionId); } } - private void registerSubscription(TbSubscription subscription) { - Map sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); + @Override + public void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) { + //TODO: optimize to avoid re-wrapping from TsValueListProto -> List -> Map>. Low priority. + onTimeSeriesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onTimeSeriesUpdate(EntityId entityId, List data, TbCallback callback) { + onTimeSeriesUpdate(entityId.getId(), data, callback); + } + + private void onTimeSeriesUpdate(UUID entityId, List data, TbCallback callback) { + entityUpdates.get(entityId).timeSeriesUpdateTs = System.currentTimeMillis(); + processSubscriptionData(entityId, + sub -> TbSubscriptionType.TIMESERIES.equals(sub.getType()), + s -> { + TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s; + List updateData = null; + if (sub.isAllKeys()) { + updateData = data; + } else { + for (TsKvEntry kv : data) { + if (sub.getKeyStates().containsKey((kv.getKey()))) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); + } + } + } + if (updateData != null) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData); + update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value)); + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, update)); + } + }, callback); + } + + @Override + public void onAttributesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) { + onAttributesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onAttributesUpdate(EntityId entityId, List data, TbCallback callback) { + onAttributesUpdate(entityId.getId(), data, callback); + } + + private void onAttributesUpdate(UUID entityId, List data, TbCallback callback) { + entityUpdates.get(entityId).attributesUpdateTs = System.currentTimeMillis(); + processSubscriptionData(entityId, + sub -> TbSubscriptionType.ATTRIBUTES.equals(sub.getType()), + s -> { + TbAttributeSubscription sub = (TbAttributeSubscription) s; + List updateData = null; + if (sub.isAllKeys()) { + updateData = data; + } else { + for (TsKvEntry kv : data) { + if (sub.getKeyStates().containsKey((kv.getKey()))) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); + } + } + } + if (updateData != null) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData); + update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value)); + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, update)); + } + }, callback); + } + + @Override + public void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto proto, TbCallback callback) { + onAlarmUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onAlarmUpdate(EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback) { + onAlarmUpdate(entityId.getId(), new AlarmSubscriptionUpdate(alarm, deleted), callback); + } + + private void onAlarmUpdate(UUID entityId, AlarmSubscriptionUpdate update, TbCallback callback) { + processSubscriptionData(entityId, + sub -> TbSubscriptionType.ALARMS.equals(sub.getType()), + update, callback); + } + + @Override + public void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto proto, TbCallback callback) { + onNotificationUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate update, TbCallback callback) { + onNotificationUpdate(entityId.getId(), update, callback); + } + + private void onNotificationUpdate(UUID entityId, NotificationsSubscriptionUpdate update, TbCallback callback) { + processSubscriptionData(entityId, + sub -> TbSubscriptionType.NOTIFICATIONS.equals(sub.getType()) || TbSubscriptionType.NOTIFICATIONS_COUNT.equals(sub.getType()), + update, callback); + } + + @Override + @SuppressWarnings("unchecked") + public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update, TbCallback callback) { + log.trace("[{}] Received notification request update: {}", tenantId, update); + NotificationsSubscriptionUpdate theUpdate = new NotificationsSubscriptionUpdate(update); + subscriptionsByEntityId.values().forEach(subInfo -> { + if (subInfo.isNf() && tenantId.equals(subInfo.getTenantId()) && EntityType.USER.equals(subInfo.getEntityId().getEntityType())) { + subInfo.getSubs().forEach(s -> { + TbSubscription sub = (TbSubscription) s; + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, theUpdate)); + }); + } + }); + callback.onSuccess(); + } + + @SuppressWarnings("unchecked") + private void processSubscriptionData(UUID entityId, + Predicate> filter, + T data, + TbCallback callback) { + log.trace("[{}] Received subscription data: {}", entityId, data); + var subs = subscriptionsByEntityId.get(entityId); + if (subs != null) { + subs.getSubs().forEach(s -> { + if (filter.test(s)) { + subscriptionUpdateExecutor.submit(() -> { + TbSubscription sub = (TbSubscription) s; + sub.getUpdateProcessor().accept(sub, data); + }); + } + }); + } + callback.onSuccess(); + } + + private void processSubscriptionData(UUID entityId, + Predicate> filter, + Consumer> processor, + TbCallback callback) { + var subs = subscriptionsByEntityId.get(entityId); + if (subs != null) { + subs.getSubs().forEach(s -> { + if (filter.test(s)) { + processor.accept(s); + } + }); + } + callback.onSuccess(); + } + + private void modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription subscription, boolean add) { + TbSubscription missedUpdatesCandidate = null; + TbEntitySubEvent event; + subsLock.lock(); + try { + TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId)); + event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription); + if (entitySubs.isEmpty()) { + subscriptionsByEntityId.remove(entityId.getId()); + entityUpdates.remove(entityId.getId()); + } else if (add) { + missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event); + } + } finally { + subsLock.unlock(); + } + if (event != null) { + log.trace("[{}][{}][{}] Event: {}", tenantId, entityId, subscription.getSubscriptionId(), event); + pushSubEventToManagerService(tenantId, entityId, event); + if (missedUpdatesCandidate != null) { + checkMissedUpdates(missedUpdatesCandidate); + } + } else { + log.trace("[{}][{}][{}] No changes detected.", tenantId, entityId, subscription.getSubscriptionId()); + } + } + + private void pushSubEventToManagerService(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (tpi.isMyPartition()) { + // Subscription is managed on the same server; + subscriptionManagerService.onSubEvent(serviceId, event, TbCallback.EMPTY); + } else { + pushToQueue(entityId, event, tpi); + } + } + + private void pushToQueue(EntityId entityId, TbEntitySubEvent event, TopicPartitionInfo tpi) { + clusterService.pushMsgToCore(tpi, entityId.getId(), TbSubscriptionUtils.toSubEventProto(serviceId, event), null); + } + + private void checkMissedUpdates(TbSubscription subscription) { + log.trace("[{}][{}][{}] Check missed updates for subscription: {}", + subscription.getTenantId(), subscription.getEntityId(), subscription.getSubscriptionId(), subscription); + switch (subscription.getType()) { + case TIMESERIES: + handleNewTelemetrySubscription((TbTimeSeriesSubscription) subscription); + break; + case ATTRIBUTES: + handleNewAttributeSubscription((TbAttributeSubscription) subscription); + break; + } + } + + private void handleNewAttributeSubscription(TbAttributeSubscription subscription) { + log.trace("[{}][{}][{}] Processing attribute subscription for entity [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + var entityUpdateInfo = entityUpdates.get(subscription.getEntityId().getId()); + if (entityUpdateInfo != null && entityUpdateInfo.attributesUpdateTs > 0 && subscription.getQueryTs() > entityUpdateInfo.attributesUpdateTs) { + log.trace("[{}][{}][{}] No need to check for missed updates [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + return; + } + final Map keyStates = subscription.getKeyStates(); + DonAsynchron.withCallback(attrService.find(subscription.getTenantId(), subscription.getEntityId(), DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> { + List updates = new ArrayList<>(); + values.forEach(latestEntry -> { + if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { + updates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); + } + }); + var missedUpdates = updates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onAttributesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + }, + e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); + } + + private void handleNewTelemetrySubscription(TbTimeSeriesSubscription subscription) { + log.trace("[{}][{}][{}] Processing telemetry subscription for entity [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + var entityUpdateInfo = entityUpdates.get(subscription.getEntityId().getId()); + if (entityUpdateInfo != null && entityUpdateInfo.timeSeriesUpdateTs > 0 && subscription.getQueryTs() > entityUpdateInfo.timeSeriesUpdateTs) { + log.trace("[{}][{}][{}] No need to check for missed updates. time [{}][{}] diff: {}ms", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getQueryTs(), entityUpdateInfo.timeSeriesUpdateTs, subscription.getQueryTs() - entityUpdateInfo.timeSeriesUpdateTs); + return; + } + + long curTs = System.currentTimeMillis(); + + if (subscription.isLatestValues()) { + DonAsynchron.withCallback(tsService.findLatest(subscription.getTenantId(), subscription.getEntityId(), subscription.getKeyStates().keySet()), + missedUpdates -> { + if (missedUpdates != null && !missedUpdates.isEmpty()) { + missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onTimeSeriesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + } + }, + e -> log.error("Failed to fetch missed updates.", e), + tsCallBackExecutor); + } else { + List queries = new ArrayList<>(); + subscription.getKeyStates().forEach((key, value) -> { + if (curTs > value) { + long startTs = subscription.getStartTime() > 0 ? Math.max(subscription.getStartTime(), value + 1L) : (value + 1L); + long endTs = subscription.getEndTime() > 0 ? Math.min(subscription.getEndTime(), curTs) : curTs; + queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0, 1000, Aggregation.NONE)); + } + }); + if (!queries.isEmpty()) { + DonAsynchron.withCallback(tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries), + missedUpdates -> { + if (missedUpdates != null && !missedUpdates.isEmpty()) { + missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onTimeSeriesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + } + }, + e -> log.error("Failed to fetch missed updates.", e), + tsCallBackExecutor); + } + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index 736a251dff..4f6d16e6c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.subscription; import org.springframework.context.ApplicationListener; +import org.springframework.context.event.EventListener; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -23,6 +24,8 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; @@ -31,9 +34,9 @@ import java.util.List; public interface SubscriptionManagerService extends ApplicationListener { - void addSubscription(TbSubscription subscription, TbCallback callback); + void onSubEvent(String serviceId, TbEntitySubEvent event, TbCallback empty); - void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback); + void onApplicationEvent(OtherServiceShutdownEvent event); void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbCallback callback); @@ -51,6 +54,4 @@ public interface SubscriptionManagerService extends ApplicationListener scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index 3f11679849..f02a466994 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -186,6 +186,7 @@ public abstract class TbAbstractDataSubCtx sendWsMsg(sub.getSessionId(), subscriptionUpdate, keysType)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .scope(scope) @@ -207,19 +208,20 @@ public abstract class TbAbstractDataSubCtx keyStates) { + private TbTimeSeriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates) { return createTsSub(entityData, subIdx, latestValues, startTs, endTs, keyStates, latestValues); } - private TbTimeseriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates, boolean resultToLatestValues) { + private TbTimeSeriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates, boolean resultToLatestValues) { log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates); - return TbTimeseriesSubscription.builder() + return TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionRef.getSessionId()) .subscriptionId(subIdx) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityData.getEntityId()) .updateProcessor((sub, subscriptionUpdate) -> sendWsMsg(sub.getSessionId(), subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .latestValues(latestValues) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java index 7eba28bccd..102431f63b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java @@ -78,11 +78,14 @@ public abstract class TbAbstractSubCtx { @Setter protected volatile ScheduledFuture refreshTask; protected volatile boolean stopped; + @Getter + protected long createdTime; public TbAbstractSubCtx(String serviceId, WebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, WebSocketSessionRef sessionRef, int cmdId) { + this.createdTime = System.currentTimeMillis(); this.serviceId = serviceId; this.wsService = wsService; this.entityService = entityService; @@ -142,6 +145,7 @@ public abstract class TbAbstractSubCtx { .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .updateProcessor((subscription, subscriptionUpdate) -> dynamicValueSubUpdate(subscription.getSessionId(), subscriptionUpdate, dynamicValueKeySubMap)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .scope(TbAttributeSubscriptionScope.SERVER_SCOPE) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java index 3746758caa..2392899dc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java @@ -26,6 +26,7 @@ import java.util.function.BiConsumer; public class TbAttributeSubscription extends TbSubscription { + @Getter private final long queryTs; @Getter private final boolean allKeys; @Getter private final Map keyStates; @Getter private final TbAttributeSubscriptionScope scope; @@ -33,8 +34,9 @@ public class TbAttributeSubscription extends TbSubscription, TelemetrySubscriptionUpdate> updateProcessor, - boolean allKeys, Map keyStates, TbAttributeSubscriptionScope scope) { + long queryTs, boolean allKeys, Map keyStates, TbAttributeSubscriptionScope scope) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES, updateProcessor); + this.queryTs = queryTs; this.allKeys = allKeys; this.keyStates = keyStates; this.scope = scope; diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java new file mode 100644 index 0000000000..96ea628de2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java @@ -0,0 +1,245 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Information about the local websocket subscriptions. + */ +@Slf4j +@RequiredArgsConstructor +public class TbEntityLocalSubsInfo { + + @Getter + private final TenantId tenantId; + @Getter + private final EntityId entityId; + @Getter + private final Lock lock = new ReentrantLock(); + @Getter + private final Set> subs = ConcurrentHashMap.newKeySet(); + private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo(); + + private final Map>> pendingSubs = new ConcurrentHashMap<>(); + @Getter + @Setter + private int pendingTimeSeriesEvent; + @Getter + @Setter + private long pendingTimeSeriesEventTs; + @Getter + @Setter + private int pendingAttributesEvent; + @Getter + @Setter + private long pendingAttributesEventTs; + + private int seqNumber = 0; + + public TbEntitySubEvent add(TbSubscription subscription) { + log.trace("[{}][{}][{}] Adding: {}", tenantId, entityId, subscription.getSubscriptionId(), subscription); + boolean created = subs.isEmpty(); + subs.add(subscription); + TbSubscriptionsInfo newState = created ? state : state.copy(); + boolean stateChanged = false; + switch (subscription.getType()) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + if (!newState.notifications) { + newState.notifications = true; + stateChanged = true; + } + break; + case ALARMS: + if (!newState.alarms) { + newState.alarms = true; + stateChanged = true; + } + break; + case ATTRIBUTES: + var attrSub = (TbAttributeSubscription) subscription; + if (!newState.attrAllKeys) { + if (attrSub.isAllKeys()) { + newState.attrAllKeys = true; + stateChanged = true; + } else { + if (newState.attrKeys == null) { + newState.attrKeys = new HashSet<>(attrSub.getKeyStates().keySet()); + stateChanged = true; + } else if (newState.attrKeys.addAll(attrSub.getKeyStates().keySet())) { + stateChanged = true; + } + } + } + break; + case TIMESERIES: + var tsSub = (TbTimeSeriesSubscription) subscription; + if (!newState.tsAllKeys) { + if (tsSub.isAllKeys()) { + newState.tsAllKeys = true; + stateChanged = true; + } else { + if (newState.tsKeys == null) { + newState.tsKeys = new HashSet<>(tsSub.getKeyStates().keySet()); + stateChanged = true; + } else if (newState.tsKeys.addAll(tsSub.getKeyStates().keySet())) { + stateChanged = true; + } + } + } + break; + } + if (stateChanged) { + state = newState; + } + if (created) { + return toEvent(ComponentLifecycleEvent.CREATED); + } else if (stateChanged) { + return toEvent(ComponentLifecycleEvent.UPDATED); + } else { + return null; + } + } + + public TbEntitySubEvent remove(TbSubscription sub) { + log.trace("[{}][{}][{}] Removing: {}", tenantId, entityId, sub.getSubscriptionId(), sub); + if (!subs.remove(sub)) { + return null; + } + if (subs.isEmpty()) { + return toEvent(ComponentLifecycleEvent.DELETED); + } + TbSubscriptionsInfo oldState = state.copy(); + TbSubscriptionsInfo newState = new TbSubscriptionsInfo(); + for (TbSubscription subscription : subs) { + switch (subscription.getType()) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + if (!newState.notifications) { + newState.notifications = true; + } + break; + case ALARMS: + if (!newState.alarms) { + newState.alarms = true; + } + break; + case ATTRIBUTES: + var attrSub = (TbAttributeSubscription) subscription; + if (!newState.attrAllKeys && attrSub.isAllKeys()) { + newState.attrAllKeys = true; + continue; + } + if (newState.attrKeys == null) { + newState.attrKeys = new HashSet<>(attrSub.getKeyStates().keySet()); + } else { + newState.attrKeys.addAll(attrSub.getKeyStates().keySet()); + } + break; + case TIMESERIES: + var tsSub = (TbTimeSeriesSubscription) subscription; + if (!newState.tsAllKeys && tsSub.isAllKeys()) { + newState.tsAllKeys = true; + continue; + } + if (newState.tsKeys == null) { + newState.tsKeys = new HashSet<>(tsSub.getKeyStates().keySet()); + } else { + newState.tsKeys.addAll(tsSub.getKeyStates().keySet()); + } + break; + } + } + if (newState.equals(oldState)) { + return null; + } else { + this.state = newState; + return toEvent(ComponentLifecycleEvent.UPDATED); + } + } + + public TbEntitySubEvent toEvent(ComponentLifecycleEvent type) { + seqNumber++; + var result = TbEntitySubEvent.builder().tenantId(tenantId).entityId(entityId).type(type).seqNumber(seqNumber); + if (!ComponentLifecycleEvent.DELETED.equals(type)) { + result.info(state.copy(seqNumber)); + } + return result.build(); + } + + public boolean isNf() { + return state.notifications; + } + + + public boolean isEmpty() { + return state.isEmpty(); + } + + public TbSubscription registerPendingSubscription(TbSubscription subscription, TbEntitySubEvent event) { + if (TbSubscriptionType.ATTRIBUTES.equals(subscription.getType())) { + if (event != null) { + log.trace("[{}][{}] Registering new pending attributes subscription event: {} for subscription: {}", tenantId, entityId, event.getSeqNumber(), subscription.getSubscriptionId()); + pendingAttributesEvent = event.getSeqNumber(); + pendingAttributesEventTs = System.currentTimeMillis(); + pendingSubs.computeIfAbsent(pendingAttributesEvent, e -> new HashSet<>()).add(subscription); + } else if (pendingAttributesEvent > 0) { + log.trace("[{}][{}] Registering pending attributes subscription {} for event: {} ", tenantId, entityId, subscription.getSubscriptionId(), pendingAttributesEvent); + pendingSubs.computeIfAbsent(pendingAttributesEvent, e -> new HashSet<>()).add(subscription); + } else { + return subscription; + } + } else if (subscription instanceof TbTimeSeriesSubscription) { + if (event != null) { + log.trace("[{}][{}] Registering new pending time-series subscription event: {} for subscription: {}", tenantId, entityId, event.getSeqNumber(), subscription.getSubscriptionId()); + pendingTimeSeriesEvent = event.getSeqNumber(); + pendingTimeSeriesEventTs = System.currentTimeMillis(); + pendingSubs.computeIfAbsent(pendingTimeSeriesEvent, e -> new HashSet<>()).add(subscription); + } else if (pendingTimeSeriesEvent > 0) { + log.trace("[{}][{}] Registering pending time-series subscription {} for event: {} ", tenantId, entityId, subscription.getSubscriptionId(), pendingTimeSeriesEvent); + pendingSubs.computeIfAbsent(pendingTimeSeriesEvent, e -> new HashSet<>()).add(subscription); + } else { + return subscription; + } + } + return null; + } + + public Set> clearPendingSubscriptions(int seqNumber) { + if (pendingTimeSeriesEvent == seqNumber) { + pendingTimeSeriesEvent = 0; + pendingTimeSeriesEventTs = 0L; + } else if (pendingAttributesEvent == seqNumber) { + pendingAttributesEvent = 0; + pendingAttributesEventTs = 0L; + } + return pendingSubs.remove(seqNumber); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java new file mode 100644 index 0000000000..df66b060da --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Information about the local websocket subscriptions. + */ +@RequiredArgsConstructor +@Slf4j +public class TbEntityRemoteSubsInfo { + @Getter + private final TenantId tenantId; + @Getter + private final EntityId entityId; + @Getter + private final Map subs = new ConcurrentHashMap<>(); // By service ID + + public boolean updateAndCheckIsEmpty(String serviceId, TbEntitySubEvent event) { + var current = subs.get(serviceId); + if (current != null && current.seqNumber > event.getSeqNumber()) { + log.warn("[{}][{}] Duplicate subscription event received. Current: {}, Event: {}", + tenantId, entityId, current, event.getInfo()); + return false; + } + switch (event.getType()) { + case CREATED: + subs.put(serviceId, event.getInfo()); + break; + case UPDATED: + var newSubInfo = event.getInfo(); + if (newSubInfo.isEmpty()) { + subs.remove(serviceId); + return isEmpty(); + } else { + subs.put(serviceId, newSubInfo); + } + break; + case DELETED: + subs.remove(serviceId); + return isEmpty(); + } + return false; + } + + public boolean removeAndCheckIsEmpty(String serviceId) { + if (subs.remove(serviceId) != null) { + return subs.isEmpty(); + } else { + return false; + } + } + + public boolean isEmpty() { + return subs.isEmpty(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java new file mode 100644 index 0000000000..7e2956b5f0 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; + +import java.util.Set; + +/** + * Information about the local websocket subscriptions. + */ +@Builder +@Data +public class TbEntitySubEvent { + + private final TenantId tenantId; + private final EntityId entityId; + private final ComponentLifecycleEvent type; + private final TbSubscriptionsInfo info; + private final int seqNumber; + + public boolean hasTsOrAttrSub() { + return info != null && (info.tsAllKeys || info.attrAllKeys || info.tsKeys != null || info.attrKeys != null); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java new file mode 100644 index 0000000000..a09da220d5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class TbEntityUpdatesInfo { + + volatile long attributesUpdateTs; + volatile long timeSeriesUpdateTs; + + public TbEntityUpdatesInfo(long ts) { + this.attributesUpdateTs = ts; + this.timeSeriesUpdateTs = ts; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java index 7685c8fbed..d9fc87f51b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java @@ -15,28 +15,54 @@ */ package org.thingsboard.server.service.subscription; +import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; +import java.util.List; + public interface TbLocalSubscriptionService { - void addSubscription(TbSubscription subscription); + void addSubscription(TbSubscription subscription); + + void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); + + void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); void cancelSubscription(String sessionId, int subscriptionId); void cancelAllSessionSubscriptions(String sessionId); - void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback); + void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback); + + void onTimeSeriesUpdate(EntityId entityId, List update, TbCallback callback); + + void onAttributesUpdate(TransportProtos.TbSubUpdateProto attrUpdate, TbCallback callback); - void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback); + void onAttributesUpdate(EntityId entityId, List update, TbCallback callback); - void onSubscriptionUpdate(String sessionId, int subscriptionId, NotificationsSubscriptionUpdate update, TbCallback callback); + void onAlarmUpdate(EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback); - void onApplicationEvent(PartitionChangeEvent event); + void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto update, TbCallback callback); + + void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate subscriptionUpdate, TbCallback callback); void onApplicationEvent(ClusterTopologyChangeEvent event); + + void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg); + + void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update, TbCallback callback); + + void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto notificationsUpdate, TbCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java index dfad6a41de..1cda590a1f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java @@ -33,7 +33,7 @@ public abstract class TbSubscription { private final TenantId tenantId; private final EntityId entityId; private final TbSubscriptionType type; - private final BiConsumer, T> updateProcessor; + private final BiConsumer, T> updateProcessor; @Override public boolean equals(Object o) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 8905b1b3ad..2fd49f0645 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.subscription; +import org.apache.commons.lang3.StringUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.id.EntityId; @@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @@ -39,29 +41,20 @@ import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgPr import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue; +import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import org.thingsboard.server.service.ws.notification.sub.NotificationsCountSubscription; -import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscription; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; -import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -69,172 +62,118 @@ import java.util.UUID; public class TbSubscriptionUtils { - public static ToCoreMsg toNewSubscriptionProto(TbSubscription subscription) { + public static ToCoreMsg toSubEventProto(String serviceId, TbEntitySubEvent event) { SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); - TbSubscriptionProto subscriptionProto = TbSubscriptionProto.newBuilder() - .setServiceId(subscription.getServiceId()) - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()) - .setTenantIdMSB(subscription.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(subscription.getTenantId().getId().getLeastSignificantBits()) - .setEntityType(subscription.getEntityId().getEntityType().name()) - .setEntityIdMSB(subscription.getEntityId().getId().getMostSignificantBits()) - .setEntityIdLSB(subscription.getEntityId().getId().getLeastSignificantBits()).build(); - - switch (subscription.getType()) { - case TIMESERIES: - TbTimeseriesSubscription tSub = (TbTimeseriesSubscription) subscription; - TbTimeSeriesSubscriptionProto.Builder tSubProto = TbTimeSeriesSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setAllKeys(tSub.isAllKeys()); - tSub.getKeyStates().forEach((key, value) -> tSubProto.addKeyStates( - TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build())); - tSubProto.setStartTime(tSub.getStartTime()); - tSubProto.setEndTime(tSub.getEndTime()); - tSubProto.setLatestValues(tSub.isLatestValues()); - msgBuilder.setTelemetrySub(tSubProto.build()); - break; - case ATTRIBUTES: - TbAttributeSubscription aSub = (TbAttributeSubscription) subscription; - TbAttributeSubscriptionProto.Builder aSubProto = TbAttributeSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setAllKeys(aSub.isAllKeys()) - .setScope(aSub.getScope().name()); - aSub.getKeyStates().forEach((key, value) -> aSubProto.addKeyStates( - TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build())); - msgBuilder.setAttributeSub(aSubProto.build()); - break; - case ALARMS: - TbAlarmsSubscription alarmSub = (TbAlarmsSubscription) subscription; - TransportProtos.TbAlarmSubscriptionProto.Builder alarmSubProto = TransportProtos.TbAlarmSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setTs(alarmSub.getTs()); - msgBuilder.setAlarmSub(alarmSubProto.build()); - break; - case NOTIFICATIONS: - NotificationsSubscription notificationsSub = (NotificationsSubscription) subscription; - msgBuilder.setNotificationsSub(TransportProtos.NotificationsSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setLimit(notificationsSub.getLimit())); - break; - case NOTIFICATIONS_COUNT: - NotificationsCountSubscription notificationsCountSub = (NotificationsCountSubscription) subscription; - msgBuilder.setNotificationsCountSub(TransportProtos.NotificationsCountSubscriptionProto.newBuilder() - .setSub(subscriptionProto)); - break; + var builder = TbEntitySubEventProto.newBuilder() + .setServiceId(serviceId) + .setSeqNumber(event.getSeqNumber()) + .setTenantIdMSB(event.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(event.getTenantId().getId().getLeastSignificantBits()) + .setEntityType(event.getEntityId().getEntityType().name()) + .setEntityIdMSB(event.getEntityId().getId().getMostSignificantBits()) + .setEntityIdLSB(event.getEntityId().getId().getLeastSignificantBits()) + .setType(event.getType().name()); + TbSubscriptionsInfo info = event.getInfo(); + if (info != null) { + builder.setNotifications(info.notifications) + .setAlarms(info.alarms) + .setTsAllKeys(info.tsAllKeys) + .setAttrAllKeys(info.attrAllKeys); + if (info.tsKeys != null) { + builder.addAllTsKeys(info.tsKeys); + } + if (info.attrKeys != null) { + builder.addAllAttrKeys(info.attrKeys); + } } - return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + msgBuilder.setSubEvent(builder); + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build(); } - public static ToCoreMsg toCloseSubscriptionProto(TbSubscription subscription) { - SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); - TbSubscriptionCloseProto closeProto = TbSubscriptionCloseProto.newBuilder() - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()).build(); - msgBuilder.setSubClose(closeProto); - return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) { + TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() + .setEntityIdMSB(id.getMostSignificantBits()) + .setEntityIdLSB(id.getLeastSignificantBits()) + .setSeqNumber(seqNumber) + .setAttributesUpdateTs(update.attributesUpdateTs) + .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg( + TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setSubEventCallback(updateProto) + .build()) + .build(); } - public static TbSubscription fromProto(TbAttributeSubscriptionProto attributeSub) { - TbSubscriptionProto subProto = attributeSub.getSub(); - TbAttributeSubscription.TbAttributeSubscriptionBuilder builder = TbAttributeSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - - builder.scope(TbAttributeSubscriptionScope.valueOf(attributeSub.getScope())); - builder.allKeys(attributeSub.getAllKeys()); - Map keyStates = new HashMap<>(); - attributeSub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs())); - builder.keyStates(keyStates); - return builder.build(); - } - public static TbSubscription fromProto(TbTimeSeriesSubscriptionProto telemetrySub) { - TbSubscriptionProto subProto = telemetrySub.getSub(); - TbTimeseriesSubscription.TbTimeseriesSubscriptionBuilder builder = TbTimeseriesSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - - builder.allKeys(telemetrySub.getAllKeys()); - Map keyStates = new HashMap<>(); - telemetrySub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs())); - builder.startTime(telemetrySub.getStartTime()); - builder.endTime(telemetrySub.getEndTime()); - builder.latestValues(telemetrySub.getLatestValues()); - builder.keyStates(keyStates); + public static TbEntitySubEvent fromProto(TbEntitySubEventProto proto) { + ComponentLifecycleEvent event = ComponentLifecycleEvent.valueOf(proto.getType()); + var builder = TbEntitySubEvent.builder() + .tenantId(TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()))) + .seqNumber(proto.getSeqNumber()) + .entityId(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()))) + .type(event); + if (!ComponentLifecycleEvent.DELETED.equals(event)) { + builder.info(new TbSubscriptionsInfo(proto.getNotifications(), proto.getAlarms(), + proto.getTsAllKeys(), proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, + proto.getAttrAllKeys(), proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, + proto.getSeqNumber())); + } return builder.build(); } - public static TbSubscription fromProto(TransportProtos.TbAlarmSubscriptionProto alarmSub) { - TbSubscriptionProto subProto = alarmSub.getSub(); - TbAlarmsSubscription.TbAlarmsSubscriptionBuilder builder = TbAlarmsSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - builder.ts(alarmSub.getTs()); - return builder.build(); + public static AlarmSubscriptionUpdate fromProto(TransportProtos.TbAlarmSubUpdateProto proto) { + if (proto.getErrorCode() > 0) { + return new AlarmSubscriptionUpdate(SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); + } else { + AlarmInfo alarm = JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class); + return new AlarmSubscriptionUpdate(alarm, proto.getDeleted()); + } } - public static NotificationsSubscription fromProto(TransportProtos.NotificationsSubscriptionProto notificationsSub) { - TbSubscriptionProto sub = notificationsSub.getSub(); - return NotificationsSubscription.builder() - .serviceId(sub.getServiceId()) - .sessionId(sub.getSessionId()) - .subscriptionId(sub.getSubscriptionId()) - .tenantId(TenantId.fromUUID(new UUID(sub.getTenantIdMSB(), sub.getTenantIdLSB()))) - .entityId(EntityIdFactory.getByTypeAndUuid(sub.getEntityType(), new UUID(sub.getEntityIdMSB(), sub.getEntityIdLSB()))) - .limit(notificationsSub.getLimit()) - .build(); + public static NotificationsSubscriptionUpdate fromProto(TransportProtos.NotificationsSubUpdateProto proto) { + NotificationsSubscriptionUpdate update; + if (StringUtils.isNotEmpty(proto.getNotificationUpdate())) { + NotificationUpdate notificationUpdate = JacksonUtil.fromString(proto.getNotificationUpdate(), NotificationUpdate.class); + update = new NotificationsSubscriptionUpdate(notificationUpdate); + } else { + NotificationRequestUpdate notificationRequestUpdate = JacksonUtil.fromString(proto.getNotificationRequestUpdate(), NotificationRequestUpdate.class); + update = new NotificationsSubscriptionUpdate(notificationRequestUpdate); + } + return update; } - public static NotificationsCountSubscription fromProto(TransportProtos.NotificationsCountSubscriptionProto notificationsCountSub) { - TbSubscriptionProto sub = notificationsCountSub.getSub(); - return NotificationsCountSubscription.builder() - .serviceId(sub.getServiceId()) - .sessionId(sub.getSessionId()) - .subscriptionId(sub.getSubscriptionId()) - .tenantId(TenantId.fromUUID(new UUID(sub.getTenantIdMSB(), sub.getTenantIdLSB()))) - .entityId(EntityIdFactory.getByTypeAndUuid(sub.getEntityType(), new UUID(sub.getEntityIdMSB(), sub.getEntityIdLSB()))) + public static ToCoreNotificationMsg toAlarmSubUpdateToProto(EntityId entityId, AlarmInfo alarmInfo, boolean deleted) { + TransportProtos.TbAlarmSubUpdateProto.Builder updateProto = TransportProtos.TbAlarmSubUpdateProto.newBuilder() + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()) + .setAlarm(JacksonUtil.toString(alarmInfo)) + .setDeleted(deleted); + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg( + TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setAlarmUpdate(updateProto) + .build()) .build(); } - public static TelemetrySubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) { - if (proto.getErrorCode() > 0) { - return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); - } else { - Map> data = new TreeMap<>(); - proto.getDataList().forEach(v -> { - List values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); - for (int i = 0; i < v.getTsValueCount(); i++) { - Object[] value = new Object[2]; - TbSubscriptionUpdateTsValue tsValue = v.getTsValue(i); - value[0] = tsValue.getTs(); - value[1] = tsValue.hasValue() ? tsValue.getValue() : null; - values.add(value); - } - }); - return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), data); + public static ToCoreNotificationMsg notificationsSubUpdateToProto(EntityId entityId, NotificationsSubscriptionUpdate update) { + TransportProtos.NotificationsSubUpdateProto.Builder updateProto = TransportProtos.NotificationsSubUpdateProto.newBuilder() + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + if (update.getNotificationUpdate() != null) { + updateProto.setNotificationUpdate(JacksonUtil.toString(update.getNotificationUpdate())); } - } - - public static AlarmSubscriptionUpdate fromProto(TransportProtos.TbAlarmSubscriptionUpdateProto proto) { - if (proto.getErrorCode() > 0) { - return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); - } else { - AlarmInfo alarm = JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class); - return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), alarm, proto.getDeleted()); + if (update.getNotificationRequestUpdate() != null) { + updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate())); } + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setNotificationsUpdate(updateProto) + .build()) + .build(); } - public static ToCoreMsg toTimeseriesUpdateProto(TenantId tenantId, EntityId entityId, List ts) { TbTimeSeriesUpdateProto.Builder builder = TbTimeSeriesUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); @@ -317,6 +256,31 @@ public class TbSubscriptionUtils { return TsKvProto.newBuilder().setTs(ts).setKv(dataBuilder); } + private static TransportProtos.TsValueProto toTsValueProto(long ts, KvEntry attr) { + TransportProtos.TsValueProto.Builder dataBuilder = TransportProtos.TsValueProto.newBuilder(); + dataBuilder.setTs(ts); + dataBuilder.setType(KeyValueType.forNumber(attr.getDataType().ordinal())); + switch (attr.getDataType()) { + case BOOLEAN: + attr.getBooleanValue().ifPresent(dataBuilder::setBoolV); + break; + case LONG: + attr.getLongValue().ifPresent(dataBuilder::setLongV); + break; + case DOUBLE: + attr.getDoubleValue().ifPresent(dataBuilder::setDoubleV); + break; + case JSON: + attr.getJsonValue().ifPresent(dataBuilder::setJsonV); + break; + case STRING: + attr.getStrValue().ifPresent(dataBuilder::setStringV); + break; + } + return dataBuilder.build(); + } + + public static EntityId toEntityId(String entityType, long entityIdMSB, long entityIdLSB) { return EntityIdFactory.getByTypeAndUuid(entityType, new UUID(entityIdMSB, entityIdLSB)); } @@ -356,6 +320,35 @@ public class TbSubscriptionUtils { return entry; } + public static List toTsKvEntityList(String key, List dataList) { + List result = new ArrayList<>(dataList.size()); + dataList.forEach(proto -> result.add(new BasicTsKvEntry(proto.getTs(), getKvEntry(key, proto)))); + return result; + } + + private static KvEntry getKvEntry(String key, TransportProtos.TsValueProto proto) { + KvEntry entry = null; + DataType type = DataType.values()[proto.getType().getNumber()]; + switch (type) { + case BOOLEAN: + entry = new BooleanDataEntry(key, proto.getBoolV()); + break; + case LONG: + entry = new LongDataEntry(key, proto.getLongV()); + break; + case DOUBLE: + entry = new DoubleDataEntry(key, proto.getDoubleV()); + break; + case STRING: + entry = new StringDataEntry(key, proto.getStringV()); + break; + case JSON: + entry = new JsonDataEntry(key, proto.getJsonV()); + break; + } + return entry; + } + public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, AlarmInfo alarm) { TbAlarmUpdateProto.Builder builder = TbAlarmUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); @@ -382,23 +375,6 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); } - public static ToCoreNotificationMsg notificationsSubUpdateToProto(TbSubscription subscription, NotificationsSubscriptionUpdate update) { - TransportProtos.NotificationsSubscriptionUpdateProto.Builder updateProto = TransportProtos.NotificationsSubscriptionUpdateProto.newBuilder() - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()); - if (update.getNotificationUpdate() != null) { - updateProto.setNotificationUpdate(JacksonUtil.toString(update.getNotificationUpdate())); - } - if (update.getNotificationRequestUpdate() != null) { - updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate())); - } - return ToCoreNotificationMsg.newBuilder() - .setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() - .setNotificationsSubUpdate(updateProto) - .build()) - .build(); - } - public static ToCoreMsg notificationUpdateToProto(TenantId tenantId, UserId recipientId, NotificationUpdate notificationUpdate) { TransportProtos.NotificationUpdateProto updateProto = TransportProtos.NotificationUpdateProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) @@ -427,4 +403,40 @@ public class TbSubscriptionUtils { .build(); } + public static List fromProto(TransportProtos.TbSubUpdateProto proto) { + List result = new ArrayList<>(); + for (var p : proto.getDataList()) { + result.addAll(toTsKvEntityList(p.getKey(), p.getTsValueList())); + } + return result; + } + + static ToCoreNotificationMsg toProto(boolean timeSeries, EntityId entityId, List updates) { + TransportProtos.TbSubUpdateProto.Builder builder = TransportProtos.TbSubUpdateProto.newBuilder(); + + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + + Map> data = new TreeMap<>(); + + for (TsKvEntry tsEntry : updates) { + data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>()).add(toTsValueProto(tsEntry.getTs(), tsEntry)); + } + + data.forEach((key, value) -> { + TransportProtos.TsValueListProto.Builder dataBuilder = TransportProtos.TsValueListProto.newBuilder(); + dataBuilder.setKey(key); + dataBuilder.addAllTsValue(value); + builder.addData(dataBuilder.build()); + }); + + var result = TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder(); + if (timeSeries) { + result.setTsUpdate(builder); + } else { + result.setAttrUpdate(builder); + } + return ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(result).build(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java new file mode 100644 index 0000000000..93b1fb0857 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Information about the local websocket subscriptions. + */ +@RequiredArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(exclude = {"seqNumber"}) +@ToString +public class TbSubscriptionsInfo { + + protected boolean notifications; + protected boolean alarms; + protected boolean tsAllKeys; + protected Set tsKeys; + protected boolean attrAllKeys; + protected Set attrKeys; + protected int seqNumber; + + public boolean isEmpty() { + return !notifications && !alarms && !tsAllKeys && !attrAllKeys && tsKeys == null && attrKeys == null; + } + + protected TbSubscriptionsInfo copy() { + return copy(0); + } + + protected TbSubscriptionsInfo copy(int seqNumber) { + return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java similarity index 84% rename from application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java rename to application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java index 400cf07e7e..d3ebe4cd3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java @@ -24,8 +24,10 @@ import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpda import java.util.Map; import java.util.function.BiConsumer; -public class TbTimeseriesSubscription extends TbSubscription { +public class TbTimeSeriesSubscription extends TbSubscription { + @Getter + private final long queryTs; @Getter private final boolean allKeys; @Getter @@ -38,10 +40,11 @@ public class TbTimeseriesSubscription extends TbSubscription, TelemetrySubscriptionUpdate> updateProcessor, - boolean allKeys, Map keyStates, long startTime, long endTime, boolean latestValues) { + long queryTs, boolean allKeys, Map keyStates, long startTime, long endTime, boolean latestValues) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES, updateProcessor); + this.queryTs = queryTs; this.allKeys = allKeys; this.keyStates = keyStates; this.startTime = startTime; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 21d69d6b42..d0334f90b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -62,7 +62,7 @@ import org.thingsboard.server.service.subscription.TbAttributeSubscription; import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; -import org.thingsboard.server.service.subscription.TbTimeseriesSubscription; +import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.WsCmd; @@ -443,7 +443,7 @@ public class DefaultWebSocketService implements WebSocketService { } } } catch (IOException e) { - log.warn("[{}] Failed to send session close: {}", sessionRef.getSessionId(), e); + log.warn("[{}] Failed to send session close:", sessionRef.getSessionId(), e); return false; } return true; @@ -477,6 +477,7 @@ public class DefaultWebSocketService implements WebSocketService { private void handleWsAttributesSubscriptionByKeys(WebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId, List keys) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -495,6 +496,7 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) + .queryTs(queryTs) .allKeys(false) .keyStates(subState) .scope(scope) @@ -591,7 +593,10 @@ public class DefaultWebSocketService implements WebSocketService { } private void handleWsAttributesSubscription(WebSocketSessionRef sessionRef, - AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + AttributesSubscriptionCmd cmd, + String sessionId, + EntityId entityId) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -609,6 +614,7 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) + .queryTs(queryTs) .allKeys(true) .keyStates(subState) .updateProcessor((subscription, update) -> { @@ -664,17 +670,18 @@ public class DefaultWebSocketService implements WebSocketService { Optional> keysOptional = getKeys(cmd); if (keysOptional.isPresent()) { - handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); + handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); } else { - handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId); + handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); } } } } - private void handleWsTimeseriesSubscriptionByKeys(WebSocketSessionRef sessionRef, + private void handleWsTimeSeriesSubscriptionByKeys(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { long startTs; + long queryTs = System.currentTimeMillis(); if (cmd.getTimeWindow() > 0) { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); @@ -682,22 +689,22 @@ public class DefaultWebSocketService implements WebSocketService { long endTs = cmd.getStartTs() + cmd.getTimeWindow(); List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); - - final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys); accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, on(r -> Futures.addCallback(tsService.findAll(sessionRef.getSecurityCtx().getTenantId(), entityId, queries), callback, executor), callback::onFailure)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); startTs = System.currentTimeMillis(); log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId); - final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys); accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, on(r -> Futures.addCallback(tsService.findLatest(sessionRef.getSecurityCtx().getTenantId(), entityId, keys), callback, executor), callback::onFailure)); } } - private void handleWsTimeseriesSubscription(WebSocketSessionRef sessionRef, + private void handleWsTimeSeriesSubscription(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback>() { @Override public void onSuccess(List data) { @@ -705,7 +712,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder() + TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) .subscriptionId(cmd.getCmdId()) @@ -719,6 +726,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.unlock(); } }) + .queryTs(queryTs) .allKeys(true) .keyStates(subState) .build(); @@ -749,7 +757,8 @@ public class DefaultWebSocketService implements WebSocketService { on(r -> Futures.addCallback(tsService.findAllLatest(sessionRef.getSecurityCtx().getTenantId(), entityId), callback, executor), callback::onFailure)); } - private FutureCallback> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List keys) { + private FutureCallback> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, + final String sessionId, final EntityId entityId, final long queryTs, final long startTs, final List keys) { return new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -758,7 +767,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder() + TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) .subscriptionId(cmd.getCmdId()) @@ -772,6 +781,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.unlock(); } }) + .queryTs(queryTs) .allKeys(false) .keyStates(subState) .build(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 012808b253..0937275a0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; +import org.thingsboard.server.service.subscription.TbSubscription; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; @@ -119,7 +120,8 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH /* Notifications subscription update handling */ - private void handleNotificationsSubscriptionUpdate(NotificationsSubscription subscription, NotificationsSubscriptionUpdate subscriptionUpdate) { + private void handleNotificationsSubscriptionUpdate(TbSubscription sub, NotificationsSubscriptionUpdate subscriptionUpdate) { + NotificationsSubscription subscription = (NotificationsSubscription) sub; try { if (subscriptionUpdate.getNotificationUpdate() != null) { handleNotificationUpdate(subscription, subscriptionUpdate.getNotificationUpdate()); @@ -178,7 +180,8 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH /* Notifications count subscription update handling */ - private void handleNotificationsCountSubscriptionUpdate(NotificationsCountSubscription subscription, NotificationsSubscriptionUpdate subscriptionUpdate) { + private void handleNotificationsCountSubscriptionUpdate(TbSubscription sub, NotificationsSubscriptionUpdate subscriptionUpdate) { + NotificationsCountSubscription subscription = (NotificationsCountSubscription) sub; try { if (subscriptionUpdate.getNotificationUpdate() != null) { handleNotificationUpdate(subscription, subscriptionUpdate.getNotificationUpdate()); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java index cb5d140d21..5263f9d651 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java @@ -33,7 +33,7 @@ public class NotificationsCountSubscription extends TbSubscription updateProcessor) { + BiConsumer, NotificationsSubscriptionUpdate> updateProcessor) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS_COUNT, updateProcessor); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java index 591781a5f6..ede82f9ff7 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java @@ -43,7 +43,7 @@ public class NotificationsSubscription extends TbSubscription updateProcessor, + BiConsumer, NotificationsSubscriptionUpdate> updateProcessor, int limit) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS, updateProcessor); this.limit = limit; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java index 90041ffa13..66d18b24b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java @@ -16,13 +16,13 @@ package org.thingsboard.server.service.ws.telemetry.sub; import lombok.Getter; +import lombok.ToString; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; +@ToString public class AlarmSubscriptionUpdate { - @Getter - private int subscriptionId; @Getter private int errorCode; @Getter @@ -32,31 +32,24 @@ public class AlarmSubscriptionUpdate { @Getter private boolean alarmDeleted; - public AlarmSubscriptionUpdate(int subscriptionId, AlarmInfo alarm) { - this(subscriptionId, alarm, false); + public AlarmSubscriptionUpdate(AlarmInfo alarm) { + this(alarm, false); } - public AlarmSubscriptionUpdate(int subscriptionId, AlarmInfo alarm, boolean alarmDeleted) { + public AlarmSubscriptionUpdate(AlarmInfo alarm, boolean alarmDeleted) { super(); - this.subscriptionId = subscriptionId; this.alarm = alarm; this.alarmDeleted = alarmDeleted; } - public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) { - this(subscriptionId, errorCode, null); + public AlarmSubscriptionUpdate(SubscriptionErrorCode errorCode) { + this(errorCode, null); } - public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode, String errorMsg) { + public AlarmSubscriptionUpdate(SubscriptionErrorCode errorCode, String errorMsg) { super(); - this.subscriptionId = subscriptionId; this.errorCode = errorCode.getCode(); this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); } - @Override - public String toString() { - return "AlarmUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + - ", alarm=" + alarm + "]"; - } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java index 108c5cf1d4..cea1731a31 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java @@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,7 +28,7 @@ import java.util.stream.Collectors; public class TelemetrySubscriptionUpdate { - private int subscriptionId; + private final int subscriptionId; private int errorCode; private String errorMsg; private Map> data; @@ -94,7 +95,14 @@ public class TelemetrySubscriptionUpdate { @Override public String toString() { - return "TsSubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data=" - + data + "]"; + StringBuilder result = new StringBuilder("TelemetrySubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data="); + data.forEach((k, v) -> { + result.append(k).append("=["); + for(Object a : v){ + result.append(Arrays.toString((Object[])a)).append("|"); + } + result.append("]"); + }); + return result.toString(); } } diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index 981bcab132..3d0ce3f933 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -38,6 +38,8 @@ + + diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index e81aaed39d..d2cbde34e4 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -40,6 +41,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueClusterService; +import java.util.List; import java.util.UUID; public interface TbClusterService extends TbQueueClusterService { @@ -50,6 +52,8 @@ public interface TbClusterService extends TbQueueClusterService { void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback); + void broadcastToCore(TransportProtos.ToCoreNotificationMsg msg); + void pushMsgToVersionControl(TenantId tenantId, ToVersionControlServiceMsg msg, TbQueueCallback callback); void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 60dfcae1de..db9528897f 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -246,6 +246,12 @@ message QueueDeleteMsg { string queueName = 5; } +message CoreStartupMsg { + string serviceId = 1; + repeated int32 partitions = 2; + int64 ts = 3; +} + message LwM2MRegistrationRequestMsg { string tenantId = 1; string endpoint = 2; @@ -531,6 +537,71 @@ message TransportToRuleEngineMsg { * TB Core Data Structures */ +message TbEntitySubEventProto { + string serviceId = 1; + int32 seqNumber = 2; + int64 tenantIdMSB = 3; + int64 tenantIdLSB = 4; + string entityType = 5; + int64 entityIdMSB = 6; + int64 entityIdLSB = 7; + string type = 8; + bool notifications = 9; + bool alarms = 10; + bool tsAllKeys = 11; + repeated string tsKeys = 12; + bool attrAllKeys = 13; + repeated string attrKeys = 14; +} + +message TbEntitySubEventCallbackProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 seqNumber = 3; + int64 attributesUpdateTs = 4; + int64 timeSeriesUpdateTs = 5; +} + +message TsValueProto { + int64 ts = 1; + KeyValueType type = 2; + bool bool_v = 3; + int64 long_v = 4; + double double_v = 5; + string string_v = 6; + string json_v = 7; +} + +message TsValueListProto { + string key = 1; + repeated TsValueProto tsValue = 2; +} + +message TbSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 errorCode = 3; + string errorMsg = 4; + repeated TsValueListProto data = 5; +} + +message TbAlarmSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 errorCode = 3; + string errorMsg = 4; + string alarm = 5; + bool deleted = 6; +} + +message NotificationsSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + string notificationUpdate = 3; + string notificationRequestUpdate = 4; +} + +// DEPRECATED. FOR REMOVAL message TbSubscriptionProto { string serviceId = 1; string sessionId = 2; @@ -542,6 +613,7 @@ message TbSubscriptionProto { int64 entityIdLSB = 8; } +// DEPRECATED. FOR REMOVAL message TbTimeSeriesSubscriptionProto { TbSubscriptionProto sub = 1; bool allKeys = 2; @@ -551,6 +623,7 @@ message TbTimeSeriesSubscriptionProto { bool latestValues = 6; } +// DEPRECATED. FOR REMOVAL message TbAttributeSubscriptionProto { TbSubscriptionProto sub = 1; bool allKeys = 2; @@ -558,20 +631,24 @@ message TbAttributeSubscriptionProto { string scope = 4; } +// DEPRECATED. FOR REMOVAL message TbAlarmSubscriptionProto { TbSubscriptionProto sub = 1; int64 ts = 2; } +// DEPRECATED. FOR REMOVAL message NotificationsSubscriptionProto { TbSubscriptionProto sub = 1; int32 limit = 2; } +// DEPRECATED. FOR REMOVAL message NotificationsCountSubscriptionProto { TbSubscriptionProto sub = 1; } +// DEPRECATED. FOR REMOVAL message TbSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -580,6 +657,7 @@ message TbSubscriptionUpdateProto { repeated TbSubscriptionUpdateValueListProto data = 5; } +// DEPRECATED. FOR REMOVAL message TbAlarmSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -589,6 +667,7 @@ message TbAlarmSubscriptionUpdateProto { bool deleted = 6; } +// DEPRECATED. FOR REMOVAL message NotificationsSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -667,6 +746,7 @@ message TbTimeSeriesUpdateProto { repeated TsKvProto data = 6; } +// DEPRECATED. FOR REMOVAL message TbSubscriptionCloseProto { string sessionId = 1; int32 subscriptionId = 2; @@ -702,26 +782,32 @@ message DeviceStateServiceMsgProto { } message SubscriptionMgrMsgProto { - TbTimeSeriesSubscriptionProto telemetrySub = 1; - TbAttributeSubscriptionProto attributeSub = 2; - TbSubscriptionCloseProto subClose = 3; + TbTimeSeriesSubscriptionProto telemetrySub = 1; // DEPRECATED. FOR REMOVAL + TbAttributeSubscriptionProto attributeSub = 2; // DEPRECATED. FOR REMOVAL + TbSubscriptionCloseProto subClose = 3; // DEPRECATED. FOR REMOVAL TbTimeSeriesUpdateProto tsUpdate = 4; TbAttributeUpdateProto attrUpdate = 5; TbAttributeDeleteProto attrDelete = 6; - TbAlarmSubscriptionProto alarmSub = 7; + TbAlarmSubscriptionProto alarmSub = 7; // DEPRECATED. FOR REMOVAL TbAlarmUpdateProto alarmUpdate = 8; TbAlarmDeleteProto alarmDelete = 9; TbTimeSeriesDeleteProto tsDelete = 10; - NotificationsSubscriptionProto notificationsSub = 11; - NotificationsCountSubscriptionProto notificationsCountSub = 12; + NotificationsSubscriptionProto notificationsSub = 11; // DEPRECATED. FOR REMOVAL + NotificationsCountSubscriptionProto notificationsCountSub = 12; // DEPRECATED. FOR REMOVAL NotificationUpdateProto notificationUpdate = 13; NotificationRequestUpdateProto notificationRequestUpdate = 14; + TbEntitySubEventProto subEvent = 15; } message LocalSubscriptionServiceMsgProto { - TbSubscriptionUpdateProto subUpdate = 1; - TbAlarmSubscriptionUpdateProto alarmSubUpdate = 2; - NotificationsSubscriptionUpdateProto notificationsSubUpdate = 3; + TbSubscriptionUpdateProto subUpdate = 1; // DEPRECATED. FOR REMOVAL + TbAlarmSubscriptionUpdateProto alarmSubUpdate = 2; // DEPRECATED. FOR REMOVAL + NotificationsSubscriptionUpdateProto notificationsSubUpdate = 3; // DEPRECATED. FOR REMOVAL + TbEntitySubEventCallbackProto subEventCallback = 4; + TbSubUpdateProto tsUpdate = 5; + TbSubUpdateProto attrUpdate = 6; + TbAlarmSubUpdateProto alarmUpdate = 7; + NotificationsSubUpdateProto notificationsUpdate = 8; } message FromDeviceRPCResponseProto { @@ -992,6 +1078,7 @@ message ToCoreNotificationMsg { bytes fromEdgeSyncResponseMsg = 9; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10; NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11; + CoreStartupMsg coreStartupMsg = 12; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 5be5caf6ff..7afc40f2c2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -118,6 +118,11 @@ public class HashPartitionService implements PartitionService { } } + @Override + public List getMyPartitions(QueueKey queueKey) { + return myPartitions.get(queueKey); + } + private void doInitRuleEnginePartitions() { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index b55ba79f67..fe06db789b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -37,6 +37,8 @@ public interface PartitionService { boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId); + List getMyPartitions(QueueKey queueKey); + /** * Received from the Discovery service when network topology is changed. * @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index e99817de17..d94ff082d1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -35,10 +35,12 @@ import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; @@ -74,6 +76,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi protected final ConcurrentHashMap> delayedTasks; + private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; @@ -85,8 +88,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private volatile boolean stopped = true; - public ZkDiscoveryService(TbServiceInfoProvider serviceInfoProvider, + public ZkDiscoveryService(ApplicationEventPublisher applicationEventPublisher, + TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) { + this.applicationEventPublisher = applicationEventPublisher; this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; delayedTasks = new ConcurrentHashMap<>(); @@ -141,9 +146,11 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi return; } log.info("Going to publish current server..."); - zkExecutorService.scheduleAtFixedRate(this::publishCurrentServer, 0, 1, TimeUnit.MINUTES); + publishCurrentServer(); log.info("Going to recalculate partitions..."); recalculatePartitions(); + + zkExecutorService.scheduleAtFixedRate(this::publishCurrentServer, 1, 1, TimeUnit.MINUTES); } @SneakyThrows @@ -319,6 +326,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } break; case CHILD_REMOVED: + zkExecutorService.submit(() -> applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList))); ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", serviceId, serviceTypesList); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java new file mode 100644 index 0000000000..5de09696c6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery.event; + +import com.google.protobuf.ProtocolStringList; +import lombok.Getter; +import org.thingsboard.server.common.msg.queue.ServiceType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class OtherServiceShutdownEvent extends TbApplicationEvent { + + private static final long serialVersionUID = -2441739930040282254L; + + @Getter + private final String serviceId; + @Getter + private final Set serviceTypes; + + public OtherServiceShutdownEvent(Object source, String serviceId, List serviceTypes) { + super(source); + this.serviceId = serviceId; + this.serviceTypes = serviceTypes.stream().map(ServiceType::valueOf).collect(Collectors.toSet()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java index e000735fba..d235acb4fa 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -34,6 +34,7 @@ public @interface AfterStartUp { int QUEUE_INFO_INITIALIZATION = 1; int DISCOVERY_SERVICE = 2; + int STARTUP_SERVICE = 8; int ACTOR_SYSTEM = 9; int REGULAR_SERVICE = 10; diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index a8810efd0e..3af4683ab3 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -26,6 +26,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.transport.TransportProtos; @@ -51,6 +52,8 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class ZkDiscoveryServiceTest { + @Mock + private ApplicationEventPublisher applicationEventPublisher; @Mock private TbServiceInfoProvider serviceInfoProvider; @@ -77,7 +80,7 @@ public class ZkDiscoveryServiceTest { @Before public void setup() { - zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); + zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(applicationEventPublisher, serviceInfoProvider, partitionService)); ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 4cf6db0fc1..fdf537a817 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sql.query; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository;