From 005d555bc5c8c3cc1e508062e51f3e51f925d1df Mon Sep 17 00:00:00 2001 From: Alexander Keidel Date: Fri, 9 Feb 2024 12:37:48 +0100 Subject: [PATCH] Global Queue Prefix is now correctly respected by js-executor, rule-node and tb-rule-engine-notifications-node- consumer groups --- .../server/queue/provider/KafkaMonolithQueueFactory.java | 2 +- .../server/queue/provider/KafkaTbCoreQueueFactory.java | 2 +- .../server/queue/provider/KafkaTbRuleEngineQueueFactory.java | 4 ++-- msa/js-executor/queue/kafkaTemplate.ts | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 662d29af5b..3459096e08 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -269,7 +269,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index db6ee52181..1852693304 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 2b20afac84..63798f95a1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -179,7 +179,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-notifications-node-") + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); consumerBuilder.statsService(consumerStatsService); @@ -199,7 +199,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 07290d0d4c..4ba3a1aee1 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -64,7 +64,7 @@ export class KafkaTemplate implements IQueue { const queuePrefix: string = config.get('queue_prefix'); const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); const useConfluent = config.get('kafka.use_confluent_cloud'); - + const groupId:string = queuePrefix ? queuePrefix + ".js-executor-group" : "js-executor-group"; this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); this.logger.info('Kafka Requests Topic: %s', requestTopic); @@ -119,7 +119,7 @@ export class KafkaTemplate implements IQueue { } } - this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); + this.consumer = this.kafkaClient.consumer({groupId: groupId}); this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); const {CRASH} = this.consumer.events;