Browse Source

Edge notification unique topic per edge must have the same group as topic name to avoid rebalance in case other edge disconnects

pull/13176/head
Volodymyr Babak 1 year ago
parent
commit
9bf528eee5
  1. 5
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  2. 5
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

5
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -493,9 +493,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
consumerBuilder.topic(topic);
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
consumerBuilder.groupId(topic);
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin);
consumerBuilder.statsService(consumerStatsService);

5
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

@ -439,9 +439,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
consumerBuilder.topic(topic);
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
consumerBuilder.groupId(topic);
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin);
consumerBuilder.statsService(consumerStatsService);

Loading…
Cancel
Save