Browse Source

Merge pull request #10185 from thingsboard/fix/consumer-group-prefix

Added global queue prefix to js-executor, rule-node and tb-rule-engine-notifications-node- consumer group id
pull/10219/head
Andrew Shvayka 2 years ago
committed by GitHub
parent
commit
7ec87494f9
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  2. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
  3. 4
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  4. 4
      msa/js-executor/queue/kafkaTemplate.ts

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -269,7 +269,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId());
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId());
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);

4
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -179,7 +179,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-notifications-node-") + serviceInfoProvider.getServiceId());
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
consumerBuilder.statsService(consumerStatsService);
@ -199,7 +199,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId());
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);

4
msa/js-executor/queue/kafkaTemplate.ts

@ -64,7 +64,7 @@ export class KafkaTemplate implements IQueue {
const queuePrefix: string = config.get('queue_prefix');
const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
const groupId:string = queuePrefix ? queuePrefix + ".js-executor-group" : "js-executor-group";
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
this.logger.info('Kafka Requests Topic: %s', requestTopic);
@ -119,7 +119,7 @@ export class KafkaTemplate implements IQueue {
}
}
this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
this.consumer = this.kafkaClient.consumer({groupId: groupId});
this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
const {CRASH} = this.consumer.events;

Loading…
Cancel
Save