diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java index 4e83aa0060..9edc81f9fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/InMemoryHouseKeeperServiceService.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -91,7 +90,7 @@ public class InMemoryHouseKeeperServiceService implements HouseKeeperService { } @Override - public void onFailure(@NotNull Throwable throwable) { + public void onFailure(Throwable throwable) { queueSize.decrementAndGet(); totalProcessedCounter.incrementAndGet(); log.error("[{}][{}] unassignDeletedUserAlarms failed, pending queue size: {}, total processed count: {}", diff --git a/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java b/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java new file mode 100644 index 0000000000..3139ff256e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/partition/TbCoreStartupService.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.partition; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.util.AfterStartUp; +import org.thingsboard.server.queue.util.TbCoreComponent; + +@Slf4j +@TbCoreComponent +@Service +@RequiredArgsConstructor +public class TbCoreStartupService { + + private final PartitionService partitionService; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbClusterService clusterService; + + @AfterStartUp(order = AfterStartUp.STARTUP_SERVICE) + public void onApplicationEvent(ApplicationReadyEvent event) { + var myPartitions = partitionService.getMyPartitions(new QueueKey(ServiceType.TB_CORE)); + if (myPartitions != null && !myPartitions.isEmpty()) { + TransportProtos.ToCoreNotificationMsg toCoreMsg = TransportProtos.ToCoreNotificationMsg.newBuilder() + .setCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder() + .setServiceId(serviceInfoProvider.getServiceId()).addAllPartitions(myPartitions).build()).build(); + clusterService.broadcastToCore(toCoreMsg); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 53ef38d96a..308d3b2f43 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -77,6 +77,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -138,6 +139,18 @@ public class DefaultTbClusterService implements TbClusterService { toCoreMsgs.incrementAndGet(); } + @Override + public void broadcastToCore(ToCoreNotificationMsg toCoreMsg) { + UUID msgId = UUID.randomUUID(); + TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); + Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); + for (String serviceId : tbCoreServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msgId, toCoreMsg), null); + toCoreNfs.incrementAndGet(); + } + } + @Override public void pushMsgToVersionControl(TenantId tenantId, TransportProtos.ToVersionControlServiceMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId); @@ -203,6 +216,7 @@ public class DefaultTbClusterService implements TbClusterService { } return null; } + private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) { if (ruleEngineProfile != null) { RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fd5a252cc5..3494125d5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -58,7 +58,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -66,6 +65,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -93,7 +93,6 @@ import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -343,10 +342,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService Do NOTHING. + callback.onSuccess(); } else { throwNotHandled(msg, callback); } } + private void forwardCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg, TbCallback callback) { + log.info("[{}] Processing core startup with partitions: {}", coreStartupMsg.getServiceId(), coreStartupMsg.getPartitionsList()); + localSubscriptionService.onCoreStartupMsg(coreStartupMsg); + callback.onSuccess(); + } + private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) { - if (msg.hasAttributeSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback); + if (msg.hasSubEvent()) { + TbEntitySubEventProto subEvent = msg.getSubEvent(); + subscriptionManagerService.onSubEvent(subEvent.getServiceId(), TbSubscriptionUtils.fromProto(subEvent), callback); } else if (msg.hasTelemetrySub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getTelemetrySub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasAlarmSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAlarmSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasNotificationsSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getNotificationsSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasNotificationsCountSub()) { - subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getNotificationsCountSub()), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasSubClose()) { - TbSubscriptionCloseProto closeProto = msg.getSubClose(); - subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback); + callback.onSuccess(); + // Deprecated, for removal; Left intentionally to avoid throwNotHandled } else if (msg.hasTsUpdate()) { TbTimeSeriesUpdateProto proto = msg.getTsUpdate(); long tenantIdMSB = proto.getTenantIdMSB(); @@ -576,7 +586,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService implements SubscriptionManagerService { - private final AttributesService attrService; - private final TimeseriesService tsService; private final NotificationsTopicService notificationsTopicService; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; @@ -97,126 +78,89 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene private final TbLocalSubscriptionService localSubscriptionService; private final DeviceStateService deviceStateService; private final TbClusterService clusterService; + private final SubscriptionSchedulerComponent scheduler; - private final Map> subscriptionsByEntityId = new ConcurrentHashMap<>(); - private final Map> subscriptionsByWsSessionId = new ConcurrentHashMap<>(); - private final ConcurrentMap> partitionedSubscriptions = new ConcurrentHashMap<>(); - private final Set currentPartitions = ConcurrentHashMap.newKeySet(); + private final Lock subsLock = new ReentrantLock(); + private final ConcurrentMap entitySubscriptions = new ConcurrentHashMap<>(); + + private final ConcurrentMap entityUpdates = new ConcurrentHashMap<>(); - private ExecutorService tsCallBackExecutor; private String serviceId; private TbQueueProducer> toCoreNotificationsProducer; + private long initTs; + @PostConstruct public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); serviceId = serviceInfoProvider.getServiceId(); + initTs = System.currentTimeMillis(); toCoreNotificationsProducer = producerProvider.getTbCoreNotificationsMsgProducer(); - } - - @PreDestroy - public void shutdownExecutor() { - if (tsCallBackExecutor != null) { - tsCallBackExecutor.shutdownNow(); - } + scheduler.scheduleWithFixedDelay(this::cleanupEntityUpdates, 1, 1, TimeUnit.HOURS); } @Override - public void addSubscription(TbSubscription subscription, TbCallback callback) { - log.trace("[{}][{}][{}] Registering subscription for entity [{}]", - subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - partitionedSubscriptions.computeIfAbsent(tpi, k -> ConcurrentHashMap.newKeySet()).add(subscription); + public void onSubEvent(String serviceId, TbEntitySubEvent event, TbCallback callback) { + var tenantId = event.getTenantId(); + var entityId = event.getEntityId(); + log.trace("[{}][{}][{}] Processing subscription event {}", tenantId, entityId, serviceId, event); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (tpi.isMyPartition()) { + subsLock.lock(); + try { + var entitySubs = entitySubscriptions.computeIfAbsent(entityId, id -> new TbEntityRemoteSubsInfo(tenantId, entityId)); + boolean empty = entitySubs.updateAndCheckIsEmpty(serviceId, event); + if (empty) { + entitySubscriptions.remove(entityId); + } + } finally { + subsLock.unlock(); + } callback.onSuccess(); + if (event.hasTsOrAttrSub()) { + sendSubEventCallback(serviceId, entityId, event.getSeqNumber()); + } } else { - log.warn("[{}][{}] Entity belongs to external partition. Probably rebalancing is in progress. Topic: {}" - , subscription.getTenantId(), subscription.getEntityId(), tpi.getFullTopicName()); + log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}" + , tenantId, entityId, serviceId, tpi.getFullTopicName()); callback.onFailure(new RuntimeException("Entity belongs to external partition " + tpi.getFullTopicName() + "!")); } - boolean newSubscription = subscriptionsByEntityId - .computeIfAbsent(subscription.getEntityId(), k -> ConcurrentHashMap.newKeySet()).add(subscription); - subscriptionsByWsSessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()).put(subscription.getSubscriptionId(), subscription); - if (newSubscription) { - switch (subscription.getType()) { - case TIMESERIES: - handleNewTelemetrySubscription((TbTimeseriesSubscription) subscription); - break; - case ATTRIBUTES: - handleNewAttributeSubscription((TbAttributeSubscription) subscription); - break; - case ALARMS: - handleNewAlarmsSubscription((TbAlarmsSubscription) subscription); - break; - } - } } @Override - public void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback) { - log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); - Map sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId); - if (sessionSubscriptions != null) { - TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); - if (subscription != null) { - removeSubscriptionFromEntityMap(subscription); - removeSubscriptionFromPartitionMap(subscription); - if (sessionSubscriptions.isEmpty()) { - subscriptionsByWsSessionId.remove(sessionId); - } - } else { - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); + @EventListener(OtherServiceShutdownEvent.class) + public void onApplicationEvent(OtherServiceShutdownEvent event) { + if (event.getServiceTypes() != null && event.getServiceTypes().contains(ServiceType.TB_CORE)) { + subsLock.lock(); + try { + int sizeBeforeCleanup = entitySubscriptions.size(); + entitySubscriptions.entrySet().removeIf(kv -> kv.getValue().removeAndCheckIsEmpty(event.getServiceId())); + log.info("[{}][{}] Removed {} entity subscription records due to server shutdown.", serviceId, event.getServiceId(), entitySubscriptions.size() - sizeBeforeCleanup); + } finally { + subsLock.unlock(); } + } + } + + private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) { + var update = getEntityUpdatesInfo(entityId); + if (serviceId.equals(targetId)) { + localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY); } else { - log.debug("[{}] No session subscriptions found!", sessionId); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update)); } - callback.onSuccess(); } @Override protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { - Set removedPartitions = new HashSet<>(currentPartitions); - removedPartitions.removeAll(partitionChangeEvent.getPartitions()); - - currentPartitions.clear(); - currentPartitions.addAll(partitionChangeEvent.getPartitions()); - - // We no longer manage current partition of devices; - removedPartitions.forEach(partition -> { - Set subs = partitionedSubscriptions.remove(partition); - if (subs != null) { - subs.forEach(sub -> { - if (!serviceId.equals(sub.getServiceId())) { - removeSubscriptionFromEntityMap(sub); - } - }); - } - }); + entitySubscriptions.values().removeIf(sub -> + !partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition()); } } @Override public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { - return (TbTimeseriesSubscription) s; - } else { - return null; - } - }, s -> true, s -> { - List subscriptionUpdate = null; - for (TsKvEntry kv : ts) { - if ((s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(kv); - } - } - return subscriptionUpdate; - }, true); + onTimeSeriesUpdate(entityId, ts); if (entityId.getEntityType() == EntityType.DEVICE) { updateDeviceInactivityTimeout(tenantId, entityId, ts); } @@ -224,168 +168,68 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } @Override - public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback) { - onAttributesUpdate(tenantId, entityId, scope, attributes, true, callback); - } - - @Override - public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { - return (TbAttributeSubscription) s; - } else { - return null; - } - }, - s -> (TbAttributeSubscriptionScope.ANY_SCOPE.equals(s.getScope()) || scope.equals(s.getScope().name())), - s -> { - List subscriptionUpdate = null; - for (AttributeKvEntry kv : attributes) { - if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv)); - } - } - return subscriptionUpdate; - }, true); + public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback) { + onTimeSeriesUpdate(entityId, + keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); if (entityId.getEntityType() == EntityType.DEVICE) { - if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { - updateDeviceInactivityTimeout(tenantId, entityId, attributes); - } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { - clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, - new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) - , null); - } + deleteDeviceInactivityTimeout(tenantId, entityId, keys); } callback.onSuccess(); } - private void updateDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List kvEntries) { - for (KvEntry kvEntry : kvEntries) { - if (kvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { - deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), getLongValue(kvEntry)); - } + public void onTimeSeriesUpdate(EntityId entityId, List update) { + getEntityUpdatesInfo(entityId).timeSeriesUpdateTs = System.currentTimeMillis(); + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}] Handling time-series update: {}", entityId, update); + subInfo.getSubs().forEach((serviceId, sub) -> { + if (sub.tsAllKeys) { + onTimeSeriesUpdate(serviceId, entityId, update); + } else if (sub.tsKeys != null) { + List tmp = getSubList(update, sub.tsKeys); + if (tmp != null) { + onTimeSeriesUpdate(serviceId, entityId, tmp); + } + } + }); + } else { + log.trace("[{}] No time-series subscriptions for entity.", entityId); } } - private void deleteDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List keys) { - for (String key : keys) { - if (key.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { - deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), 0); - } + private void onTimeSeriesUpdate(String targetId, EntityId entityId, List update) { + if (serviceId.equals(targetId)) { + localSubscriptionService.onTimeSeriesUpdate(entityId, update, TbCallback.EMPTY); + } else { + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(true, entityId, update)); } } @Override - public void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { - onLocalAlarmSubUpdate(entityId, - s -> { - if (TbSubscriptionType.ALARMS.equals(s.getType())) { - return (TbAlarmsSubscription) s; - } else { - return null; - } - }, - s -> alarm.getCreatedTime() >= s.getTs() || alarm.getAssignTs() >= s.getTs(), - alarm, false - ); - callback.onSuccess(); - } - - @Override - public void onAlarmDeleted(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { - onLocalAlarmSubUpdate(entityId, - s -> { - if (TbSubscriptionType.ALARMS.equals(s.getType())) { - return (TbAlarmsSubscription) s; - } else { - return null; - } - }, - s -> alarm.getCreatedTime() >= s.getTs(), - alarm, true - ); - callback.onSuccess(); - } - - @Override - public void onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate notificationUpdate, TbCallback callback) { - Set subscriptions = subscriptionsByEntityId.get(recipientId); - if (subscriptions != null) { - NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationUpdate); - log.trace("Handling notificationUpdate for user {}: {}", recipientId, notificationUpdate); - subscriptions.stream() - .filter(subscription -> subscription.getType() == TbSubscriptionType.NOTIFICATIONS - || subscription.getType() == TbSubscriptionType.NOTIFICATIONS_COUNT) - .forEach(subscription -> onNotificationsSubUpdate(subscriptionUpdate, subscription)); - } - callback.onSuccess(); + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback) { + onAttributesUpdate(tenantId, entityId, scope, attributes, true, callback); } @Override - public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate notificationRequestUpdate, TbCallback callback) { - NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationRequestUpdate); - subscriptionsByEntityId.forEach((entityId, subscriptions) -> { - if (entityId.getEntityType() != EntityType.USER) { - return; + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, TbCallback callback) { + getEntityUpdatesInfo(entityId).attributesUpdateTs = System.currentTimeMillis(); + processAttributesUpdate(entityId, attributes); + if (entityId.getEntityType() == EntityType.DEVICE) { + if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { + updateDeviceInactivityTimeout(tenantId, entityId, attributes); + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { + clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, + new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) + , null); } - log.trace("Handling notificationRequestUpdate for user {}: {}", entityId, notificationRequestUpdate); - subscriptions.forEach(subscription -> { - if (subscription.getType() != TbSubscriptionType.NOTIFICATIONS && - subscription.getType() != TbSubscriptionType.NOTIFICATIONS_COUNT) { - return; - } - if (!subscription.getTenantId().equals(tenantId)) { - return; - } - onNotificationsSubUpdate(subscriptionUpdate, subscription); - }); - }); - callback.onSuccess(); - } - - private void onNotificationsSubUpdate(NotificationsSubscriptionUpdate subscriptionUpdate, TbSubscription subscription) { - if (serviceId.equals(subscription.getServiceId())) { - log.trace("[{}][{}][{}] Subscription session is managed by current service, forwarding to localSubscriptionService (update: {})", - subscription.getServiceId(), subscription.getEntityId(), subscription.getSessionId(), subscriptionUpdate); - localSubscriptionService.onSubscriptionUpdate(subscription.getSessionId(), - subscription.getSubscriptionId(), subscriptionUpdate, TbCallback.EMPTY); - } else { - log.trace("[{}][{}][{}] Subscription session is not managed by current service (update: {})", - subscription.getServiceId(), subscription.getEntityId(), subscription.getSessionId(), subscriptionUpdate); - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - ToCoreNotificationMsg updateProto = TbSubscriptionUtils.notificationsSubUpdateToProto(subscription, subscriptionUpdate); - TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(subscription.getEntityId().getId(), updateProto); - toCoreNotificationsProducer.send(tpi, queueMsg, null); } + callback.onSuccess(); } @Override public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { - return (TbAttributeSubscription) s; - } else { - return null; - } - }, - s -> (TbAttributeSubscriptionScope.ANY_SCOPE.equals(s.getScope()) || scope.equals(s.getScope().name())), - s -> { - List subscriptionUpdate = null; - for (String key : keys) { - if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, ""))); - } - } - return subscriptionUpdate; - }, false); + processAttributesUpdate(entityId, + keys.stream().map(key -> new BaseAttributeKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); if (entityId.getEntityType() == EntityType.DEVICE) { if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope) || TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) { @@ -398,222 +242,117 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene callback.onSuccess(); } - @Override - public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback) { - onLocalTelemetrySubUpdate(entityId, - s -> { - if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { - return (TbTimeseriesSubscription) s; - } else { - return null; - } - }, s -> true, s -> { - List subscriptionUpdate = null; - for (String key : keys) { - if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { - if (subscriptionUpdate == null) { - subscriptionUpdate = new ArrayList<>(); - } - subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, ""))); - } - } - return subscriptionUpdate; - }, false); - if (entityId.getEntityType() == EntityType.DEVICE) { - deleteDeviceInactivityTimeout(tenantId, entityId, keys); - } - callback.onSuccess(); - } - - private void onLocalTelemetrySubUpdate(EntityId entityId, - Function castFunction, - Predicate filterFunction, - Function> processFunction, - boolean ignoreEmptyUpdates) { - Set entitySubscriptions = subscriptionsByEntityId.get(entityId); - if (entitySubscriptions != null) { - entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { - List subscriptionUpdate = processFunction.apply(s); - if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) { - if (serviceId.equals(s.getServiceId())) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate); - localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); - } else { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate, ignoreEmptyUpdates), null); + public void processAttributesUpdate(EntityId entityId, List update) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}] Handling attributes update: {}", entityId, update); + subInfo.getSubs().forEach((serviceId, sub) -> { + if (sub.attrAllKeys) { + processAttributesUpdate(serviceId, entityId, update); + } else if (sub.attrKeys != null) { + List tmp = getSubList(update, sub.attrKeys); + if (tmp != null) { + processAttributesUpdate(serviceId, entityId, tmp); } } }); } else { - log.debug("[{}] No device subscriptions to process!", entityId); + log.trace("[{}] No attributes subscriptions for entity.", entityId); } } - private void onLocalAlarmSubUpdate(EntityId entityId, - Function castFunction, - Predicate filterFunction, - AlarmInfo alarm, boolean deleted) { - Set entitySubscriptions = subscriptionsByEntityId.get(entityId); - if (alarm == null) { - log.warn("[{}] empty alarm update!", entityId); - return; - } - if (entitySubscriptions != null) { - entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { - if (serviceId.equals(s.getServiceId())) { - AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(s.getSubscriptionId(), alarm, deleted); - localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); - } else { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(s, alarm, deleted), null); - } - }); + private void processAttributesUpdate(String targetId, EntityId entityId, List update) { + List tsKvEntryList = update.stream().map(attr -> new BasicTsKvEntry(attr.getLastUpdateTs(), attr)).collect(Collectors.toList()); + if (serviceId.equals(targetId)) { + localSubscriptionService.onAttributesUpdate(entityId, tsKvEntryList, TbCallback.EMPTY); } else { - log.debug("[{}] No device subscriptions to process!", entityId); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(false, entityId, tsKvEntryList)); } } - private void removeSubscriptionFromEntityMap(TbSubscription sub) { - Set entitySubSet = subscriptionsByEntityId.get(sub.getEntityId()); - if (entitySubSet != null) { - entitySubSet.remove(sub); - if (entitySubSet.isEmpty()) { - subscriptionsByEntityId.remove(sub.getEntityId()); + private void updateDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List kvEntries) { + for (KvEntry kvEntry : kvEntries) { + if (kvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { + deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), getLongValue(kvEntry)); } } } - private void removeSubscriptionFromPartitionMap(TbSubscription sub) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()); - Set subs = partitionedSubscriptions.get(tpi); - if (subs != null) { - subs.remove(sub); + private void deleteDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List keys) { + for (String key : keys) { + if (key.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { + deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), 0); + } } } - private void handleNewAttributeSubscription(TbAttributeSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote attribute subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - - final Map keyStates = subscription.getKeyStates(); - DonAsynchron.withCallback(attrService.find(subscription.getTenantId(), subscription.getEntityId(), DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> { - List missedUpdates = new ArrayList<>(); - values.forEach(latestEntry -> { - if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { - missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); - } - }); - if (!missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); + @Override + public void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { + onAlarmSubUpdate(tenantId, entityId, alarm, false, callback); } - private void handleNewAlarmsSubscription(TbAlarmsSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote alarm subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - //TODO: @dlandiak search all new alarms for this entity. + @Override + public void onAlarmDeleted(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback) { + onAlarmSubUpdate(tenantId, entityId, alarm, true, callback); } - private void handleNewTelemetrySubscription(TbTimeseriesSubscription subscription) { - log.trace("[{}][{}][{}] Processing remote telemetry subscription for entity [{}]", - serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); - - long curTs = System.currentTimeMillis(); - - if (subscription.isLatestValues()) { - DonAsynchron.withCallback(tsService.findLatest(subscription.getTenantId(), subscription.getEntityId(), subscription.getKeyStates().keySet()), - missedUpdates -> { - if (missedUpdates != null && !missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), - tsCallBackExecutor); - } else { - List queries = new ArrayList<>(); - subscription.getKeyStates().forEach((key, value) -> { - if (curTs > value) { - long startTs = subscription.getStartTime() > 0 ? Math.max(subscription.getStartTime(), value + 1L) : (value + 1L); - long endTs = subscription.getEndTime() > 0 ? Math.min(subscription.getEndTime(), curTs) : curTs; - queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0, 1000, Aggregation.NONE)); + private void onAlarmSubUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + log.trace("[{}][{}] Handling alarm update {}: {}", tenantId, entityId, alarm, deleted); + for (Map.Entry entry : subInfo.getSubs().entrySet()) { + if (entry.getValue().notifications) { + onAlarmSubUpdate(entry.getKey(), entityId, alarm, deleted); } - }); - if (!queries.isEmpty()) { - DonAsynchron.withCallback(tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries), - missedUpdates -> { - if (missedUpdates != null && !missedUpdates.isEmpty()) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId()); - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null); - } - }, - e -> log.error("Failed to fetch missed updates.", e), - tsCallBackExecutor); } } + callback.onSuccess(); } - private TbProtoQueueMsg toProto(TbSubscription subscription, List updates) { - return toProto(subscription, updates, true); + private void onAlarmSubUpdate(String targetServiceId, EntityId entityId, AlarmInfo alarm, boolean deleted) { + if (alarm == null) { + log.warn("[{}] empty alarm update!", entityId); + return; + } + if (serviceId.equals(targetServiceId)) { + log.trace("[{}] Forwarding to local service: {} deleted: {}", entityId, alarm, deleted); + localSubscriptionService.onAlarmUpdate(entityId, alarm, deleted, TbCallback.EMPTY); + } else { + sendCoreNotification(targetServiceId, entityId, + TbSubscriptionUtils.toAlarmSubUpdateToProto(entityId, alarm, deleted)); + } } - private TbProtoQueueMsg toProto(TbSubscription subscription, List updates, boolean ignoreEmptyUpdates) { - TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder(); - - builder.setSessionId(subscription.getSessionId()); - builder.setSubscriptionId(subscription.getSubscriptionId()); - - Map> data = new TreeMap<>(); - for (TsKvEntry tsEntry : updates) { - List values = data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>()); - Object[] value = new Object[2]; - value[0] = tsEntry.getTs(); - value[1] = tsEntry.getValueAsString(); - values.add(value); - } + private void sendCoreNotification(String targetServiceId, EntityId entityId, ToCoreNotificationMsg msg) { + log.trace("[{}] Forwarding to remote service [{}]: {}", entityId, targetServiceId, msg); + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId); + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(entityId.getId(), msg); + toCoreNotificationsProducer.send(tpi, queueMsg, null); + } - data.forEach((key, value) -> { - TbSubscriptionUpdateValueListProto.Builder dataBuilder = TbSubscriptionUpdateValueListProto.newBuilder(); - dataBuilder.setKey(key); - boolean hasData = false; - for (Object v : value) { - Object[] array = (Object[]) v; - TbSubscriptionUpdateTsValue.Builder tsValueBuilder = TbSubscriptionUpdateTsValue.newBuilder(); - tsValueBuilder.setTs((long) array[0]); - String strVal = (String) array[1]; - if (strVal != null) { - hasData = true; - tsValueBuilder.setValue(strVal); + @Override + public void onNotificationUpdate(TenantId tenantId, UserId entityId, NotificationUpdate notificationUpdate, TbCallback callback) { + TbEntityRemoteSubsInfo subInfo = entitySubscriptions.get(entityId); + if (subInfo != null) { + NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationUpdate); + log.trace("[{}][{}] Handling notificationUpdate for user {}", tenantId, entityId, notificationUpdate); + for (Map.Entry entry : subInfo.getSubs().entrySet()) { + if (entry.getValue().notifications) { + onNotificationsSubUpdate(entry.getKey(), entityId, subscriptionUpdate); } - dataBuilder.addTsValue(tsValueBuilder.build()); - } - if (!ignoreEmptyUpdates || hasData) { - builder.addData(dataBuilder.build()); } - }); - - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg( - LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build()) - .build(); - return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); + } + callback.onSuccess(); } - private TbProtoQueueMsg toProto(TbSubscription subscription, AlarmInfo alarm, boolean deleted) { - TbAlarmSubscriptionUpdateProto.Builder builder = TbAlarmSubscriptionUpdateProto.newBuilder(); - - builder.setSessionId(subscription.getSessionId()); - builder.setSubscriptionId(subscription.getSubscriptionId()); - builder.setAlarm(JacksonUtil.toString(alarm)); - builder.setDeleted(deleted); - - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg( - LocalSubscriptionServiceMsgProto.newBuilder() - .setAlarmSubUpdate(builder.build()).build()) - .build(); - return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); + private void onNotificationsSubUpdate(String targetServiceId, EntityId entityId, NotificationsSubscriptionUpdate subscriptionUpdate) { + if (serviceId.equals(targetServiceId)) { + log.trace("[{}] Forwarding to local service: {}", entityId, subscriptionUpdate); + localSubscriptionService.onNotificationUpdate(entityId, subscriptionUpdate, TbCallback.EMPTY); + } else { + sendCoreNotification(targetServiceId, entityId, + TbSubscriptionUtils.notificationsSubUpdateToProto(entityId, subscriptionUpdate)); + } } private static long getLongValue(KvEntry kve) { @@ -639,4 +378,31 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } } + private static List getSubList(List ts, Set keys) { + List update = null; + for (T entry : ts) { + if (keys.contains(entry.getKey())) { + if (update == null) { + update = new ArrayList<>(ts.size()); + } + update.add(entry); + } + } + return update; + } + + private TbEntityUpdatesInfo getEntityUpdatesInfo(EntityId entityId) { + return entityUpdates.computeIfAbsent(entityId, id -> new TbEntityUpdatesInfo(initTs)); + } + + private void cleanupEntityUpdates() { + initTs = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); + int sizeBeforeCleanup = entityUpdates.size(); + entityUpdates.entrySet().removeIf(kv -> { + var v = kv.getValue(); + return initTs > v.attributesUpdateTs && initTs > v.timeSeriesUpdateTs; + }); + log.info("Removed {} old entity update records.", entityUpdates.size() - sizeBeforeCleanup); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 4b37ba26b5..b45450cec8 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -641,7 +641,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); - //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. + //Fetch the latest values for telemetry keys in case they are not copied from NoSQL to SQL DB in hybrid mode. if (!tsInSqlDB) { log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); List allTsKeys = latestCmd.getKeys().stream() diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 64f1de8662..f15a25fe4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -16,85 +16,97 @@ package org.thingsboard.server.service.subscription; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.TbApplicationEventListener; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; @Slf4j @TbCoreComponent @Service public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionService { - private final Set currentPartitions = ConcurrentHashMap.newKeySet(); - private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final ConcurrentMap>> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final ConcurrentMap subscriptionsByEntityId = new ConcurrentHashMap<>(); + private final ConcurrentMap entityUpdates = new ConcurrentHashMap<>(); - @Autowired - private PartitionService partitionService; + private final AttributesService attrService; + private final TimeseriesService tsService; + private final TbServiceInfoProvider serviceInfoProvider; + private final PartitionService partitionService; + private final TbClusterService clusterService; + private final SubscriptionManagerService subscriptionManagerService; - @Autowired - private TbClusterService clusterService; + private ExecutorService tsCallBackExecutor; - @Autowired - @Lazy - private SubscriptionManagerService subscriptionManagerService; + public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, + PartitionService partitionService, TbClusterService clusterService, + @Lazy SubscriptionManagerService subscriptionManagerService) { + this.attrService = attrService; + this.tsService = tsService; + this.serviceInfoProvider = serviceInfoProvider; + this.partitionService = partitionService; + this.clusterService = clusterService; + this.subscriptionManagerService = subscriptionManagerService; + } + private String serviceId; private ExecutorService subscriptionUpdateExecutor; - private TbApplicationEventListener partitionChangeListener = new TbApplicationEventListener<>() { - @Override - protected void onTbApplicationEvent(PartitionChangeEvent event) { - if (ServiceType.TB_CORE.equals(event.getServiceType())) { - currentPartitions.clear(); - currentPartitions.addAll(event.getPartitions()); - } - } - }; - - private TbApplicationEventListener clusterTopologyChangeListener = new TbApplicationEventListener<>() { - @Override - protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) { - if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) { - /* - * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. - * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. - * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically - * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService. - * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache - * Since number of subscriptions is usually much less then number of devices that are pushing data. - */ - subscriptionsBySessionId.values().forEach(map -> map.values() - .forEach(sub -> pushSubscriptionToManagerService(sub, true))); - } - } - }; + private final Lock subsLock = new ReentrantLock(); @PostConstruct public void initExecutor() { subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); + serviceId = serviceInfoProvider.getServiceId(); } @PreDestroy @@ -102,79 +114,78 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (subscriptionUpdateExecutor != null) { subscriptionUpdateExecutor.shutdownNow(); } - } - - @Override - @EventListener(PartitionChangeEvent.class) - public void onApplicationEvent(PartitionChangeEvent event) { - partitionChangeListener.onApplicationEvent(event); + if (tsCallBackExecutor != null) { + tsCallBackExecutor.shutdownNow(); + } } @Override @EventListener(ClusterTopologyChangeEvent.class) public void onApplicationEvent(ClusterTopologyChangeEvent event) { - clusterTopologyChangeListener.onApplicationEvent(event); + if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) { + /* + * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. + * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. + * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically + * It is also cheaper than caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService. + * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache + * Since number of subscriptions is usually much less than number of devices that are pushing data. + */ + subscriptionsByEntityId.values().forEach(sub -> pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED))); + } } - //TODO 3.1: replace null callbacks with callbacks from websocket service. @Override - public void addSubscription(TbSubscription subscription) { - pushSubscriptionToManagerService(subscription, true); - registerSubscription(subscription); + public void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg) { + subscriptionUpdateExecutor.submit(() -> { + Set partitions = new HashSet<>(coreStartupMsg.getPartitionsList()); + AtomicInteger counter = new AtomicInteger(); + subscriptionsByEntityId.values().forEach(sub -> { + var tpi = partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()); + if (!tpi.isMyPartition() && partitions.contains(tpi.getPartition().orElse(Integer.MAX_VALUE))) { + pushToQueue(sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED), tpi); + counter.incrementAndGet(); + } + }); + log.info("[{}] Pushed {} subscriptions to [{}]", serviceId, counter.get(), coreStartupMsg.getServiceId()); + }); } - private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - // Subscription is managed on the same server; - if (pushToLocalService) { - subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY); - } - } else { - // Push to the queue; - TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription); - clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); - } + @Override + public void addSubscription(TbSubscription subscription) { + TenantId tenantId = subscription.getTenantId(); + EntityId entityId = subscription.getEntityId(); + log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); + Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); + sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); + modifySubscription(tenantId, entityId, subscription, true); } @Override - @SuppressWarnings("unchecked") - public void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId - .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); - if (subscription != null) { - switch (subscription.getType()) { - case TIMESERIES: - TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription; - update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value)); - break; - case ATTRIBUTES: - TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription; - update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value)); - break; - } - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); - } - callback.onSuccess(); + public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { + UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); + onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); } @Override - @SuppressWarnings("unchecked") - public void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId - .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); - if (subscription != null && subscription.getType() == TbSubscriptionType.ALARMS) { - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); - } - callback.onSuccess(); + public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback); } - @Override - public void onSubscriptionUpdate(String sessionId, int subscriptionId, NotificationsSubscriptionUpdate update, TbCallback callback) { - TbSubscription subscription = subscriptionsBySessionId.getOrDefault(sessionId, Collections.emptyMap()).get(subscriptionId); - if (subscription != null && (subscription.getType() == TbSubscriptionType.NOTIFICATIONS - || subscription.getType() == TbSubscriptionType.NOTIFICATIONS_COUNT)) { - subscriptionUpdateExecutor.submit(() -> subscription.getUpdateProcessor().accept(subscription, update)); + public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + entityUpdates.put(entityId, entityUpdatesInfo); + Set> pendingSubs = null; + subsLock.lock(); + try { + TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); + if (entitySubs != null) { + pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber); + } + } finally { + subsLock.unlock(); + } + if (pendingSubs != null) { + pendingSubs.forEach(this::checkMissedUpdates); } callback.onSuccess(); } @@ -182,22 +193,14 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelSubscription(String sessionId, int subscriptionId) { log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); - Map sessionSubscriptions = subscriptionsBySessionId.get(sessionId); + Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); if (sessionSubscriptions != null) { - TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); + TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); - if (currentPartitions.contains(tpi)) { - // Subscription is managed on the same server; - subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY); - } else { - // Push to the queue; - TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription); - clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); - } + modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); } else { log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); } @@ -208,16 +211,301 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelAllSessionSubscriptions(String sessionId) { - Map subscriptions = subscriptionsBySessionId.get(sessionId); - if (subscriptions != null) { - Set toRemove = new HashSet<>(subscriptions.keySet()); - toRemove.forEach(id -> cancelSubscription(sessionId, id)); + log.debug("[{}] Going to remove session subscriptions.", sessionId); + Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); + if (sessionSubscriptions != null) { + for (TbSubscription subscription : sessionSubscriptions.values()) { + modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); + } + } else { + log.debug("[{}] No session subscriptions found!", sessionId); } } - private void registerSubscription(TbSubscription subscription) { - Map sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); + @Override + public void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) { + //TODO: optimize to avoid re-wrapping from TsValueListProto -> List -> Map>. Low priority. + onTimeSeriesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onTimeSeriesUpdate(EntityId entityId, List data, TbCallback callback) { + onTimeSeriesUpdate(entityId.getId(), data, callback); + } + + private void onTimeSeriesUpdate(UUID entityId, List data, TbCallback callback) { + entityUpdates.get(entityId).timeSeriesUpdateTs = System.currentTimeMillis(); + processSubscriptionData(entityId, + sub -> TbSubscriptionType.TIMESERIES.equals(sub.getType()), + s -> { + TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s; + List updateData = null; + if (sub.isAllKeys()) { + updateData = data; + } else { + for (TsKvEntry kv : data) { + if (sub.getKeyStates().containsKey((kv.getKey()))) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); + } + } + } + if (updateData != null) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData); + update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value)); + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, update)); + } + }, callback); + } + + @Override + public void onAttributesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) { + onAttributesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onAttributesUpdate(EntityId entityId, List data, TbCallback callback) { + onAttributesUpdate(entityId.getId(), data, callback); + } + + private void onAttributesUpdate(UUID entityId, List data, TbCallback callback) { + entityUpdates.get(entityId).attributesUpdateTs = System.currentTimeMillis(); + processSubscriptionData(entityId, + sub -> TbSubscriptionType.ATTRIBUTES.equals(sub.getType()), + s -> { + TbAttributeSubscription sub = (TbAttributeSubscription) s; + List updateData = null; + if (sub.isAllKeys()) { + updateData = data; + } else { + for (TsKvEntry kv : data) { + if (sub.getKeyStates().containsKey((kv.getKey()))) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); + } + } + } + if (updateData != null) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData); + update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value)); + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, update)); + } + }, callback); + } + + @Override + public void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto proto, TbCallback callback) { + onAlarmUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onAlarmUpdate(EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback) { + onAlarmUpdate(entityId.getId(), new AlarmSubscriptionUpdate(alarm, deleted), callback); + } + + private void onAlarmUpdate(UUID entityId, AlarmSubscriptionUpdate update, TbCallback callback) { + processSubscriptionData(entityId, + sub -> TbSubscriptionType.ALARMS.equals(sub.getType()), + update, callback); + } + + @Override + public void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto proto, TbCallback callback) { + onNotificationUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback); + } + + @Override + public void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate update, TbCallback callback) { + onNotificationUpdate(entityId.getId(), update, callback); + } + + private void onNotificationUpdate(UUID entityId, NotificationsSubscriptionUpdate update, TbCallback callback) { + processSubscriptionData(entityId, + sub -> TbSubscriptionType.NOTIFICATIONS.equals(sub.getType()) || TbSubscriptionType.NOTIFICATIONS_COUNT.equals(sub.getType()), + update, callback); + } + + @Override + @SuppressWarnings("unchecked") + public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update, TbCallback callback) { + log.trace("[{}] Received notification request update: {}", tenantId, update); + NotificationsSubscriptionUpdate theUpdate = new NotificationsSubscriptionUpdate(update); + subscriptionsByEntityId.values().forEach(subInfo -> { + if (subInfo.isNf() && tenantId.equals(subInfo.getTenantId()) && EntityType.USER.equals(subInfo.getEntityId().getEntityType())) { + subInfo.getSubs().forEach(s -> { + TbSubscription sub = (TbSubscription) s; + subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, theUpdate)); + }); + } + }); + callback.onSuccess(); + } + + @SuppressWarnings("unchecked") + private void processSubscriptionData(UUID entityId, + Predicate> filter, + T data, + TbCallback callback) { + log.trace("[{}] Received subscription data: {}", entityId, data); + var subs = subscriptionsByEntityId.get(entityId); + if (subs != null) { + subs.getSubs().forEach(s -> { + if (filter.test(s)) { + subscriptionUpdateExecutor.submit(() -> { + TbSubscription sub = (TbSubscription) s; + sub.getUpdateProcessor().accept(sub, data); + }); + } + }); + } + callback.onSuccess(); + } + + private void processSubscriptionData(UUID entityId, + Predicate> filter, + Consumer> processor, + TbCallback callback) { + var subs = subscriptionsByEntityId.get(entityId); + if (subs != null) { + subs.getSubs().forEach(s -> { + if (filter.test(s)) { + processor.accept(s); + } + }); + } + callback.onSuccess(); + } + + private void modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription subscription, boolean add) { + TbSubscription missedUpdatesCandidate = null; + TbEntitySubEvent event; + subsLock.lock(); + try { + TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId)); + event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription); + if (entitySubs.isEmpty()) { + subscriptionsByEntityId.remove(entityId.getId()); + entityUpdates.remove(entityId.getId()); + } else if (add) { + missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event); + } + } finally { + subsLock.unlock(); + } + if (event != null) { + log.trace("[{}][{}][{}] Event: {}", tenantId, entityId, subscription.getSubscriptionId(), event); + pushSubEventToManagerService(tenantId, entityId, event); + if (missedUpdatesCandidate != null) { + checkMissedUpdates(missedUpdatesCandidate); + } + } else { + log.trace("[{}][{}][{}] No changes detected.", tenantId, entityId, subscription.getSubscriptionId()); + } + } + + private void pushSubEventToManagerService(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (tpi.isMyPartition()) { + // Subscription is managed on the same server; + subscriptionManagerService.onSubEvent(serviceId, event, TbCallback.EMPTY); + } else { + pushToQueue(entityId, event, tpi); + } + } + + private void pushToQueue(EntityId entityId, TbEntitySubEvent event, TopicPartitionInfo tpi) { + clusterService.pushMsgToCore(tpi, entityId.getId(), TbSubscriptionUtils.toSubEventProto(serviceId, event), null); + } + + private void checkMissedUpdates(TbSubscription subscription) { + log.trace("[{}][{}][{}] Check missed updates for subscription: {}", + subscription.getTenantId(), subscription.getEntityId(), subscription.getSubscriptionId(), subscription); + switch (subscription.getType()) { + case TIMESERIES: + handleNewTelemetrySubscription((TbTimeSeriesSubscription) subscription); + break; + case ATTRIBUTES: + handleNewAttributeSubscription((TbAttributeSubscription) subscription); + break; + } + } + + private void handleNewAttributeSubscription(TbAttributeSubscription subscription) { + log.trace("[{}][{}][{}] Processing attribute subscription for entity [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + var entityUpdateInfo = entityUpdates.get(subscription.getEntityId().getId()); + if (entityUpdateInfo != null && entityUpdateInfo.attributesUpdateTs > 0 && subscription.getQueryTs() > entityUpdateInfo.attributesUpdateTs) { + log.trace("[{}][{}][{}] No need to check for missed updates [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + return; + } + final Map keyStates = subscription.getKeyStates(); + DonAsynchron.withCallback(attrService.find(subscription.getTenantId(), subscription.getEntityId(), DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> { + List updates = new ArrayList<>(); + values.forEach(latestEntry -> { + if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { + updates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); + } + }); + var missedUpdates = updates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onAttributesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + }, + e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); + } + + private void handleNewTelemetrySubscription(TbTimeSeriesSubscription subscription) { + log.trace("[{}][{}][{}] Processing telemetry subscription for entity [{}]", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); + var entityUpdateInfo = entityUpdates.get(subscription.getEntityId().getId()); + if (entityUpdateInfo != null && entityUpdateInfo.timeSeriesUpdateTs > 0 && subscription.getQueryTs() > entityUpdateInfo.timeSeriesUpdateTs) { + log.trace("[{}][{}][{}] No need to check for missed updates. time [{}][{}] diff: {}ms", + subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getQueryTs(), entityUpdateInfo.timeSeriesUpdateTs, subscription.getQueryTs() - entityUpdateInfo.timeSeriesUpdateTs); + return; + } + + long curTs = System.currentTimeMillis(); + + if (subscription.isLatestValues()) { + DonAsynchron.withCallback(tsService.findLatest(subscription.getTenantId(), subscription.getEntityId(), subscription.getKeyStates().keySet()), + missedUpdates -> { + if (missedUpdates != null && !missedUpdates.isEmpty()) { + missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onTimeSeriesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + } + }, + e -> log.error("Failed to fetch missed updates.", e), + tsCallBackExecutor); + } else { + List queries = new ArrayList<>(); + subscription.getKeyStates().forEach((key, value) -> { + if (curTs > value) { + long startTs = subscription.getStartTime() > 0 ? Math.max(subscription.getStartTime(), value + 1L) : (value + 1L); + long endTs = subscription.getEndTime() > 0 ? Math.min(subscription.getEndTime(), curTs) : curTs; + queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0, 1000, Aggregation.NONE)); + } + }); + if (!queries.isEmpty()) { + DonAsynchron.withCallback(tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries), + missedUpdates -> { + if (missedUpdates != null && !missedUpdates.isEmpty()) { + missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList()); + if (!missedUpdates.isEmpty()) { + onTimeSeriesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY); + } + } + }, + e -> log.error("Failed to fetch missed updates.", e), + tsCallBackExecutor); + } + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index 736a251dff..4f6d16e6c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.subscription; import org.springframework.context.ApplicationListener; +import org.springframework.context.event.EventListener; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -23,6 +24,8 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; @@ -31,9 +34,9 @@ import java.util.List; public interface SubscriptionManagerService extends ApplicationListener { - void addSubscription(TbSubscription subscription, TbCallback callback); + void onSubEvent(String serviceId, TbEntitySubEvent event, TbCallback empty); - void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback); + void onApplicationEvent(OtherServiceShutdownEvent event); void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbCallback callback); @@ -51,6 +54,4 @@ public interface SubscriptionManagerService extends ApplicationListener scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index 3f11679849..f02a466994 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -186,6 +186,7 @@ public abstract class TbAbstractDataSubCtx sendWsMsg(sub.getSessionId(), subscriptionUpdate, keysType)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .scope(scope) @@ -207,19 +208,20 @@ public abstract class TbAbstractDataSubCtx keyStates) { + private TbTimeSeriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates) { return createTsSub(entityData, subIdx, latestValues, startTs, endTs, keyStates, latestValues); } - private TbTimeseriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates, boolean resultToLatestValues) { + private TbTimeSeriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates, boolean resultToLatestValues) { log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates); - return TbTimeseriesSubscription.builder() + return TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionRef.getSessionId()) .subscriptionId(subIdx) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityData.getEntityId()) .updateProcessor((sub, subscriptionUpdate) -> sendWsMsg(sub.getSessionId(), subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .latestValues(latestValues) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java index 7eba28bccd..102431f63b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java @@ -78,11 +78,14 @@ public abstract class TbAbstractSubCtx { @Setter protected volatile ScheduledFuture refreshTask; protected volatile boolean stopped; + @Getter + protected long createdTime; public TbAbstractSubCtx(String serviceId, WebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, WebSocketSessionRef sessionRef, int cmdId) { + this.createdTime = System.currentTimeMillis(); this.serviceId = serviceId; this.wsService = wsService; this.entityService = entityService; @@ -142,6 +145,7 @@ public abstract class TbAbstractSubCtx { .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .updateProcessor((subscription, subscriptionUpdate) -> dynamicValueSubUpdate(subscription.getSessionId(), subscriptionUpdate, dynamicValueKeySubMap)) + .queryTs(createdTime) .allKeys(false) .keyStates(keyStates) .scope(TbAttributeSubscriptionScope.SERVER_SCOPE) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java index 3746758caa..2392899dc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAttributeSubscription.java @@ -26,6 +26,7 @@ import java.util.function.BiConsumer; public class TbAttributeSubscription extends TbSubscription { + @Getter private final long queryTs; @Getter private final boolean allKeys; @Getter private final Map keyStates; @Getter private final TbAttributeSubscriptionScope scope; @@ -33,8 +34,9 @@ public class TbAttributeSubscription extends TbSubscription, TelemetrySubscriptionUpdate> updateProcessor, - boolean allKeys, Map keyStates, TbAttributeSubscriptionScope scope) { + long queryTs, boolean allKeys, Map keyStates, TbAttributeSubscriptionScope scope) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES, updateProcessor); + this.queryTs = queryTs; this.allKeys = allKeys; this.keyStates = keyStates; this.scope = scope; diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java new file mode 100644 index 0000000000..96ea628de2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java @@ -0,0 +1,245 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Information about the local websocket subscriptions. + */ +@Slf4j +@RequiredArgsConstructor +public class TbEntityLocalSubsInfo { + + @Getter + private final TenantId tenantId; + @Getter + private final EntityId entityId; + @Getter + private final Lock lock = new ReentrantLock(); + @Getter + private final Set> subs = ConcurrentHashMap.newKeySet(); + private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo(); + + private final Map>> pendingSubs = new ConcurrentHashMap<>(); + @Getter + @Setter + private int pendingTimeSeriesEvent; + @Getter + @Setter + private long pendingTimeSeriesEventTs; + @Getter + @Setter + private int pendingAttributesEvent; + @Getter + @Setter + private long pendingAttributesEventTs; + + private int seqNumber = 0; + + public TbEntitySubEvent add(TbSubscription subscription) { + log.trace("[{}][{}][{}] Adding: {}", tenantId, entityId, subscription.getSubscriptionId(), subscription); + boolean created = subs.isEmpty(); + subs.add(subscription); + TbSubscriptionsInfo newState = created ? state : state.copy(); + boolean stateChanged = false; + switch (subscription.getType()) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + if (!newState.notifications) { + newState.notifications = true; + stateChanged = true; + } + break; + case ALARMS: + if (!newState.alarms) { + newState.alarms = true; + stateChanged = true; + } + break; + case ATTRIBUTES: + var attrSub = (TbAttributeSubscription) subscription; + if (!newState.attrAllKeys) { + if (attrSub.isAllKeys()) { + newState.attrAllKeys = true; + stateChanged = true; + } else { + if (newState.attrKeys == null) { + newState.attrKeys = new HashSet<>(attrSub.getKeyStates().keySet()); + stateChanged = true; + } else if (newState.attrKeys.addAll(attrSub.getKeyStates().keySet())) { + stateChanged = true; + } + } + } + break; + case TIMESERIES: + var tsSub = (TbTimeSeriesSubscription) subscription; + if (!newState.tsAllKeys) { + if (tsSub.isAllKeys()) { + newState.tsAllKeys = true; + stateChanged = true; + } else { + if (newState.tsKeys == null) { + newState.tsKeys = new HashSet<>(tsSub.getKeyStates().keySet()); + stateChanged = true; + } else if (newState.tsKeys.addAll(tsSub.getKeyStates().keySet())) { + stateChanged = true; + } + } + } + break; + } + if (stateChanged) { + state = newState; + } + if (created) { + return toEvent(ComponentLifecycleEvent.CREATED); + } else if (stateChanged) { + return toEvent(ComponentLifecycleEvent.UPDATED); + } else { + return null; + } + } + + public TbEntitySubEvent remove(TbSubscription sub) { + log.trace("[{}][{}][{}] Removing: {}", tenantId, entityId, sub.getSubscriptionId(), sub); + if (!subs.remove(sub)) { + return null; + } + if (subs.isEmpty()) { + return toEvent(ComponentLifecycleEvent.DELETED); + } + TbSubscriptionsInfo oldState = state.copy(); + TbSubscriptionsInfo newState = new TbSubscriptionsInfo(); + for (TbSubscription subscription : subs) { + switch (subscription.getType()) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + if (!newState.notifications) { + newState.notifications = true; + } + break; + case ALARMS: + if (!newState.alarms) { + newState.alarms = true; + } + break; + case ATTRIBUTES: + var attrSub = (TbAttributeSubscription) subscription; + if (!newState.attrAllKeys && attrSub.isAllKeys()) { + newState.attrAllKeys = true; + continue; + } + if (newState.attrKeys == null) { + newState.attrKeys = new HashSet<>(attrSub.getKeyStates().keySet()); + } else { + newState.attrKeys.addAll(attrSub.getKeyStates().keySet()); + } + break; + case TIMESERIES: + var tsSub = (TbTimeSeriesSubscription) subscription; + if (!newState.tsAllKeys && tsSub.isAllKeys()) { + newState.tsAllKeys = true; + continue; + } + if (newState.tsKeys == null) { + newState.tsKeys = new HashSet<>(tsSub.getKeyStates().keySet()); + } else { + newState.tsKeys.addAll(tsSub.getKeyStates().keySet()); + } + break; + } + } + if (newState.equals(oldState)) { + return null; + } else { + this.state = newState; + return toEvent(ComponentLifecycleEvent.UPDATED); + } + } + + public TbEntitySubEvent toEvent(ComponentLifecycleEvent type) { + seqNumber++; + var result = TbEntitySubEvent.builder().tenantId(tenantId).entityId(entityId).type(type).seqNumber(seqNumber); + if (!ComponentLifecycleEvent.DELETED.equals(type)) { + result.info(state.copy(seqNumber)); + } + return result.build(); + } + + public boolean isNf() { + return state.notifications; + } + + + public boolean isEmpty() { + return state.isEmpty(); + } + + public TbSubscription registerPendingSubscription(TbSubscription subscription, TbEntitySubEvent event) { + if (TbSubscriptionType.ATTRIBUTES.equals(subscription.getType())) { + if (event != null) { + log.trace("[{}][{}] Registering new pending attributes subscription event: {} for subscription: {}", tenantId, entityId, event.getSeqNumber(), subscription.getSubscriptionId()); + pendingAttributesEvent = event.getSeqNumber(); + pendingAttributesEventTs = System.currentTimeMillis(); + pendingSubs.computeIfAbsent(pendingAttributesEvent, e -> new HashSet<>()).add(subscription); + } else if (pendingAttributesEvent > 0) { + log.trace("[{}][{}] Registering pending attributes subscription {} for event: {} ", tenantId, entityId, subscription.getSubscriptionId(), pendingAttributesEvent); + pendingSubs.computeIfAbsent(pendingAttributesEvent, e -> new HashSet<>()).add(subscription); + } else { + return subscription; + } + } else if (subscription instanceof TbTimeSeriesSubscription) { + if (event != null) { + log.trace("[{}][{}] Registering new pending time-series subscription event: {} for subscription: {}", tenantId, entityId, event.getSeqNumber(), subscription.getSubscriptionId()); + pendingTimeSeriesEvent = event.getSeqNumber(); + pendingTimeSeriesEventTs = System.currentTimeMillis(); + pendingSubs.computeIfAbsent(pendingTimeSeriesEvent, e -> new HashSet<>()).add(subscription); + } else if (pendingTimeSeriesEvent > 0) { + log.trace("[{}][{}] Registering pending time-series subscription {} for event: {} ", tenantId, entityId, subscription.getSubscriptionId(), pendingTimeSeriesEvent); + pendingSubs.computeIfAbsent(pendingTimeSeriesEvent, e -> new HashSet<>()).add(subscription); + } else { + return subscription; + } + } + return null; + } + + public Set> clearPendingSubscriptions(int seqNumber) { + if (pendingTimeSeriesEvent == seqNumber) { + pendingTimeSeriesEvent = 0; + pendingTimeSeriesEventTs = 0L; + } else if (pendingAttributesEvent == seqNumber) { + pendingAttributesEvent = 0; + pendingAttributesEventTs = 0L; + } + return pendingSubs.remove(seqNumber); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java new file mode 100644 index 0000000000..df66b060da --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityRemoteSubsInfo.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Information about the local websocket subscriptions. + */ +@RequiredArgsConstructor +@Slf4j +public class TbEntityRemoteSubsInfo { + @Getter + private final TenantId tenantId; + @Getter + private final EntityId entityId; + @Getter + private final Map subs = new ConcurrentHashMap<>(); // By service ID + + public boolean updateAndCheckIsEmpty(String serviceId, TbEntitySubEvent event) { + var current = subs.get(serviceId); + if (current != null && current.seqNumber > event.getSeqNumber()) { + log.warn("[{}][{}] Duplicate subscription event received. Current: {}, Event: {}", + tenantId, entityId, current, event.getInfo()); + return false; + } + switch (event.getType()) { + case CREATED: + subs.put(serviceId, event.getInfo()); + break; + case UPDATED: + var newSubInfo = event.getInfo(); + if (newSubInfo.isEmpty()) { + subs.remove(serviceId); + return isEmpty(); + } else { + subs.put(serviceId, newSubInfo); + } + break; + case DELETED: + subs.remove(serviceId); + return isEmpty(); + } + return false; + } + + public boolean removeAndCheckIsEmpty(String serviceId) { + if (subs.remove(serviceId) != null) { + return subs.isEmpty(); + } else { + return false; + } + } + + public boolean isEmpty() { + return subs.isEmpty(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java new file mode 100644 index 0000000000..7e2956b5f0 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntitySubEvent.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; + +import java.util.Set; + +/** + * Information about the local websocket subscriptions. + */ +@Builder +@Data +public class TbEntitySubEvent { + + private final TenantId tenantId; + private final EntityId entityId; + private final ComponentLifecycleEvent type; + private final TbSubscriptionsInfo info; + private final int seqNumber; + + public boolean hasTsOrAttrSub() { + return info != null && (info.tsAllKeys || info.attrAllKeys || info.tsKeys != null || info.attrKeys != null); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java new file mode 100644 index 0000000000..a09da220d5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityUpdatesInfo.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class TbEntityUpdatesInfo { + + volatile long attributesUpdateTs; + volatile long timeSeriesUpdateTs; + + public TbEntityUpdatesInfo(long ts) { + this.attributesUpdateTs = ts; + this.timeSeriesUpdateTs = ts; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java index 7685c8fbed..d9fc87f51b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java @@ -15,28 +15,54 @@ */ package org.thingsboard.server.service.subscription; +import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; +import java.util.List; + public interface TbLocalSubscriptionService { - void addSubscription(TbSubscription subscription); + void addSubscription(TbSubscription subscription); + + void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); + + void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); void cancelSubscription(String sessionId, int subscriptionId); void cancelAllSessionSubscriptions(String sessionId); - void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback); + void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback); + + void onTimeSeriesUpdate(EntityId entityId, List update, TbCallback callback); + + void onAttributesUpdate(TransportProtos.TbSubUpdateProto attrUpdate, TbCallback callback); - void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback); + void onAttributesUpdate(EntityId entityId, List update, TbCallback callback); - void onSubscriptionUpdate(String sessionId, int subscriptionId, NotificationsSubscriptionUpdate update, TbCallback callback); + void onAlarmUpdate(EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback); - void onApplicationEvent(PartitionChangeEvent event); + void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto update, TbCallback callback); + + void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate subscriptionUpdate, TbCallback callback); void onApplicationEvent(ClusterTopologyChangeEvent event); + + void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg); + + void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update, TbCallback callback); + + void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto notificationsUpdate, TbCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java index dfad6a41de..1cda590a1f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java @@ -33,7 +33,7 @@ public abstract class TbSubscription { private final TenantId tenantId; private final EntityId entityId; private final TbSubscriptionType type; - private final BiConsumer, T> updateProcessor; + private final BiConsumer, T> updateProcessor; @Override public boolean equals(Object o) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 8905b1b3ad..2fd49f0645 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.subscription; +import org.apache.commons.lang3.StringUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.id.EntityId; @@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @@ -39,29 +41,20 @@ import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgPr import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue; +import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import org.thingsboard.server.service.ws.notification.sub.NotificationsCountSubscription; -import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscription; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; -import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -69,172 +62,118 @@ import java.util.UUID; public class TbSubscriptionUtils { - public static ToCoreMsg toNewSubscriptionProto(TbSubscription subscription) { + public static ToCoreMsg toSubEventProto(String serviceId, TbEntitySubEvent event) { SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); - TbSubscriptionProto subscriptionProto = TbSubscriptionProto.newBuilder() - .setServiceId(subscription.getServiceId()) - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()) - .setTenantIdMSB(subscription.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(subscription.getTenantId().getId().getLeastSignificantBits()) - .setEntityType(subscription.getEntityId().getEntityType().name()) - .setEntityIdMSB(subscription.getEntityId().getId().getMostSignificantBits()) - .setEntityIdLSB(subscription.getEntityId().getId().getLeastSignificantBits()).build(); - - switch (subscription.getType()) { - case TIMESERIES: - TbTimeseriesSubscription tSub = (TbTimeseriesSubscription) subscription; - TbTimeSeriesSubscriptionProto.Builder tSubProto = TbTimeSeriesSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setAllKeys(tSub.isAllKeys()); - tSub.getKeyStates().forEach((key, value) -> tSubProto.addKeyStates( - TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build())); - tSubProto.setStartTime(tSub.getStartTime()); - tSubProto.setEndTime(tSub.getEndTime()); - tSubProto.setLatestValues(tSub.isLatestValues()); - msgBuilder.setTelemetrySub(tSubProto.build()); - break; - case ATTRIBUTES: - TbAttributeSubscription aSub = (TbAttributeSubscription) subscription; - TbAttributeSubscriptionProto.Builder aSubProto = TbAttributeSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setAllKeys(aSub.isAllKeys()) - .setScope(aSub.getScope().name()); - aSub.getKeyStates().forEach((key, value) -> aSubProto.addKeyStates( - TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build())); - msgBuilder.setAttributeSub(aSubProto.build()); - break; - case ALARMS: - TbAlarmsSubscription alarmSub = (TbAlarmsSubscription) subscription; - TransportProtos.TbAlarmSubscriptionProto.Builder alarmSubProto = TransportProtos.TbAlarmSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setTs(alarmSub.getTs()); - msgBuilder.setAlarmSub(alarmSubProto.build()); - break; - case NOTIFICATIONS: - NotificationsSubscription notificationsSub = (NotificationsSubscription) subscription; - msgBuilder.setNotificationsSub(TransportProtos.NotificationsSubscriptionProto.newBuilder() - .setSub(subscriptionProto) - .setLimit(notificationsSub.getLimit())); - break; - case NOTIFICATIONS_COUNT: - NotificationsCountSubscription notificationsCountSub = (NotificationsCountSubscription) subscription; - msgBuilder.setNotificationsCountSub(TransportProtos.NotificationsCountSubscriptionProto.newBuilder() - .setSub(subscriptionProto)); - break; + var builder = TbEntitySubEventProto.newBuilder() + .setServiceId(serviceId) + .setSeqNumber(event.getSeqNumber()) + .setTenantIdMSB(event.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(event.getTenantId().getId().getLeastSignificantBits()) + .setEntityType(event.getEntityId().getEntityType().name()) + .setEntityIdMSB(event.getEntityId().getId().getMostSignificantBits()) + .setEntityIdLSB(event.getEntityId().getId().getLeastSignificantBits()) + .setType(event.getType().name()); + TbSubscriptionsInfo info = event.getInfo(); + if (info != null) { + builder.setNotifications(info.notifications) + .setAlarms(info.alarms) + .setTsAllKeys(info.tsAllKeys) + .setAttrAllKeys(info.attrAllKeys); + if (info.tsKeys != null) { + builder.addAllTsKeys(info.tsKeys); + } + if (info.attrKeys != null) { + builder.addAllAttrKeys(info.attrKeys); + } } - return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + msgBuilder.setSubEvent(builder); + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build(); } - public static ToCoreMsg toCloseSubscriptionProto(TbSubscription subscription) { - SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); - TbSubscriptionCloseProto closeProto = TbSubscriptionCloseProto.newBuilder() - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()).build(); - msgBuilder.setSubClose(closeProto); - return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) { + TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() + .setEntityIdMSB(id.getMostSignificantBits()) + .setEntityIdLSB(id.getLeastSignificantBits()) + .setSeqNumber(seqNumber) + .setAttributesUpdateTs(update.attributesUpdateTs) + .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg( + TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setSubEventCallback(updateProto) + .build()) + .build(); } - public static TbSubscription fromProto(TbAttributeSubscriptionProto attributeSub) { - TbSubscriptionProto subProto = attributeSub.getSub(); - TbAttributeSubscription.TbAttributeSubscriptionBuilder builder = TbAttributeSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - - builder.scope(TbAttributeSubscriptionScope.valueOf(attributeSub.getScope())); - builder.allKeys(attributeSub.getAllKeys()); - Map keyStates = new HashMap<>(); - attributeSub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs())); - builder.keyStates(keyStates); - return builder.build(); - } - public static TbSubscription fromProto(TbTimeSeriesSubscriptionProto telemetrySub) { - TbSubscriptionProto subProto = telemetrySub.getSub(); - TbTimeseriesSubscription.TbTimeseriesSubscriptionBuilder builder = TbTimeseriesSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - - builder.allKeys(telemetrySub.getAllKeys()); - Map keyStates = new HashMap<>(); - telemetrySub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs())); - builder.startTime(telemetrySub.getStartTime()); - builder.endTime(telemetrySub.getEndTime()); - builder.latestValues(telemetrySub.getLatestValues()); - builder.keyStates(keyStates); + public static TbEntitySubEvent fromProto(TbEntitySubEventProto proto) { + ComponentLifecycleEvent event = ComponentLifecycleEvent.valueOf(proto.getType()); + var builder = TbEntitySubEvent.builder() + .tenantId(TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()))) + .seqNumber(proto.getSeqNumber()) + .entityId(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()))) + .type(event); + if (!ComponentLifecycleEvent.DELETED.equals(event)) { + builder.info(new TbSubscriptionsInfo(proto.getNotifications(), proto.getAlarms(), + proto.getTsAllKeys(), proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, + proto.getAttrAllKeys(), proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, + proto.getSeqNumber())); + } return builder.build(); } - public static TbSubscription fromProto(TransportProtos.TbAlarmSubscriptionProto alarmSub) { - TbSubscriptionProto subProto = alarmSub.getSub(); - TbAlarmsSubscription.TbAlarmsSubscriptionBuilder builder = TbAlarmsSubscription.builder() - .serviceId(subProto.getServiceId()) - .sessionId(subProto.getSessionId()) - .subscriptionId(subProto.getSubscriptionId()) - .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB()))) - .tenantId(TenantId.fromUUID(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB()))); - builder.ts(alarmSub.getTs()); - return builder.build(); + public static AlarmSubscriptionUpdate fromProto(TransportProtos.TbAlarmSubUpdateProto proto) { + if (proto.getErrorCode() > 0) { + return new AlarmSubscriptionUpdate(SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); + } else { + AlarmInfo alarm = JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class); + return new AlarmSubscriptionUpdate(alarm, proto.getDeleted()); + } } - public static NotificationsSubscription fromProto(TransportProtos.NotificationsSubscriptionProto notificationsSub) { - TbSubscriptionProto sub = notificationsSub.getSub(); - return NotificationsSubscription.builder() - .serviceId(sub.getServiceId()) - .sessionId(sub.getSessionId()) - .subscriptionId(sub.getSubscriptionId()) - .tenantId(TenantId.fromUUID(new UUID(sub.getTenantIdMSB(), sub.getTenantIdLSB()))) - .entityId(EntityIdFactory.getByTypeAndUuid(sub.getEntityType(), new UUID(sub.getEntityIdMSB(), sub.getEntityIdLSB()))) - .limit(notificationsSub.getLimit()) - .build(); + public static NotificationsSubscriptionUpdate fromProto(TransportProtos.NotificationsSubUpdateProto proto) { + NotificationsSubscriptionUpdate update; + if (StringUtils.isNotEmpty(proto.getNotificationUpdate())) { + NotificationUpdate notificationUpdate = JacksonUtil.fromString(proto.getNotificationUpdate(), NotificationUpdate.class); + update = new NotificationsSubscriptionUpdate(notificationUpdate); + } else { + NotificationRequestUpdate notificationRequestUpdate = JacksonUtil.fromString(proto.getNotificationRequestUpdate(), NotificationRequestUpdate.class); + update = new NotificationsSubscriptionUpdate(notificationRequestUpdate); + } + return update; } - public static NotificationsCountSubscription fromProto(TransportProtos.NotificationsCountSubscriptionProto notificationsCountSub) { - TbSubscriptionProto sub = notificationsCountSub.getSub(); - return NotificationsCountSubscription.builder() - .serviceId(sub.getServiceId()) - .sessionId(sub.getSessionId()) - .subscriptionId(sub.getSubscriptionId()) - .tenantId(TenantId.fromUUID(new UUID(sub.getTenantIdMSB(), sub.getTenantIdLSB()))) - .entityId(EntityIdFactory.getByTypeAndUuid(sub.getEntityType(), new UUID(sub.getEntityIdMSB(), sub.getEntityIdLSB()))) + public static ToCoreNotificationMsg toAlarmSubUpdateToProto(EntityId entityId, AlarmInfo alarmInfo, boolean deleted) { + TransportProtos.TbAlarmSubUpdateProto.Builder updateProto = TransportProtos.TbAlarmSubUpdateProto.newBuilder() + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()) + .setAlarm(JacksonUtil.toString(alarmInfo)) + .setDeleted(deleted); + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg( + TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setAlarmUpdate(updateProto) + .build()) .build(); } - public static TelemetrySubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) { - if (proto.getErrorCode() > 0) { - return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); - } else { - Map> data = new TreeMap<>(); - proto.getDataList().forEach(v -> { - List values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); - for (int i = 0; i < v.getTsValueCount(); i++) { - Object[] value = new Object[2]; - TbSubscriptionUpdateTsValue tsValue = v.getTsValue(i); - value[0] = tsValue.getTs(); - value[1] = tsValue.hasValue() ? tsValue.getValue() : null; - values.add(value); - } - }); - return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), data); + public static ToCoreNotificationMsg notificationsSubUpdateToProto(EntityId entityId, NotificationsSubscriptionUpdate update) { + TransportProtos.NotificationsSubUpdateProto.Builder updateProto = TransportProtos.NotificationsSubUpdateProto.newBuilder() + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + if (update.getNotificationUpdate() != null) { + updateProto.setNotificationUpdate(JacksonUtil.toString(update.getNotificationUpdate())); } - } - - public static AlarmSubscriptionUpdate fromProto(TransportProtos.TbAlarmSubscriptionUpdateProto proto) { - if (proto.getErrorCode() > 0) { - return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); - } else { - AlarmInfo alarm = JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class); - return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), alarm, proto.getDeleted()); + if (update.getNotificationRequestUpdate() != null) { + updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate())); } + return ToCoreNotificationMsg.newBuilder() + .setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() + .setNotificationsUpdate(updateProto) + .build()) + .build(); } - public static ToCoreMsg toTimeseriesUpdateProto(TenantId tenantId, EntityId entityId, List ts) { TbTimeSeriesUpdateProto.Builder builder = TbTimeSeriesUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); @@ -317,6 +256,31 @@ public class TbSubscriptionUtils { return TsKvProto.newBuilder().setTs(ts).setKv(dataBuilder); } + private static TransportProtos.TsValueProto toTsValueProto(long ts, KvEntry attr) { + TransportProtos.TsValueProto.Builder dataBuilder = TransportProtos.TsValueProto.newBuilder(); + dataBuilder.setTs(ts); + dataBuilder.setType(KeyValueType.forNumber(attr.getDataType().ordinal())); + switch (attr.getDataType()) { + case BOOLEAN: + attr.getBooleanValue().ifPresent(dataBuilder::setBoolV); + break; + case LONG: + attr.getLongValue().ifPresent(dataBuilder::setLongV); + break; + case DOUBLE: + attr.getDoubleValue().ifPresent(dataBuilder::setDoubleV); + break; + case JSON: + attr.getJsonValue().ifPresent(dataBuilder::setJsonV); + break; + case STRING: + attr.getStrValue().ifPresent(dataBuilder::setStringV); + break; + } + return dataBuilder.build(); + } + + public static EntityId toEntityId(String entityType, long entityIdMSB, long entityIdLSB) { return EntityIdFactory.getByTypeAndUuid(entityType, new UUID(entityIdMSB, entityIdLSB)); } @@ -356,6 +320,35 @@ public class TbSubscriptionUtils { return entry; } + public static List toTsKvEntityList(String key, List dataList) { + List result = new ArrayList<>(dataList.size()); + dataList.forEach(proto -> result.add(new BasicTsKvEntry(proto.getTs(), getKvEntry(key, proto)))); + return result; + } + + private static KvEntry getKvEntry(String key, TransportProtos.TsValueProto proto) { + KvEntry entry = null; + DataType type = DataType.values()[proto.getType().getNumber()]; + switch (type) { + case BOOLEAN: + entry = new BooleanDataEntry(key, proto.getBoolV()); + break; + case LONG: + entry = new LongDataEntry(key, proto.getLongV()); + break; + case DOUBLE: + entry = new DoubleDataEntry(key, proto.getDoubleV()); + break; + case STRING: + entry = new StringDataEntry(key, proto.getStringV()); + break; + case JSON: + entry = new JsonDataEntry(key, proto.getJsonV()); + break; + } + return entry; + } + public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, AlarmInfo alarm) { TbAlarmUpdateProto.Builder builder = TbAlarmUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); @@ -382,23 +375,6 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); } - public static ToCoreNotificationMsg notificationsSubUpdateToProto(TbSubscription subscription, NotificationsSubscriptionUpdate update) { - TransportProtos.NotificationsSubscriptionUpdateProto.Builder updateProto = TransportProtos.NotificationsSubscriptionUpdateProto.newBuilder() - .setSessionId(subscription.getSessionId()) - .setSubscriptionId(subscription.getSubscriptionId()); - if (update.getNotificationUpdate() != null) { - updateProto.setNotificationUpdate(JacksonUtil.toString(update.getNotificationUpdate())); - } - if (update.getNotificationRequestUpdate() != null) { - updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate())); - } - return ToCoreNotificationMsg.newBuilder() - .setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() - .setNotificationsSubUpdate(updateProto) - .build()) - .build(); - } - public static ToCoreMsg notificationUpdateToProto(TenantId tenantId, UserId recipientId, NotificationUpdate notificationUpdate) { TransportProtos.NotificationUpdateProto updateProto = TransportProtos.NotificationUpdateProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) @@ -427,4 +403,40 @@ public class TbSubscriptionUtils { .build(); } + public static List fromProto(TransportProtos.TbSubUpdateProto proto) { + List result = new ArrayList<>(); + for (var p : proto.getDataList()) { + result.addAll(toTsKvEntityList(p.getKey(), p.getTsValueList())); + } + return result; + } + + static ToCoreNotificationMsg toProto(boolean timeSeries, EntityId entityId, List updates) { + TransportProtos.TbSubUpdateProto.Builder builder = TransportProtos.TbSubUpdateProto.newBuilder(); + + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + + Map> data = new TreeMap<>(); + + for (TsKvEntry tsEntry : updates) { + data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>()).add(toTsValueProto(tsEntry.getTs(), tsEntry)); + } + + data.forEach((key, value) -> { + TransportProtos.TsValueListProto.Builder dataBuilder = TransportProtos.TsValueListProto.newBuilder(); + dataBuilder.setKey(key); + dataBuilder.addAllTsValue(value); + builder.addData(dataBuilder.build()); + }); + + var result = TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder(); + if (timeSeries) { + result.setTsUpdate(builder); + } else { + result.setAttrUpdate(builder); + } + return ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(result).build(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java new file mode 100644 index 0000000000..93b1fb0857 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Information about the local websocket subscriptions. + */ +@RequiredArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(exclude = {"seqNumber"}) +@ToString +public class TbSubscriptionsInfo { + + protected boolean notifications; + protected boolean alarms; + protected boolean tsAllKeys; + protected Set tsKeys; + protected boolean attrAllKeys; + protected Set attrKeys; + protected int seqNumber; + + public boolean isEmpty() { + return !notifications && !alarms && !tsAllKeys && !attrAllKeys && tsKeys == null && attrKeys == null; + } + + protected TbSubscriptionsInfo copy() { + return copy(0); + } + + protected TbSubscriptionsInfo copy(int seqNumber) { + return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java similarity index 84% rename from application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java rename to application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java index 400cf07e7e..d3ebe4cd3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeseriesSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java @@ -24,8 +24,10 @@ import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpda import java.util.Map; import java.util.function.BiConsumer; -public class TbTimeseriesSubscription extends TbSubscription { +public class TbTimeSeriesSubscription extends TbSubscription { + @Getter + private final long queryTs; @Getter private final boolean allKeys; @Getter @@ -38,10 +40,11 @@ public class TbTimeseriesSubscription extends TbSubscription, TelemetrySubscriptionUpdate> updateProcessor, - boolean allKeys, Map keyStates, long startTime, long endTime, boolean latestValues) { + long queryTs, boolean allKeys, Map keyStates, long startTime, long endTime, boolean latestValues) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES, updateProcessor); + this.queryTs = queryTs; this.allKeys = allKeys; this.keyStates = keyStates; this.startTime = startTime; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 21d69d6b42..d0334f90b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -62,7 +62,7 @@ import org.thingsboard.server.service.subscription.TbAttributeSubscription; import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; -import org.thingsboard.server.service.subscription.TbTimeseriesSubscription; +import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.WsCmd; @@ -443,7 +443,7 @@ public class DefaultWebSocketService implements WebSocketService { } } } catch (IOException e) { - log.warn("[{}] Failed to send session close: {}", sessionRef.getSessionId(), e); + log.warn("[{}] Failed to send session close:", sessionRef.getSessionId(), e); return false; } return true; @@ -477,6 +477,7 @@ public class DefaultWebSocketService implements WebSocketService { private void handleWsAttributesSubscriptionByKeys(WebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId, List keys) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -495,6 +496,7 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) + .queryTs(queryTs) .allKeys(false) .keyStates(subState) .scope(scope) @@ -591,7 +593,10 @@ public class DefaultWebSocketService implements WebSocketService { } private void handleWsAttributesSubscription(WebSocketSessionRef sessionRef, - AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + AttributesSubscriptionCmd cmd, + String sessionId, + EntityId entityId) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -609,6 +614,7 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) + .queryTs(queryTs) .allKeys(true) .keyStates(subState) .updateProcessor((subscription, update) -> { @@ -664,17 +670,18 @@ public class DefaultWebSocketService implements WebSocketService { Optional> keysOptional = getKeys(cmd); if (keysOptional.isPresent()) { - handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); + handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); } else { - handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId); + handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); } } } } - private void handleWsTimeseriesSubscriptionByKeys(WebSocketSessionRef sessionRef, + private void handleWsTimeSeriesSubscriptionByKeys(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { long startTs; + long queryTs = System.currentTimeMillis(); if (cmd.getTimeWindow() > 0) { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); @@ -682,22 +689,22 @@ public class DefaultWebSocketService implements WebSocketService { long endTs = cmd.getStartTs() + cmd.getTimeWindow(); List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); - - final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys); accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, on(r -> Futures.addCallback(tsService.findAll(sessionRef.getSecurityCtx().getTenantId(), entityId, queries), callback, executor), callback::onFailure)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); startTs = System.currentTimeMillis(); log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId); - final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys); accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, on(r -> Futures.addCallback(tsService.findLatest(sessionRef.getSecurityCtx().getTenantId(), entityId, keys), callback, executor), callback::onFailure)); } } - private void handleWsTimeseriesSubscription(WebSocketSessionRef sessionRef, + private void handleWsTimeSeriesSubscription(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + long queryTs = System.currentTimeMillis(); FutureCallback> callback = new FutureCallback>() { @Override public void onSuccess(List data) { @@ -705,7 +712,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder() + TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) .subscriptionId(cmd.getCmdId()) @@ -719,6 +726,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.unlock(); } }) + .queryTs(queryTs) .allKeys(true) .keyStates(subState) .build(); @@ -749,7 +757,8 @@ public class DefaultWebSocketService implements WebSocketService { on(r -> Futures.addCallback(tsService.findAllLatest(sessionRef.getSecurityCtx().getTenantId(), entityId), callback, executor), callback::onFailure)); } - private FutureCallback> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List keys) { + private FutureCallback> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, + final String sessionId, final EntityId entityId, final long queryTs, final long startTs, final List keys) { return new FutureCallback<>() { @Override public void onSuccess(List data) { @@ -758,7 +767,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder() + TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) .subscriptionId(cmd.getCmdId()) @@ -772,6 +781,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.unlock(); } }) + .queryTs(queryTs) .allKeys(false) .keyStates(subState) .build(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 012808b253..0937275a0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; +import org.thingsboard.server.service.subscription.TbSubscription; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; @@ -119,7 +120,8 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH /* Notifications subscription update handling */ - private void handleNotificationsSubscriptionUpdate(NotificationsSubscription subscription, NotificationsSubscriptionUpdate subscriptionUpdate) { + private void handleNotificationsSubscriptionUpdate(TbSubscription sub, NotificationsSubscriptionUpdate subscriptionUpdate) { + NotificationsSubscription subscription = (NotificationsSubscription) sub; try { if (subscriptionUpdate.getNotificationUpdate() != null) { handleNotificationUpdate(subscription, subscriptionUpdate.getNotificationUpdate()); @@ -178,7 +180,8 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH /* Notifications count subscription update handling */ - private void handleNotificationsCountSubscriptionUpdate(NotificationsCountSubscription subscription, NotificationsSubscriptionUpdate subscriptionUpdate) { + private void handleNotificationsCountSubscriptionUpdate(TbSubscription sub, NotificationsSubscriptionUpdate subscriptionUpdate) { + NotificationsCountSubscription subscription = (NotificationsCountSubscription) sub; try { if (subscriptionUpdate.getNotificationUpdate() != null) { handleNotificationUpdate(subscription, subscriptionUpdate.getNotificationUpdate()); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java index cb5d140d21..5263f9d651 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java @@ -33,7 +33,7 @@ public class NotificationsCountSubscription extends TbSubscription updateProcessor) { + BiConsumer, NotificationsSubscriptionUpdate> updateProcessor) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS_COUNT, updateProcessor); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java index 591781a5f6..ede82f9ff7 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java @@ -43,7 +43,7 @@ public class NotificationsSubscription extends TbSubscription updateProcessor, + BiConsumer, NotificationsSubscriptionUpdate> updateProcessor, int limit) { super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS, updateProcessor); this.limit = limit; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java index 90041ffa13..66d18b24b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/AlarmSubscriptionUpdate.java @@ -16,13 +16,13 @@ package org.thingsboard.server.service.ws.telemetry.sub; import lombok.Getter; +import lombok.ToString; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; +@ToString public class AlarmSubscriptionUpdate { - @Getter - private int subscriptionId; @Getter private int errorCode; @Getter @@ -32,31 +32,24 @@ public class AlarmSubscriptionUpdate { @Getter private boolean alarmDeleted; - public AlarmSubscriptionUpdate(int subscriptionId, AlarmInfo alarm) { - this(subscriptionId, alarm, false); + public AlarmSubscriptionUpdate(AlarmInfo alarm) { + this(alarm, false); } - public AlarmSubscriptionUpdate(int subscriptionId, AlarmInfo alarm, boolean alarmDeleted) { + public AlarmSubscriptionUpdate(AlarmInfo alarm, boolean alarmDeleted) { super(); - this.subscriptionId = subscriptionId; this.alarm = alarm; this.alarmDeleted = alarmDeleted; } - public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) { - this(subscriptionId, errorCode, null); + public AlarmSubscriptionUpdate(SubscriptionErrorCode errorCode) { + this(errorCode, null); } - public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode, String errorMsg) { + public AlarmSubscriptionUpdate(SubscriptionErrorCode errorCode, String errorMsg) { super(); - this.subscriptionId = subscriptionId; this.errorCode = errorCode.getCode(); this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); } - @Override - public String toString() { - return "AlarmUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + - ", alarm=" + alarm + "]"; - } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java index 108c5cf1d4..cea1731a31 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java @@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,7 +28,7 @@ import java.util.stream.Collectors; public class TelemetrySubscriptionUpdate { - private int subscriptionId; + private final int subscriptionId; private int errorCode; private String errorMsg; private Map> data; @@ -94,7 +95,14 @@ public class TelemetrySubscriptionUpdate { @Override public String toString() { - return "TsSubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data=" - + data + "]"; + StringBuilder result = new StringBuilder("TelemetrySubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data="); + data.forEach((k, v) -> { + result.append(k).append("=["); + for(Object a : v){ + result.append(Arrays.toString((Object[])a)).append("|"); + } + result.append("]"); + }); + return result.toString(); } } diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index 981bcab132..3d0ce3f933 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -38,6 +38,8 @@ + + diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index e81aaed39d..d2cbde34e4 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -40,6 +41,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueClusterService; +import java.util.List; import java.util.UUID; public interface TbClusterService extends TbQueueClusterService { @@ -50,6 +52,8 @@ public interface TbClusterService extends TbQueueClusterService { void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback); + void broadcastToCore(TransportProtos.ToCoreNotificationMsg msg); + void pushMsgToVersionControl(TenantId tenantId, ToVersionControlServiceMsg msg, TbQueueCallback callback); void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 60dfcae1de..db9528897f 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -246,6 +246,12 @@ message QueueDeleteMsg { string queueName = 5; } +message CoreStartupMsg { + string serviceId = 1; + repeated int32 partitions = 2; + int64 ts = 3; +} + message LwM2MRegistrationRequestMsg { string tenantId = 1; string endpoint = 2; @@ -531,6 +537,71 @@ message TransportToRuleEngineMsg { * TB Core Data Structures */ +message TbEntitySubEventProto { + string serviceId = 1; + int32 seqNumber = 2; + int64 tenantIdMSB = 3; + int64 tenantIdLSB = 4; + string entityType = 5; + int64 entityIdMSB = 6; + int64 entityIdLSB = 7; + string type = 8; + bool notifications = 9; + bool alarms = 10; + bool tsAllKeys = 11; + repeated string tsKeys = 12; + bool attrAllKeys = 13; + repeated string attrKeys = 14; +} + +message TbEntitySubEventCallbackProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 seqNumber = 3; + int64 attributesUpdateTs = 4; + int64 timeSeriesUpdateTs = 5; +} + +message TsValueProto { + int64 ts = 1; + KeyValueType type = 2; + bool bool_v = 3; + int64 long_v = 4; + double double_v = 5; + string string_v = 6; + string json_v = 7; +} + +message TsValueListProto { + string key = 1; + repeated TsValueProto tsValue = 2; +} + +message TbSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 errorCode = 3; + string errorMsg = 4; + repeated TsValueListProto data = 5; +} + +message TbAlarmSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + int32 errorCode = 3; + string errorMsg = 4; + string alarm = 5; + bool deleted = 6; +} + +message NotificationsSubUpdateProto { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + string notificationUpdate = 3; + string notificationRequestUpdate = 4; +} + +// DEPRECATED. FOR REMOVAL message TbSubscriptionProto { string serviceId = 1; string sessionId = 2; @@ -542,6 +613,7 @@ message TbSubscriptionProto { int64 entityIdLSB = 8; } +// DEPRECATED. FOR REMOVAL message TbTimeSeriesSubscriptionProto { TbSubscriptionProto sub = 1; bool allKeys = 2; @@ -551,6 +623,7 @@ message TbTimeSeriesSubscriptionProto { bool latestValues = 6; } +// DEPRECATED. FOR REMOVAL message TbAttributeSubscriptionProto { TbSubscriptionProto sub = 1; bool allKeys = 2; @@ -558,20 +631,24 @@ message TbAttributeSubscriptionProto { string scope = 4; } +// DEPRECATED. FOR REMOVAL message TbAlarmSubscriptionProto { TbSubscriptionProto sub = 1; int64 ts = 2; } +// DEPRECATED. FOR REMOVAL message NotificationsSubscriptionProto { TbSubscriptionProto sub = 1; int32 limit = 2; } +// DEPRECATED. FOR REMOVAL message NotificationsCountSubscriptionProto { TbSubscriptionProto sub = 1; } +// DEPRECATED. FOR REMOVAL message TbSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -580,6 +657,7 @@ message TbSubscriptionUpdateProto { repeated TbSubscriptionUpdateValueListProto data = 5; } +// DEPRECATED. FOR REMOVAL message TbAlarmSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -589,6 +667,7 @@ message TbAlarmSubscriptionUpdateProto { bool deleted = 6; } +// DEPRECATED. FOR REMOVAL message NotificationsSubscriptionUpdateProto { string sessionId = 1; int32 subscriptionId = 2; @@ -667,6 +746,7 @@ message TbTimeSeriesUpdateProto { repeated TsKvProto data = 6; } +// DEPRECATED. FOR REMOVAL message TbSubscriptionCloseProto { string sessionId = 1; int32 subscriptionId = 2; @@ -702,26 +782,32 @@ message DeviceStateServiceMsgProto { } message SubscriptionMgrMsgProto { - TbTimeSeriesSubscriptionProto telemetrySub = 1; - TbAttributeSubscriptionProto attributeSub = 2; - TbSubscriptionCloseProto subClose = 3; + TbTimeSeriesSubscriptionProto telemetrySub = 1; // DEPRECATED. FOR REMOVAL + TbAttributeSubscriptionProto attributeSub = 2; // DEPRECATED. FOR REMOVAL + TbSubscriptionCloseProto subClose = 3; // DEPRECATED. FOR REMOVAL TbTimeSeriesUpdateProto tsUpdate = 4; TbAttributeUpdateProto attrUpdate = 5; TbAttributeDeleteProto attrDelete = 6; - TbAlarmSubscriptionProto alarmSub = 7; + TbAlarmSubscriptionProto alarmSub = 7; // DEPRECATED. FOR REMOVAL TbAlarmUpdateProto alarmUpdate = 8; TbAlarmDeleteProto alarmDelete = 9; TbTimeSeriesDeleteProto tsDelete = 10; - NotificationsSubscriptionProto notificationsSub = 11; - NotificationsCountSubscriptionProto notificationsCountSub = 12; + NotificationsSubscriptionProto notificationsSub = 11; // DEPRECATED. FOR REMOVAL + NotificationsCountSubscriptionProto notificationsCountSub = 12; // DEPRECATED. FOR REMOVAL NotificationUpdateProto notificationUpdate = 13; NotificationRequestUpdateProto notificationRequestUpdate = 14; + TbEntitySubEventProto subEvent = 15; } message LocalSubscriptionServiceMsgProto { - TbSubscriptionUpdateProto subUpdate = 1; - TbAlarmSubscriptionUpdateProto alarmSubUpdate = 2; - NotificationsSubscriptionUpdateProto notificationsSubUpdate = 3; + TbSubscriptionUpdateProto subUpdate = 1; // DEPRECATED. FOR REMOVAL + TbAlarmSubscriptionUpdateProto alarmSubUpdate = 2; // DEPRECATED. FOR REMOVAL + NotificationsSubscriptionUpdateProto notificationsSubUpdate = 3; // DEPRECATED. FOR REMOVAL + TbEntitySubEventCallbackProto subEventCallback = 4; + TbSubUpdateProto tsUpdate = 5; + TbSubUpdateProto attrUpdate = 6; + TbAlarmSubUpdateProto alarmUpdate = 7; + NotificationsSubUpdateProto notificationsUpdate = 8; } message FromDeviceRPCResponseProto { @@ -992,6 +1078,7 @@ message ToCoreNotificationMsg { bytes fromEdgeSyncResponseMsg = 9; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10; NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11; + CoreStartupMsg coreStartupMsg = 12; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 5be5caf6ff..7afc40f2c2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -118,6 +118,11 @@ public class HashPartitionService implements PartitionService { } } + @Override + public List getMyPartitions(QueueKey queueKey) { + return myPartitions.get(queueKey); + } + private void doInitRuleEnginePartitions() { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index b55ba79f67..fe06db789b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -37,6 +37,8 @@ public interface PartitionService { boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId); + List getMyPartitions(QueueKey queueKey); + /** * Received from the Discovery service when network topology is changed. * @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index e99817de17..d94ff082d1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -35,10 +35,12 @@ import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; @@ -74,6 +76,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi protected final ConcurrentHashMap> delayedTasks; + private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; @@ -85,8 +88,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private volatile boolean stopped = true; - public ZkDiscoveryService(TbServiceInfoProvider serviceInfoProvider, + public ZkDiscoveryService(ApplicationEventPublisher applicationEventPublisher, + TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) { + this.applicationEventPublisher = applicationEventPublisher; this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; delayedTasks = new ConcurrentHashMap<>(); @@ -141,9 +146,11 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi return; } log.info("Going to publish current server..."); - zkExecutorService.scheduleAtFixedRate(this::publishCurrentServer, 0, 1, TimeUnit.MINUTES); + publishCurrentServer(); log.info("Going to recalculate partitions..."); recalculatePartitions(); + + zkExecutorService.scheduleAtFixedRate(this::publishCurrentServer, 1, 1, TimeUnit.MINUTES); } @SneakyThrows @@ -319,6 +326,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } break; case CHILD_REMOVED: + zkExecutorService.submit(() -> applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList))); ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", serviceId, serviceTypesList); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java new file mode 100644 index 0000000000..5de09696c6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/OtherServiceShutdownEvent.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery.event; + +import com.google.protobuf.ProtocolStringList; +import lombok.Getter; +import org.thingsboard.server.common.msg.queue.ServiceType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class OtherServiceShutdownEvent extends TbApplicationEvent { + + private static final long serialVersionUID = -2441739930040282254L; + + @Getter + private final String serviceId; + @Getter + private final Set serviceTypes; + + public OtherServiceShutdownEvent(Object source, String serviceId, List serviceTypes) { + super(source); + this.serviceId = serviceId; + this.serviceTypes = serviceTypes.stream().map(ServiceType::valueOf).collect(Collectors.toSet()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java index e000735fba..d235acb4fa 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -34,6 +34,7 @@ public @interface AfterStartUp { int QUEUE_INFO_INITIALIZATION = 1; int DISCOVERY_SERVICE = 2; + int STARTUP_SERVICE = 8; int ACTOR_SYSTEM = 9; int REGULAR_SERVICE = 10; diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index a8810efd0e..3af4683ab3 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -26,6 +26,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.transport.TransportProtos; @@ -51,6 +52,8 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class ZkDiscoveryServiceTest { + @Mock + private ApplicationEventPublisher applicationEventPublisher; @Mock private TbServiceInfoProvider serviceInfoProvider; @@ -77,7 +80,7 @@ public class ZkDiscoveryServiceTest { @Before public void setup() { - zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); + zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(applicationEventPublisher, serviceInfoProvider, partitionService)); ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 4cf6db0fc1..fdf537a817 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sql.query; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository;