Browse Source

removed ServiceId from kafka consumer groupId

pull/3911/head
YevhenBondarenko 5 years ago
committed by Andrew Shvayka
parent
commit
4de258f2ae
  1. 12
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  2. 8
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
  3. 6
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  4. 4
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java

12
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -153,7 +153,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
return consumerBuilder.build();
@ -165,7 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-rule-engine-notifications-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
return consumerBuilder.build();
@ -177,7 +177,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getTopic());
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();
@ -189,7 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-notifications-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
return consumerBuilder.build();
@ -230,7 +230,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node");
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
@ -256,7 +256,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getUsageStatsTopic());
consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-us-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();

8
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

@ -147,7 +147,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getTopic());
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-node");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();
@ -159,7 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-notifications-node");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
return consumerBuilder.build();
@ -200,7 +200,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node");
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getUsageStatsTopic());
consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-us-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();

6
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -142,7 +142,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
return consumerBuilder.build();
@ -154,7 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-rule-engine-notifications-node");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
return consumerBuilder.build();
@ -173,7 +173,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node");
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);

4
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java

@ -92,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("transport-node");
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
responseBuilder.admin(transportApiAdmin);
@ -133,7 +133,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("transport-node");
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
responseBuilder.admin(notificationAdmin);
return responseBuilder.build();

Loading…
Cancel
Save