From 8e126e57dc8050fe33b040f501dcc786fc85c00e Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 12 Jan 2024 13:46:19 +0200 Subject: [PATCH] Refactoring and fixes for partitions recalculation --- .../queue/DefaultTbCoreConsumerService.java | 4 - .../DefaultTbRuleEngineConsumerService.java | 32 +++++-- .../processing/AbstractConsumerService.java | 90 +++++++++---------- .../TbQueueConsumerManagerTask.java | 17 ++-- .../TbRuleEngineQueueConsumerManager.java | 16 ++-- .../controller/TenantControllerTest.java | 53 +++++++++-- .../TbRuleEngineQueueConsumerManagerTest.java | 4 +- .../queue/discovery/HashPartitionService.java | 32 +++++-- .../queue/discovery/PartitionService.java | 4 +- .../service/DefaultTransportService.java | 7 +- 10 files changed, 165 insertions(+), 94 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 3df8876b60..8e5fde9ae8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -373,10 +373,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0 ? RpcError.values()[proto.getError()] : null; @@ -164,10 +165,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); } else if (nfMsg.hasQueueUpdateMsg()) { - ctx.getScheduler().execute(() -> updateQueue(nfMsg.getQueueUpdateMsg())); + updateQueue(nfMsg.getQueueUpdateMsg()); callback.onSuccess(); } else if (nfMsg.hasQueueDeleteMsg()) { - ctx.getScheduler().execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg())); + deleteQueue(nfMsg.getQueueDeleteMsg()); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -204,13 +205,30 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); var consumerManager = consumers.remove(queueKey); if (consumerManager != null) { - consumerManager.delete(); + consumerManager.delete(true); } partitionService.removeQueue(queueDeleteMsg); partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } + @EventListener + public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { + if (event.getEntityId().getEntityType() == EntityType.TENANT) { + if (event.getEvent() == ComponentLifecycleEvent.DELETED) { + List toRemove = consumers.keySet().stream() + .filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId())) + .collect(Collectors.toList()); + toRemove.forEach(queueKey -> { + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(false); + } + }); + } + } + } + private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 091a5a4988..a8252e7fa0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; @@ -30,7 +29,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -166,57 +164,51 @@ public abstract class AbstractConsumerService actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); - actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg)); - } - - protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) { - if (actorMsg instanceof ComponentLifecycleMsg) { - ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; - log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(), - componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); - if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); - tenantProfileCache.evict(tenantProfileId); + protected final void handleComponentLifecycleMsg(UUID id, ComponentLifecycleMsg componentLifecycleMsg) { + TenantId tenantId = componentLifecycleMsg.getTenantId(); + log.debug("[{}][{}][{}] Received Lifecycle event: {}", tenantId, componentLifecycleMsg.getEntityId().getEntityType(), + componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); + if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); + tenantProfileCache.evict(tenantProfileId); + if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { + apiUsageStateService.onTenantProfileUpdate(tenantProfileId); + } + } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (TenantId.SYS_TENANT_ID.equals(tenantId)) { + jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); + return; + } else { + tenantProfileCache.evict(tenantId); + partitionService.evictTenantInfo(tenantId); if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantProfileUpdate(tenantProfileId); - } - } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (TenantId.SYS_TENANT_ID.equals(componentLifecycleMsg.getTenantId())) { - jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); - return; - } else { - tenantProfileCache.evict(componentLifecycleMsg.getTenantId()); - partitionService.removeTenant(componentLifecycleMsg.getTenantId()); - if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId()); - } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { - apiUsageStateService.onTenantDelete((TenantId) componentLifecycleMsg.getEntityId()); - } - } - } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); - } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId()); - } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { - apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + apiUsageStateService.onTenantUpdate(tenantId); + } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { + apiUsageStateService.onTenantDelete(tenantId); + partitionService.removeTenant(tenantId); } } - eventPublisher.publishEvent(componentLifecycleMsg); + } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); + } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + apiUsageStateService.onApiUsageStateUpdate(tenantId); + } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { + apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + } } - log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, actorMsg); - actorContext.tellWithHighPriority(actorMsg); + + eventPublisher.publishEvent(componentLifecycleMsg); + log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, componentLifecycleMsg); + actorContext.tellWithHighPriority(componentLifecycleMsg); } protected abstract void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) throws Exception; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java index 6f1adbdcf2..029cc699b1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue.ruleengine; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.queue.Queue; @@ -24,24 +25,24 @@ import java.util.Set; @Getter @ToString +@AllArgsConstructor public class TbQueueConsumerManagerTask { private final QueueEvent event; private Queue queue; private Set partitions; + private boolean drainQueue; - public TbQueueConsumerManagerTask(QueueEvent event) { - this.event = event; + public static TbQueueConsumerManagerTask delete(boolean drainQueue) { + return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue); } - public TbQueueConsumerManagerTask(QueueEvent event, Queue queue) { - this.event = event; - this.queue = queue; + public static TbQueueConsumerManagerTask configUpdate(Queue queue) { + return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue, null, false); } - public TbQueueConsumerManagerTask(QueueEvent event, Set partitions) { - this.event = event; - this.partitions = partitions; + public static TbQueueConsumerManagerTask partitionChange(Set partitions) { + return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 5a59cb124e..800f13e428 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -95,15 +95,15 @@ public class TbRuleEngineQueueConsumerManager { } public void update(Queue queue) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue)); + addTask(TbQueueConsumerManagerTask.configUpdate(queue)); } public void update(Set partitions) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, partitions)); + addTask(TbQueueConsumerManagerTask.partitionChange(partitions)); } - public void delete() { - addTask(new TbQueueConsumerManagerTask(QueueEvent.DELETE)); + public void delete(boolean drainQueue) { + addTask(TbQueueConsumerManagerTask.delete(drainQueue)); } private void addTask(TbQueueConsumerManagerTask todo) { @@ -138,7 +138,7 @@ public class TbRuleEngineQueueConsumerManager { } else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) { newConfiguration = task.getQueue(); } else if (task.getEvent() == QueueEvent.DELETE) { - doDelete(); + doDelete(task.isDrainQueue()); return; } } @@ -205,7 +205,7 @@ public class TbRuleEngineQueueConsumerManager { log.debug("[{}] Unsubscribed and stopped consumers", queueKey); } - private void doDelete() { + private void doDelete(boolean drainQueue) { stopped = true; log.info("[{}] Handling queue deletion", queueKey); consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); @@ -213,7 +213,9 @@ public class TbRuleEngineQueueConsumerManager { List>> queueConsumers = consumerWrapper.getConsumers().stream() .map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList()); ctx.getConsumersExecutor().submit(() -> { - drainQueue(queueConsumers); + if (drainQueue) { + drainQueue(queueConsumers); + } queueConsumers.forEach(consumer -> { for (String topic : consumer.getFullTopicNames()) { diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 27a9872ba5..ff6c69dec8 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantInfo; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -64,6 +65,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import java.util.ArrayList; import java.util.Collections; @@ -71,7 +73,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -80,6 +81,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.argThat; @@ -700,6 +702,45 @@ public class TenantControllerTest extends AbstractControllerTest { }); } + @Test + public void whenTenantIsDeleted_thenDeleteQueues() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = new TenantProfile(); + tenantProfile.setName("Test profile"); + TenantProfileData tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); + tenantProfile.setProfileData(tenantProfileData); + tenantProfile.setIsolatedTbRuleEngine(true); + addQueueConfig(tenantProfile, MAIN_QUEUE_NAME); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + createDifferentTenant(); + loginSysAdmin(); + savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); + savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class); + TenantId tenantId = differentTenantId; + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull(); + }); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId); + assertThat(tpi.getTenantId()).hasValue(tenantId); + TbMsg tbMsg = publishTbMsg(tenantId, tpi); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verify(actorContext).tell(argThat(msg -> { + return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId()); + })); + }); + + deleteDifferentTenant(); + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNull(); + assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId)) + .isInstanceOf(TenantNotFoundException.class); + + verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName())); + }); + } + private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) { TbMsg tbMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}"); TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() @@ -759,7 +800,7 @@ public class TenantControllerTest extends AbstractControllerTest { queueConfiguration.setName(queueName); queueConfiguration.setTopic(topic); queueConfiguration.setPollInterval(25); - queueConfiguration.setPartitions(1 + new Random().nextInt(99)); + queueConfiguration.setPartitions(12); queueConfiguration.setConsumerPerPartition(true); queueConfiguration.setPackProcessingTimeout(2000); SubmitStrategy submitStrategy = new SubmitStrategy(); @@ -799,20 +840,20 @@ public class TenantControllerTest extends AbstractControllerTest { ArgumentMatcher matcherTenant = cntTime == 1 ? argument -> argument.equals(tenant) : argument -> argument.getClass().equals(Tenant.class); if (ComponentLifecycleEvent.DELETED.equals(event)) { - Mockito.verify(tbClusterService, times( cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), Mockito.isNull()); } else { - Mockito.verify(tbClusterService, times( cntTime)).onTenantChange(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantChange(Mockito.argThat(matcherTenant), Mockito.isNull()); } TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant); - testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); + testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); Mockito.reset(tbClusterService); } private void testBroadcastEntityStateChangeEventNeverTenant() { Mockito.verify(tbClusterService, never()).onTenantChange(Mockito.any(Tenant.class), - Mockito.isNull()); + Mockito.isNull()); testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant())); Mockito.reset(tbClusterService); } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 6acf71e2bb..16b394d914 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -447,7 +447,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifyMsgProcessed(consumer1.testMsg); verifyMsgProcessed(consumer2.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { @@ -488,7 +488,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifySubscribedAndLaunched(consumer, partitions); verifyMsgProcessed(consumer.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index a02d4054ac..bfc50e076b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -35,7 +35,6 @@ import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -190,14 +189,25 @@ public class HashPartitionService implements PartitionService { myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); - //TODO: remove after merging tb entity services - removeTenant(tenantId); - + evictTenantInfo(tenantId); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); } } + @Override + public void removeTenant(TenantId tenantId) { + List queueKeys = partitionSizesMap.keySet().stream() + .filter(queueKey -> tenantId.equals(queueKey.getTenantId())) + .collect(Collectors.toList()); + queueKeys.forEach(queueKey -> { + myPartitions.remove(queueKey); + partitionTopicsMap.remove(queueKey); + partitionSizesMap.remove(queueKey); + }); + evictTenantInfo(tenantId); + } + @Override public boolean isManagedByCurrentService(TenantId tenantId) { Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); @@ -258,6 +268,7 @@ public class HashPartitionService implements PartitionService { @Override public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { + log.info("Recalculating partitions"); tbTransportServicesByType.clear(); responsibleServices.clear(); logServiceInfo(currentService); @@ -274,9 +285,14 @@ public class HashPartitionService implements PartitionService { final ConcurrentMap> newPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { for (int i = 0; i < size; i++) { - ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); - if (currentService.equals(serviceInfo)) { - newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + try { + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); + log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none"); + if (currentService.equals(serviceInfo)) { + newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + } + } catch (Exception e) { + log.warn("Failed to resolve server responsible for {}[{}]", queueKey, i, e); } } }); @@ -399,7 +415,7 @@ public class HashPartitionService implements PartitionService { } @Override - public void removeTenant(TenantId tenantId) { + public void evictTenantInfo(TenantId tenantId) { tenantRoutingInfoMap.remove(tenantId); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index fe06db789b..e9049a8085 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -59,7 +59,7 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); - void removeTenant(TenantId tenantId); + void evictTenantInfo(TenantId tenantId); int countTransportsByType(String type); @@ -67,6 +67,8 @@ public interface PartitionService { void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + void removeTenant(TenantId tenantId); + boolean isManagedByCurrentService(TenantId tenantId); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index c4840b2402..6f962ca269 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -96,6 +96,7 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -1001,8 +1002,8 @@ public class DefaultTransportService implements TransportService { Optional profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); if (profileOpt.isPresent()) { Tenant tenant = profileOpt.get(); - partitionService.removeTenant(tenant.getId()); boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); + partitionService.evictTenantInfo(tenant.getId()); if (updated) { rateLimitService.update(tenant.getId()); } @@ -1027,7 +1028,9 @@ public class DefaultTransportService implements TransportService { } else if (EntityType.TENANT_PROFILE.equals(entityType)) { tenantProfileCache.remove(new TenantProfileId(entityUuid)); } else if (EntityType.TENANT.equals(entityType)) { - rateLimitService.remove(TenantId.fromUUID(entityUuid)); + TenantId tenantId = TenantId.fromUUID(entityUuid); + rateLimitService.remove(tenantId); + partitionService.removeTenant(tenantId); } else if (EntityType.DEVICE.equals(entityType)) { rateLimitService.remove(new DeviceId(entityUuid)); onDeviceDeleted(new DeviceId(entityUuid));