From 55775f2815db89d78596c6a99a74ebb3cffaa830 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 27 Jul 2023 18:33:03 +0300 Subject: [PATCH] Consumer group per isolated tenant's queue; remove sleeping for Kafka consumer --- .../queue/common/AbstractTbQueueConsumerTemplate.java | 8 +++++++- .../server/queue/kafka/TbKafkaConsumerTemplate.java | 6 ++++++ .../server/queue/provider/KafkaMonolithQueueFactory.java | 2 +- .../queue/provider/KafkaTbRuleEngineQueueFactory.java | 2 +- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index a152e0c466..5b0e84c2ed 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -103,7 +103,9 @@ public abstract class AbstractTbQueueConsumerTemplate i consumerLock.unlock(); } - if (records.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); } + if (records.isEmpty() && !isLongPollingSupported()) { + return sleepAndReturnEmpty(startNanos, durationInMillis); + } return decodeRecords(records); } @@ -189,4 +191,8 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected void doUnsubscribe(); + protected boolean isLongPollingSupported() { + return false; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 00bb7aa541..9f58446966 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -122,4 +122,10 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue statsService.unregisterClientGroup(groupId); } } + + @Override + public boolean isLongPollingSupported() { + return true; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 364abcff4c..779bf9fae3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index b8e07a45f7..eb387a84f4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService);