diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 95745b48fa..4f73a59961 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -77,6 +77,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { public void init() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(requestTopic); requestBuilder.encoder(new RemoteJsRequestEncoder()); requestBuilder.enricher((request, responseTopic, requestId) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 36b38e262f..4c0f9bdfee 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -112,6 +112,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ public void init() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder notificationsProducerBuilder = TBKafkaProducerTemplate.builder(); notificationsProducerBuilder.settings(kafkaSettings); + notificationsProducerBuilder.clientId("producer-transport-notification-" + nodeIdProvider.getNodeId()); notificationsProducerBuilder.encoder(new ToTransportMsgEncoder()); notificationsProducer = notificationsProducerBuilder.build(); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 770648be16..724fb3f8b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -68,6 +68,7 @@ public class RemoteTransportApiService { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); responseBuilder.settings(kafkaSettings); + responseBuilder.clientId("producer-transport-api-response-" + nodeIdProvider.getNodeId()); responseBuilder.encoder(new TransportApiResponseEncoder()); TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder requestBuilder = TBKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index ad489c14a3..bcb6797781 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -62,10 +62,13 @@ public class TBKafkaProducerTemplate { @Builder private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, TbKafkaEnricher enricher, - TbKafkaPartitioner partitioner, String defaultTopic) { + TbKafkaPartitioner partitioner, String defaultTopic, String clientId) { Properties props = settings.toProps(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + if (!StringUtils.isEmpty(clientId)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + } this.settings = settings; this.producer = new KafkaProducer<>(props); this.encoder = encoder; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index c13569c1f1..58f3ce22a9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -104,6 +104,7 @@ public class RemoteTransportService extends AbstractTransportService { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(transportApiRequestsTopic); requestBuilder.encoder(new TransportApiRequestEncoder()); @@ -128,6 +129,7 @@ public class RemoteTransportService extends AbstractTransportService { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); ruleEngineProducerBuilder.settings(kafkaSettings); + ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId()); ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); ruleEngineProducer = ruleEngineProducerBuilder.build(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 3a2970d750..5486b50352 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @RuleNode( @@ -54,6 +55,7 @@ public class TbKafkaNode implements TbNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class); Properties properties = new Properties(); + properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getNodeId()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());