diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index dcebe085b3..508cdff2a9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -493,9 +493,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic()); + String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); + consumerBuilder.topic(topic); consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer")); + consumerBuilder.groupId(topic); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index e9c42b0022..14766ecbdb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -439,9 +439,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic()); + String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); + consumerBuilder.topic(topic); consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer")); + consumerBuilder.groupId(topic); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin); consumerBuilder.statsService(consumerStatsService);