From 7de485f4535c86e6cbb3ddafb44d1784898f8d4f Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 17 Mar 2020 14:26:44 +0200 Subject: [PATCH] Kafka configuration --- .../DefaultTbCoreToTransportService.java | 2 +- .../org/thingsboard/server/TbQueueAdmin.java | 4 +- .../server/kafka/TBKafkaAdmin.java | 37 ++++++++--- .../server/kafka/TBKafkaConsumerTemplate.java | 11 +++- .../server/kafka/TBKafkaProducerTemplate.java | 10 ++- .../server/kafka/TbKafkaSettings.java | 5 +- .../InMemoryMonolithQueueProvider.java | 2 +- .../provider/KafkaMonolithQueueProvider.java | 63 +++++++++++-------- .../provider/KafkaTbCoreQueueProvider.java | 5 +- .../KafkaTbRuleEngineQueueProvider.java | 2 +- .../provider/KafkaTransportQueueProvider.java | 39 +++++++++--- .../server/provider/TbCoreQueueProvider.java | 2 +- .../provider/TbRuleEngineQueueProvider.java | 2 +- .../service/DefaultTransportService.java | 1 + 14 files changed, 122 insertions(+), 63 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java index a6e1a37eca..833b28c3b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java @@ -44,7 +44,7 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService private String notificationsTopic; public DefaultTbCoreToTransportService(TbCoreQueueProvider tbCoreQueueProvider) { - this.tbTransportProducer = tbCoreQueueProvider.getTransportMsgProducer(); + this.tbTransportProducer = tbCoreQueueProvider.getTransportNotificationsMsgProducer(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java b/common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java index 6917c23719..5952e7da70 100644 --- a/common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java @@ -15,10 +15,8 @@ */ package org.thingsboard.server; -import com.google.common.util.concurrent.ListenableFuture; - public interface TbQueueAdmin { - ListenableFuture createTopicIfNotExists(String topic); + void createTopicIfNotExists(String topic); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java index 0bb3a75db6..5f1b41ea7b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java @@ -18,11 +18,13 @@ package org.thingsboard.server.kafka; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.TbQueueAdmin; import java.util.Collections; @@ -33,6 +35,7 @@ import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. */ +@Slf4j public class TBKafkaAdmin implements TbQueueAdmin { AdminClient client; @@ -41,17 +44,31 @@ public class TBKafkaAdmin implements TbQueueAdmin { client = AdminClient.create(settings.toProps()); } + //TODO 2.5 @Override - public ListenableFuture createTopicIfNotExists(String topic) { - - KafkaFuture topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic); - - ListenableFuture topicFuture = JdkFutureAdapters.listenInPoolThread(topicDescriptionFuture); - - return Futures.transformAsync(topicFuture, topicDescription -> { - KafkaFuture resultFuture = createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic); - return JdkFutureAdapters.listenInPoolThread(resultFuture); - }); + public void createTopicIfNotExists(String topic) { + try { + createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic).get(); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof TopicExistsException) { + //do nothing + } else { + log.warn("[{}] Failed to create topic", topic, ee); + throw new RuntimeException(ee); + } + } catch (Exception e) { + log.warn("[{}] Failed to create topic", topic, e); + throw new RuntimeException(e); + } +// +// KafkaFuture topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic); +// +// ListenableFuture topicFuture = JdkFutureAdapters.listenInPoolThread(topicDescriptionFuture); +// +// return Futures.transformAsync(topicFuture, topicDescription -> { +// KafkaFuture resultFuture = createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic); +// return JdkFutureAdapters.listenInPoolThread(resultFuture); +// }); } public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java index 51b8725118..eb35c583c8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** @@ -39,6 +40,7 @@ import java.util.stream.Collectors; @Slf4j public class TBKafkaConsumerTemplate implements TbQueueConsumer { + private final TBKafkaAdmin admin; private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; private volatile boolean subscribed; @@ -63,6 +65,7 @@ public class TBKafkaConsumerTemplate implements TbQueueCon if (maxPollRecords > 0) { props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); } + this.admin = new TBKafkaAdmin(settings); this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; this.topic = topic; @@ -70,13 +73,16 @@ public class TBKafkaConsumerTemplate implements TbQueueCon @Override public void subscribe() { + createTopicIfNotExists(topic); consumer.subscribe(Collections.singletonList(topic)); subscribed = true; } @Override public void subscribe(List partitions) { - consumer.subscribe(partitions.stream().map(partition -> topic + "." + partition).collect(Collectors.toList())); + List topicNames = partitions.stream().map(partition -> topic + "." + partition).collect(Collectors.toList()); + topicNames.forEach(this::createTopicIfNotExists); + consumer.subscribe(topicNames); subscribed = true; } @@ -119,4 +125,7 @@ public class TBKafkaConsumerTemplate implements TbQueueCon return decoder.decode(new KafkaTbQueueMsg(record)); } + private void createTopicIfNotExists(String topic) { + admin.createTopicIfNotExists(topic); + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 1b5d57f1d2..4999ba026a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -86,9 +86,15 @@ public class TBKafkaProducerTemplate implements TbQueuePro record = new ProducerRecord<>(topic.toString(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { - callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata)); + if (callback != null) { + callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata)); + } } else { - callback.onFailure(exception); + if (callback != null) { + callback.onFailure(exception); + } else { + log.warn("Producer template failure", exception); + } } }); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java index 566766fee6..912e935bef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java @@ -18,6 +18,7 @@ package org.thingsboard.server.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -28,7 +29,7 @@ import java.util.Properties; * Created by ashvayka on 25.09.18. */ @Slf4j -@ConditionalOnProperty(prefix = "kafka", value = "enabled", havingValue = "true", matchIfMissing = false) +@ConditionalOnExpression("'${queue.type:null}'=='kafka'") @Component public class TbKafkaSettings { @@ -41,7 +42,7 @@ public class TbKafkaSettings { @Value("${queue.kafka.acks}") private String acks; - @Value("${queue.queue.kafka.retries}") + @Value("${queue.kafka.retries}") private int retries; @Value("${queue.kafka.batch.size}") diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java index 4fa10b7c44..7fc7bdea7c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java @@ -55,7 +55,7 @@ public class InMemoryMonolithQueueProvider implements TbCoreQueueProvider, TbRul } @Override - public TbQueueProducer> getTransportMsgProducer() { + public TbQueueProducer> getTransportNotificationsMsgProducer() { return new InMemoryTbQueueProducer<>(notificationSettings.getNotificationsTopic()); } diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java index f06897ebe6..01e7a211d3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java @@ -20,9 +20,10 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.TbQueueConsumer; import org.thingsboard.server.TbQueueCoreSettings; import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRuleEngineSettings; +import org.thingsboard.server.TbQueueTransportApiSettings; +import org.thingsboard.server.TbQueueTransportNotificationSettings; import org.thingsboard.server.common.TbProtoQueueMsg; -import org.thingsboard.server.discovery.PartitionChangeEvent; -import org.thingsboard.server.discovery.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -40,19 +41,30 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn private final TbKafkaSettings kafkaSettings; private final TbNodeIdProvider nodeIdProvider; private final TbQueueCoreSettings coreSettings; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; - public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings, TbNodeIdProvider nodeIdProvider, TbQueueCoreSettings coreSettings) { + public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings, + TbNodeIdProvider nodeIdProvider, + TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings) { this.kafkaSettings = kafkaSettings; this.nodeIdProvider = nodeIdProvider; this.coreSettings = coreSettings; + this.ruleEngineSettings = ruleEngineSettings; + this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; } @Override - public TbQueueProducer> getTransportMsgProducer() { + public TbQueueProducer> getTransportNotificationsMsgProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(coreSettings.getTopic()); + requestBuilder.clientId("producer-transport-notifications" + nodeIdProvider.getNodeId()); + requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic()); return requestBuilder.build(); } @@ -61,7 +73,7 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("producer-rule-engine-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(coreSettings.getTopic()); + requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); return requestBuilder.build(); } @@ -76,14 +88,13 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn @Override public TbQueueConsumer> getToRuleEngineMsgConsumer() { - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> responseBuilder = TBKafkaConsumerTemplate.builder(); - responseBuilder.settings(kafkaSettings); - responseBuilder.topic(coreSettings.getTopic()); - responseBuilder.clientId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId()); - responseBuilder.groupId("tb-rule-engine-" + nodeIdProvider.getNodeId()); - responseBuilder.autoCommit(true); - responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); - return responseBuilder.build(); + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> consumerBuilder = TBKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(ruleEngineSettings.getTopic()); + consumerBuilder.clientId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId()); + consumerBuilder.groupId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId()); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + return consumerBuilder.build(); } @Override @@ -92,30 +103,28 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getTopic()); consumerBuilder.clientId("tb-core-consumer" + nodeIdProvider.getNodeId()); - consumerBuilder.groupId("tb-core-node-" + nodeIdProvider.getNodeId()); + consumerBuilder.groupId("tb-core-consumer-" + nodeIdProvider.getNodeId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); return consumerBuilder.build(); } @Override public TbQueueConsumer> getTransportApiRequestConsumer() { - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> responseBuilder = TBKafkaConsumerTemplate.builder(); - responseBuilder.settings(kafkaSettings); - responseBuilder.topic(coreSettings.getTopic()); - responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId()); - responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); - responseBuilder.autoCommit(true); - //TODO 2.5 - responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); - return responseBuilder.build(); + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> consumerBuilder = TBKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(transportApiSettings.getRequestsTopic()); + consumerBuilder.clientId("consumer-transport-api-" + nodeIdProvider.getNodeId()); + consumerBuilder.groupId("transport-api-consumer-node-" + nodeIdProvider.getNodeId()); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + return consumerBuilder.build(); } @Override public TbQueueProducer> getTransportApiResponseProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("transport-api-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(coreSettings.getTopic()); + requestBuilder.clientId("transport-api-producer-" + nodeIdProvider.getNodeId()); + requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic()); return requestBuilder.build(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java index 5cbe93196a..72338bb149 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java @@ -21,8 +21,6 @@ import org.thingsboard.server.TbQueueConsumer; import org.thingsboard.server.TbQueueCoreSettings; import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.common.TbProtoQueueMsg; -import org.thingsboard.server.discovery.PartitionChangeEvent; -import org.thingsboard.server.discovery.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -48,7 +46,7 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider { } @Override - public TbQueueProducer> getTransportMsgProducer() { + public TbQueueProducer> getTransportNotificationsMsgProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); @@ -93,7 +91,6 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider { responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId()); responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); responseBuilder.autoCommit(true); - //TODO 2.5 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); return responseBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java index eb0bcb5d6e..d635419141 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java @@ -46,7 +46,7 @@ public class KafkaTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider } @Override - public TbQueueProducer> getTransportMsgProducer() { + public TbQueueProducer> getTransportNotificationsMsgProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java index dcf6474e1e..577c9180fc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java @@ -19,9 +19,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueCoreSettings; import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.TbQueueRuleEngineSettings; import org.thingsboard.server.TbQueueTransportApiSettings; +import org.thingsboard.server.TbQueueTransportNotificationSettings; import org.thingsboard.server.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -29,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.kafka.TBKafkaAdmin; import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; @@ -41,31 +45,42 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider { private final TbKafkaSettings kafkaSettings; private final TbNodeIdProvider nodeIdProvider; + private final TbQueueCoreSettings coreSettings; + private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; - public KafkaTransportQueueProvider(TbKafkaSettings kafkaSettings, TbNodeIdProvider nodeIdProvider, TbQueueTransportApiSettings transportApiSettings) { + public KafkaTransportQueueProvider(TbKafkaSettings kafkaSettings, + TbNodeIdProvider nodeIdProvider, + TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings) { this.kafkaSettings = kafkaSettings; this.nodeIdProvider = nodeIdProvider; + this.coreSettings = coreSettings; + this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); + requestBuilder.clientId("producer-transport-api-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic()); TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> responseBuilder = TBKafkaConsumerTemplate.builder(); responseBuilder.settings(kafkaSettings); - responseBuilder.topic(transportApiSettings.getResponsesTopic()); - responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId()); - responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); - responseBuilder.autoCommit(true); + responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + nodeIdProvider.getNodeId()); + responseBuilder.clientId("consumer-transport-api-" + nodeIdProvider.getNodeId()); + responseBuilder.groupId("transport-node-" + nodeIdProvider.getNodeId()); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); + templateBuilder.queueAdmin(new TBKafkaAdmin(kafkaSettings)); templateBuilder.requestTemplate(requestBuilder.build()); templateBuilder.responseTemplate(responseBuilder.build()); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); @@ -78,7 +93,7 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider { public TbQueueProducer> getRuleEngineMsgProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); + requestBuilder.clientId("producer-rule-engine-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic()); return requestBuilder.build(); } @@ -87,13 +102,19 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider { public TbQueueProducer> getTbCoreMsgProducer() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId()); + requestBuilder.clientId("producer-tb-core-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic()); return requestBuilder.build(); } @Override public TbQueueConsumer> getTransportNotificationsConsumer() { - return null; + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder> responseBuilder = TBKafkaConsumerTemplate.builder(); + responseBuilder.settings(kafkaSettings); + responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + nodeIdProvider.getNodeId()); + responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId()); + responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); + responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); + return responseBuilder.build(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java index ef9cd23ac3..ba93bf96ef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java @@ -37,7 +37,7 @@ public interface TbCoreQueueProvider { * * @return */ - TbQueueProducer> getTransportMsgProducer(); + TbQueueProducer> getTransportNotificationsMsgProducer(); /** * Used to push messages to instances of TB RuleEngine Service diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java index 2efa9ffbc6..b53c3ac28b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java @@ -35,7 +35,7 @@ public interface TbRuleEngineQueueProvider { * * @return */ - TbQueueProducer> getTransportMsgProducer(); + TbQueueProducer> getTransportNotificationsMsgProducer(); /** * Used to push messages to instances of TB RuleEngine Service diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 19bf43f7db..f1b73f30af 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -119,6 +119,7 @@ public class DefaultTransportService implements TransportService { ruleEngineMsgProducer = queueProvider.getRuleEngineMsgProducer(); tbCoreMsgProducer = queueProvider.getTbCoreMsgProducer(); transportNotificationsConsumer = queueProvider.getTransportNotificationsConsumer(); + transportNotificationsConsumer.subscribe(); transportApiRequestTemplate.init(); mainConsumerExecutor.execute(() -> { while (!stopped) {