From 134db4249472f094523e35d2bb3efeb6f64be26f Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 19 Mar 2024 13:49:46 +0200 Subject: [PATCH] Configurable consumer per partition for tb-core queue --- .../server/service/queue/DefaultTbCoreConsumerService.java | 5 ++++- .../service/queue/processing/AbstractConsumerService.java | 3 +-- application/src/main/resources/thingsboard.yml | 2 ++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index c0ccaea9d0..474efc5ed7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -126,6 +126,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig>builder() .queueKey(new QueueKey(ServiceType.TB_CORE)) - .config(CoreQueueConfig.of(true, (int) pollInterval)) + .config(CoreQueueConfig.of(consumerPerPartitionEnabled, (int) pollInterval)) .msgPackProcessor(this::processMsgs) .consumerCreator(config -> queueFactory.createToCoreMsgConsumer()) .consumerExecutor(consumersExecutor) @@ -764,6 +766,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService