From 3f2a3e35ff2ad2af9a6b20ee7dc242f0dcf8f770 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 12 Sep 2023 09:58:56 +0300 Subject: [PATCH] Fixes for partition recalculation with isolated queues --- .../DefaultTbRuleEngineConsumerService.java | 1 + .../discovery/HashPartitionServiceTest.java | 175 +++++++++++++++--- .../queue/discovery/HashPartitionService.java | 24 ++- 3 files changed, 165 insertions(+), 35 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 670dbcaa6d..f13a4ef39c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -501,6 +501,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } } + partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 7bd9ec576f..5e800d52a5 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -23,6 +23,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; @@ -35,7 +36,9 @@ import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -49,12 +52,17 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Slf4j @@ -78,15 +86,7 @@ public class HashPartitionServiceTest { applicationEventPublisher = mock(ApplicationEventPublisher.class); routingInfoService = mock(TenantRoutingInfoService.class); queueRoutingInfoService = mock(QueueRoutingInfoService.class); - clusterRoutingService = new HashPartitionService(discoveryService, - routingInfoService, - applicationEventPublisher, - queueRoutingInfoService); - ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); - ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10); - ReflectionTestUtils.setField(clusterRoutingService, "vcTopic", "tb.vc"); - ReflectionTestUtils.setField(clusterRoutingService, "vcPartitions", 10); - ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); + clusterRoutingService = createPartitionService(); ServiceInfo currentServer = ServiceInfo.newBuilder() .setServiceId("tb-core-0") .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) @@ -101,8 +101,6 @@ public class HashPartitionServiceTest { .build()); } - clusterRoutingService.init(); - clusterRoutingService.partitionsInit(); clusterRoutingService.recalculatePartitions(currentServer, otherServers); } @@ -194,25 +192,12 @@ public class HashPartitionServiceTest { } List queues = new ArrayList<>(); - Queue systemQueue = new Queue(); - systemQueue.setTenantId(TenantId.SYS_TENANT_ID); - systemQueue.setName("Main"); - systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); - systemQueue.setPartitions(10); - systemQueue.setId(new QueueId(UUID.randomUUID())); - queues.add(systemQueue); + queues.add(createQueue(TenantId.SYS_TENANT_ID, 10)); tenants.forEach((tenantId, profileId) -> { - Queue isolatedQueue = new Queue(); - isolatedQueue.setTenantId(tenantId); - isolatedQueue.setName("Main"); - isolatedQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); - isolatedQueue.setPartitions(2); - isolatedQueue.setId(new QueueId(UUID.randomUUID())); - queues.add(isolatedQueue); - when(routingInfoService.getRoutingInfo(eq(tenantId))).thenReturn(new TenantRoutingInfo(tenantId, profileId, true)); + queues.add(createQueue(tenantId, 2)); + mockRoutingInfo(tenantId, profileId, true); }); - when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream() - .map(QueueRoutingInfo::new).collect(Collectors.toList())); + mockQueues(queues); List ruleEngines = new ArrayList<>(); Map> dedicatedServers = new HashMap<>(); @@ -275,6 +260,97 @@ public class HashPartitionServiceTest { }); } + @Test + public void testPartitionChangeEvents_isolatedProfile_oneCommonServer_oneDedicated() { + ServiceInfo commonRuleEngine = ServiceInfo.newBuilder() + .setServiceId("tb-rule-engine-1") + .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name())) + .build(); + TenantProfileId tenantProfileId = new TenantProfileId(UUID.randomUUID()); + ServiceInfo dedicatedRuleEngine = ServiceInfo.newBuilder() + .setServiceId("tb-rule-engine-isolated-1") + .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name())) + .addAssignedTenantProfiles(tenantProfileId.toString()) + .build(); + + List queues = new ArrayList<>(); + Queue systemQueue = createQueue(TenantId.SYS_TENANT_ID, 10); + queues.add(systemQueue); + + TenantId tenantId = new TenantId(UUID.randomUUID()); + mockRoutingInfo(tenantId, tenantProfileId, false); // not isolated yet + mockQueues(queues); + + when(discoveryService.isService(eq(ServiceType.TB_RULE_ENGINE))).thenReturn(true); + Mockito.reset(applicationEventPublisher); + HashPartitionService partitionService_common = createPartitionService(); + partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); + verifyPartitionChangeEvent(event -> { + return event.getQueueKey().getTenantId().isSysTenantId() && + event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && + event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet()) + .size() == systemQueue.getPartitions(); + }); + + Mockito.reset(applicationEventPublisher); + HashPartitionService partitionService_dedicated = createPartitionService(); + partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); + verify(applicationEventPublisher, never()).publishEvent(any(PartitionChangeEvent.class)); + + + Queue isolatedQueue = createQueue(tenantId, 3); + queues.add(isolatedQueue); + mockQueues(queues); + mockRoutingInfo(tenantId, tenantProfileId, true); // making isolated + TransportProtos.QueueUpdateMsg queueUpdateMsg = TransportProtos.QueueUpdateMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setQueueIdMSB(isolatedQueue.getUuidId().getMostSignificantBits()) + .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits()) + .setQueueName(isolatedQueue.getName()) + .setQueueTopic(isolatedQueue.getTopic()) + .setPartitions(isolatedQueue.getPartitions()) + .build(); + + partitionService_common.updateQueue(queueUpdateMsg); + partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); + // expecting event about no partitions for isolated queue key + verifyPartitionChangeEvent(event -> { + return event.getQueueKey().getTenantId().equals(tenantId) && + event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && + event.getPartitions().isEmpty(); + }); + + partitionService_dedicated.updateQueue(queueUpdateMsg); + partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); + verifyPartitionChangeEvent(event -> { + return event.getQueueKey().getTenantId().equals(tenantId) && + event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && + event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet()) + .size() == isolatedQueue.getPartitions(); + }); + + + queues = List.of(systemQueue); + mockQueues(queues); + mockRoutingInfo(tenantId, tenantProfileId, false); // turning off isolation + Mockito.reset(applicationEventPublisher); + TransportProtos.QueueDeleteMsg queueDeleteMsg = TransportProtos.QueueDeleteMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setQueueIdMSB(isolatedQueue.getUuidId().getMostSignificantBits()) + .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits()) + .setQueueName(isolatedQueue.getName()) + .build(); + partitionService_dedicated.removeQueue(queueDeleteMsg); + partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); + verifyPartitionChangeEvent(event -> { + return event.getQueueKey().getTenantId().equals(tenantId) && + event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && + event.getPartitions().isEmpty(); + }); + } + @Test public void testIsManagedByCurrentServiceCheck() { TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID()); @@ -282,9 +358,9 @@ public class HashPartitionServiceTest { TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID()); TenantId isolatedTenantId = new TenantId(UUID.randomUUID()); - when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true)); + mockRoutingInfo(isolatedTenantId, isolatedProfileId, true); TenantId regularTenantId = new TenantId(UUID.randomUUID()); - when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false)); + mockRoutingInfo(regularTenantId, regularProfileId, false); assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue(); assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse(); @@ -296,4 +372,43 @@ public class HashPartitionServiceTest { assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue(); } + private void verifyPartitionChangeEvent(Predicate predicate) { + verify(applicationEventPublisher).publishEvent(argThat(event -> event instanceof PartitionChangeEvent && predicate.test((PartitionChangeEvent) event))); + } + + private void mockRoutingInfo(TenantId tenantId, TenantProfileId tenantProfileId, boolean isolatedTbRuleEngine) { + when(routingInfoService.getRoutingInfo(eq(tenantId))) + .thenReturn(new TenantRoutingInfo(tenantId, tenantProfileId, isolatedTbRuleEngine)); + } + + private void mockQueues(List queues) { + when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream() + .map(QueueRoutingInfo::new).collect(Collectors.toList())); + } + + private Queue createQueue(TenantId tenantId, int partitions) { + Queue systemQueue = new Queue(); + systemQueue.setTenantId(tenantId); + systemQueue.setName("Main"); + systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); + systemQueue.setPartitions(partitions); + systemQueue.setId(new QueueId(UUID.randomUUID())); + return systemQueue; + } + + private HashPartitionService createPartitionService() { + HashPartitionService partitionService = new HashPartitionService(discoveryService, + routingInfoService, + applicationEventPublisher, + queueRoutingInfoService); + ReflectionTestUtils.setField(partitionService, "coreTopic", "tb.core"); + ReflectionTestUtils.setField(partitionService, "corePartitions", 10); + ReflectionTestUtils.setField(partitionService, "vcTopic", "tb.vc"); + ReflectionTestUtils.setField(partitionService, "vcPartitions", 10); + ReflectionTestUtils.setField(partitionService, "hashFunctionName", hashFunctionName); + partitionService.init(); + partitionService.partitionsInit(); + return partitionService; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 2db342d4e6..33fb350292 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -179,7 +179,6 @@ public class HashPartitionService implements PartitionService { public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); //TODO: remove after merging tb entity services @@ -272,12 +271,23 @@ public class HashPartitionService implements PartitionService { final ConcurrentMap> oldPartitions = myPartitions; myPartitions = newPartitions; + Set removed = new HashSet<>(); oldPartitions.forEach((queueKey, partitions) -> { - if (!myPartitions.containsKey(queueKey)) { - log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet())); + if (!newPartitions.containsKey(queueKey)) { + removed.add(queueKey); } }); + if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { + partitionSizesMap.keySet().stream() + .filter(queueKey -> queueKey.getType() == ServiceType.TB_RULE_ENGINE && + !queueKey.getTenantId().isSysTenantId() && + !newPartitions.containsKey(queueKey)) + .forEach(removed::add); + } + removed.forEach(queueKey -> { + log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); + applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet())); + }); myPartitions.forEach((queueKey, partitions) -> { if (!partitions.equals(oldPartitions.get(queueKey))) { @@ -306,7 +316,11 @@ public class HashPartitionService implements PartitionService { if (!changes.isEmpty()) { applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes)); responsibleServices.forEach((profileId, serviceInfos) -> { - log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos)); + if (profileId != null) { + log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos)); + } else { + log.info("Servers responsible for system queues: {}", toServiceIds(serviceInfos)); + } }); } }