From 492dc5916e8a273ff56f4505210cd80071076a2b Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 13 Feb 2024 11:07:34 +0200 Subject: [PATCH] Improvements for tenant actors init with dedicated rule engines --- .../server/actors/app/AppActor.java | 3 +- .../RuleChainActorMessageProcessor.java | 1 + .../ruleChain/RuleChainManagerActor.java | 4 ++ .../RuleNodeActorMessageProcessor.java | 9 +++- .../server/actors/tenant/TenantActor.java | 46 ++++++++++++------ .../DefaultTbRuleEngineConsumerService.java | 37 ++++++++------- .../discovery/HashPartitionServiceTest.java | 6 +-- .../DefaultTbServiceInfoProvider.java | 2 +- .../queue/discovery/HashPartitionService.java | 47 ++++++++++++------- .../queue/discovery/TenantRoutingInfo.java | 2 +- 10 files changed, 101 insertions(+), 56 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 563f381380..4ab54c0ad8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -202,8 +202,7 @@ public class AppActor extends ContextAwareActor { return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId), - () -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) || - systemContext.getPartitionService().isManagedByCurrentService(tenantId))); + () -> true)); } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index e14ed8203b..4021bdf8ea 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -161,6 +161,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor actorRef.tellWithHighPriority(msg)); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index 468da57e96..6f920bc9ab 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -50,6 +50,8 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { @Getter protected TbActorRef rootChainActor; + protected boolean ruleChainsInitialized; + public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); this.tenantId = tenantId; @@ -57,6 +59,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { } protected void initRuleChains() { + ruleChainsInitialized = true; for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) { RuleChainId ruleChainId = ruleChain.getId(); log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId()); @@ -70,6 +73,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) { ctx.stop(new TbEntityActorId(ruleChain.getId())); } + ruleChainsInitialized = false; } protected void visit(RuleChain entity, TbActorRef actorRef) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 47de6bcdf1..df08c5a5c7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.ruleChain; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.server.actors.ActorSystemContext; @@ -39,6 +40,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; /** * @author Andrew Shvayka */ +@Slf4j public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor { private final String ruleChainName; @@ -61,6 +63,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) { - @Override - protected boolean testEntityId(EntityId entityId) { - return super.testEntityId(entityId) && !isMyPartition(entityId); - } - }); - deviceActorIds.forEach(id -> ctx.stop(id)); - } + onPartitionChangeMsg((PartitionChangeMsg) msg); break; case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); @@ -239,6 +228,35 @@ public class TenantActor extends RuleChainManagerActor { } } + private void onPartitionChangeMsg(PartitionChangeMsg msg) { + ServiceType serviceType = msg.getServiceType(); + if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { + if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + if (!ruleChainsInitialized) { + log.info("Tenant {} is already managed by this service, initializing rule chains", tenantId); + initRuleChains(); + } + } else { + if (ruleChainsInitialized) { + log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId); + destroyRuleChains(); + } + return; + } + + //To Rule Chain Actors + broadcast(msg); + } else if (ServiceType.TB_CORE.equals(serviceType)) { + List deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) { + @Override + protected boolean testEntityId(EntityId entityId) { + return super.testEntityId(entityId) && !isMyPartition(entityId); + } + }); + deviceActorIds.forEach(id -> ctx.stop(id)); + } + } + private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { if (msg.getEntityId().getEntityType().equals(EntityType.API_USAGE_STATE)) { ApiUsageState old = getApiUsageState(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index ce989c711d..3bc0af787d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -99,25 +99,28 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< List queues = queueService.findAllQueues(); for (Queue configuration : queues) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { - initConsumer(configuration); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration); + getOrCreateConsumer(queueKey).init(configuration); } } } - private void initConsumer(Queue configuration) { - getOrCreateConsumer(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration)).init(configuration); - } - @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { event.getPartitionsMap().forEach((queueKey, partitions) -> { - var consumer = consumers.get(queueKey); - if (consumer != null) { - consumer.update(partitions); - } else { - log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey); + if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { + getOrCreateConsumer(queueKey).update(partitions); } }); + consumers.keySet().stream() + .collect(Collectors.groupingBy(QueueKey::getTenantId)) + .forEach((tenantId, queueKeys) -> { + if (!partitionService.isManagedByCurrentService(tenantId)) { + queueKeys.forEach(queueKey -> { + removeConsumer(queueKey).ifPresent(TbRuleEngineQueueConsumerManager::stop); + }); + } + }); } @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @@ -211,10 +214,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< log.info("Received queue delete msg: [{}]", queueDeleteMsg); TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - var consumerManager = consumers.remove(queueKey); - if (consumerManager != null) { - consumerManager.delete(true); - } + removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(true)); } partitionService.removeQueues(queueDeleteMsgs); @@ -229,10 +229,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< .filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId())) .collect(Collectors.toList()); toRemove.forEach(queueKey -> { - var consumerManager = consumers.remove(queueKey); - if (consumerManager != null) { - consumerManager.delete(false); - } + removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(false)); }); } } @@ -242,6 +239,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); } + private Optional removeConsumer(QueueKey queueKey) { + return Optional.ofNullable(consumers.remove(queueKey)); + } + @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") public void printStats() { if (ctx.isStatsEnabled()) { diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 7f8515f50b..e67bbb9506 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -156,7 +156,7 @@ public class HashPartitionServiceTest { for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "queue" + queueIndex, tenantId); for (int partition = 0; partition < partitionCount; partition++) { - ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition); + ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition, Collections.emptyMap()); String serviceId = serviceInfo.getServiceId(); map.put(serviceId, map.get(serviceId) + 1); } @@ -389,9 +389,9 @@ public class HashPartitionServiceTest { .limit(100).collect(Collectors.toList()); for (int partition = 0; partition < 10; partition++) { - ServiceInfo expectedAssignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId), partition); + ServiceInfo expectedAssignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId), partition, Collections.emptyMap()); for (QueueKey queueKey : queues) { - ServiceInfo assignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, queueKey, partition); + ServiceInfo assignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, queueKey, partition, Collections.emptyMap()); assertThat(assignedRuleEngine).as(queueKey + "[" + partition + "] should be assigned to " + expectedAssignedRuleEngine.getServiceId()) .isEqualTo(expectedAssignedRuleEngine); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index e60b5f7cc5..2dce7517bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -81,7 +81,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { } log.info("Current Service ID: {}", serviceId); if (serviceType.equalsIgnoreCase("monolith")) { - serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values())); + serviceTypes = List.of(ServiceType.values()); } else { serviceTypes = Collections.singletonList(ServiceType.of(serviceType)); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 4630dc4bf8..72f7826c27 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -81,7 +82,7 @@ public class HashPartitionService implements PartitionService { private List currentOtherServices; private final Map> tbTransportServicesByType = new HashMap<>(); - private final Map> responsibleServices = new HashMap<>(); + private volatile Map> responsibleServices = Collections.emptyMap(); private HashFunction hashFunction; @@ -218,16 +219,26 @@ public class HashPartitionService implements PartitionService { @Override public boolean isManagedByCurrentService(TenantId tenantId) { - Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); - if (assignedTenantProfiles.isEmpty()) { - // TODO: refactor this for common servers + if (serviceInfoProvider.isService(ServiceType.TB_CORE) || !serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { return true; + } + + Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); + if (assignedTenantProfiles.isEmpty()) { // if this is regular rule engine + if (tenantId.isSysTenantId()) { + return true; + } + TenantRoutingInfo routingInfo = getRoutingInfo(tenantId); + if (routingInfo.isIsolated()) { + return CollectionsUtil.isEmpty(responsibleServices.get(routingInfo.getProfileId())); + } else { + return true; + } } else { if (tenantId.isSysTenantId()) { return false; } - TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId(); - return assignedTenantProfiles.contains(profileId.getId()); + return assignedTenantProfiles.contains(getRoutingInfo(tenantId).getProfileId().getId()); } } @@ -283,14 +294,14 @@ public class HashPartitionService implements PartitionService { public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { log.info("Recalculating partitions"); tbTransportServicesByType.clear(); - responsibleServices.clear(); logServiceInfo(currentService); otherServices.forEach(this::logServiceInfo); Map> queueServicesMap = new HashMap<>(); - addNode(queueServicesMap, currentService); + Map> responsibleServices = new HashMap<>(); + addNode(currentService, queueServicesMap, responsibleServices); for (ServiceInfo other : otherServices) { - addNode(queueServicesMap, other); + addNode(other, queueServicesMap, responsibleServices); } queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId))); responsibleServices.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId))); @@ -299,7 +310,7 @@ public class HashPartitionService implements PartitionService { partitionSizesMap.forEach((queueKey, size) -> { for (int i = 0; i < size; i++) { try { - ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i, responsibleServices); log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none"); if (currentService.equals(serviceInfo)) { newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); @@ -309,6 +320,7 @@ public class HashPartitionService implements PartitionService { } } }); + this.responsibleServices = responsibleServices; final ConcurrentMap> oldPartitions = myPartitions; myPartitions = newPartitions; @@ -474,20 +486,22 @@ public class HashPartitionService implements PartitionService { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { return false; } - TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> { - return tenantRoutingInfoService.getRoutingInfo(tenantId); - }); + TenantRoutingInfo routingInfo = getRoutingInfo(tenantId); if (routingInfo == null) { throw new TenantNotFoundException(tenantId); } switch (serviceType) { case TB_RULE_ENGINE: - return routingInfo.isIsolatedTbRuleEngine(); + return routingInfo.isIsolated(); default: return false; } } + private TenantRoutingInfo getRoutingInfo(TenantId tenantId) { + return tenantRoutingInfoMap.computeIfAbsent(tenantId, tenantRoutingInfoService::getRoutingInfo); + } + private TenantId getIsolatedOrSystemTenantId(ServiceType serviceType, TenantId tenantId) { return isIsolated(serviceType, tenantId) ? tenantId : TenantId.SYS_TENANT_ID; } @@ -496,7 +510,7 @@ public class HashPartitionService implements PartitionService { log.info("[{}] Found common server: {}", server.getServiceId(), server.getServiceTypesList()); } - private void addNode(Map> queueServiceList, ServiceInfo instance) { + private void addNode(ServiceInfo instance, Map> queueServiceList, Map> responsibleServices) { for (String serviceTypeStr : instance.getServiceTypesList()) { ServiceType serviceType = ServiceType.of(serviceTypeStr); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { @@ -528,7 +542,8 @@ public class HashPartitionService implements PartitionService { } } - protected ServiceInfo resolveByPartitionIdx(List servers, QueueKey queueKey, int partition) { + protected ServiceInfo resolveByPartitionIdx(List servers, QueueKey queueKey, int partition, + Map> responsibleServices) { if (servers == null || servers.isEmpty()) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java index c221572bfe..caf4e823fd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java @@ -23,5 +23,5 @@ import org.thingsboard.server.common.data.id.TenantProfileId; public class TenantRoutingInfo { private final TenantId tenantId; private final TenantProfileId profileId; - private final boolean isolatedTbRuleEngine; + private final boolean isolated; }