Browse Source

Kafka configuration

pull/2566/head
Andrii Shvaika 6 years ago
parent
commit
7de485f453
  1. 2
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java
  2. 4
      common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java
  3. 37
      common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
  4. 11
      common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
  5. 10
      common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
  6. 5
      common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java
  7. 2
      common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java
  8. 63
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java
  9. 5
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java
  10. 2
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java
  11. 39
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java
  12. 2
      common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java
  13. 2
      common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java
  14. 1
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

2
application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java

@ -44,7 +44,7 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService
private String notificationsTopic;
public DefaultTbCoreToTransportService(TbCoreQueueProvider tbCoreQueueProvider) {
this.tbTransportProducer = tbCoreQueueProvider.getTransportMsgProducer();
this.tbTransportProducer = tbCoreQueueProvider.getTransportNotificationsMsgProducer();
}
@Override

4
common/queue/src/main/java/org/thingsboard/server/TbQueueAdmin.java

@ -15,10 +15,8 @@
*/
package org.thingsboard.server;
import com.google.common.util.concurrent.ListenableFuture;
public interface TbQueueAdmin {
ListenableFuture<Void> createTopicIfNotExists(String topic);
void createTopicIfNotExists(String topic);
}

37
common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java

@ -18,11 +18,13 @@ package org.thingsboard.server.kafka;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.TbQueueAdmin;
import java.util.Collections;
@ -33,6 +35,7 @@ import java.util.concurrent.TimeoutException;
/**
* Created by ashvayka on 24.09.18.
*/
@Slf4j
public class TBKafkaAdmin implements TbQueueAdmin {
AdminClient client;
@ -41,17 +44,31 @@ public class TBKafkaAdmin implements TbQueueAdmin {
client = AdminClient.create(settings.toProps());
}
//TODO 2.5
@Override
public ListenableFuture<Void> createTopicIfNotExists(String topic) {
KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic);
ListenableFuture<TopicDescription> topicFuture = JdkFutureAdapters.listenInPoolThread(topicDescriptionFuture);
return Futures.transformAsync(topicFuture, topicDescription -> {
KafkaFuture<Void> resultFuture = createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic);
return JdkFutureAdapters.listenInPoolThread(resultFuture);
});
public void createTopicIfNotExists(String topic) {
try {
createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic).get();
} catch (ExecutionException ee) {
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
log.warn("[{}] Failed to create topic", topic, ee);
throw new RuntimeException(ee);
}
} catch (Exception e) {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
//
// KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic);
//
// ListenableFuture<TopicDescription> topicFuture = JdkFutureAdapters.listenInPoolThread(topicDescriptionFuture);
//
// return Futures.transformAsync(topicFuture, topicDescription -> {
// KafkaFuture<Void> resultFuture = createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic);
// return JdkFutureAdapters.listenInPoolThread(resultFuture);
// });
}
public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException {

11
common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java

@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
@ -39,6 +40,7 @@ import java.util.stream.Collectors;
@Slf4j
public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
private final TBKafkaAdmin admin;
private final KafkaConsumer<String, byte[]> consumer;
private final TbKafkaDecoder<T> decoder;
private volatile boolean subscribed;
@ -63,6 +65,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
if (maxPollRecords > 0) {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
}
this.admin = new TBKafkaAdmin(settings);
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.topic = topic;
@ -70,13 +73,16 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
@Override
public void subscribe() {
createTopicIfNotExists(topic);
consumer.subscribe(Collections.singletonList(topic));
subscribed = true;
}
@Override
public void subscribe(List<Integer> partitions) {
consumer.subscribe(partitions.stream().map(partition -> topic + "." + partition).collect(Collectors.toList()));
List<String> topicNames = partitions.stream().map(partition -> topic + "." + partition).collect(Collectors.toList());
topicNames.forEach(this::createTopicIfNotExists);
consumer.subscribe(topicNames);
subscribed = true;
}
@ -119,4 +125,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
return decoder.decode(new KafkaTbQueueMsg(record));
}
private void createTopicIfNotExists(String topic) {
admin.createTopicIfNotExists(topic);
}
}

10
common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java

@ -86,9 +86,15 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
record = new ProducerRecord<>(topic.toString(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
if (callback != null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
}
} else {
callback.onFailure(exception);
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure", exception);
}
}
});
}

5
common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java

@ -18,6 +18,7 @@ package org.thingsboard.server.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@ -28,7 +29,7 @@ import java.util.Properties;
* Created by ashvayka on 25.09.18.
*/
@Slf4j
@ConditionalOnProperty(prefix = "kafka", value = "enabled", havingValue = "true", matchIfMissing = false)
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
@Component
public class TbKafkaSettings {
@ -41,7 +42,7 @@ public class TbKafkaSettings {
@Value("${queue.kafka.acks}")
private String acks;
@Value("${queue.queue.kafka.retries}")
@Value("${queue.kafka.retries}")
private int retries;
@Value("${queue.kafka.batch.size}")

2
common/queue/src/main/java/org/thingsboard/server/provider/InMemoryMonolithQueueProvider.java

@ -55,7 +55,7 @@ public class InMemoryMonolithQueueProvider implements TbCoreQueueProvider, TbRul
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
return new InMemoryTbQueueProducer<>(notificationSettings.getNotificationsTopic());
}

63
common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java

@ -20,9 +20,10 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueCoreSettings;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRuleEngineSettings;
import org.thingsboard.server.TbQueueTransportApiSettings;
import org.thingsboard.server.TbQueueTransportNotificationSettings;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.discovery.PartitionChangeEvent;
import org.thingsboard.server.discovery.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@ -40,19 +41,30 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
private final TbKafkaSettings kafkaSettings;
private final TbNodeIdProvider nodeIdProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings, TbNodeIdProvider nodeIdProvider, TbQueueCoreSettings coreSettings) {
public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings,
TbNodeIdProvider nodeIdProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.kafkaSettings = kafkaSettings;
this.nodeIdProvider = nodeIdProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.clientId("producer-transport-notifications" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
return requestBuilder.build();
}
@ -61,7 +73,7 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-rule-engine-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
return requestBuilder.build();
}
@ -76,14 +88,13 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> getToRuleEngineMsgConsumer() {
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(coreSettings.getTopic());
responseBuilder.clientId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("tb-rule-engine-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
return responseBuilder.build();
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId());
consumerBuilder.groupId("tb-rule-engine-consumer-" + nodeIdProvider.getNodeId());
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
return consumerBuilder.build();
}
@Override
@ -92,30 +103,28 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getTopic());
consumerBuilder.clientId("tb-core-consumer" + nodeIdProvider.getNodeId());
consumerBuilder.groupId("tb-core-node-" + nodeIdProvider.getNodeId());
consumerBuilder.groupId("tb-core-consumer-" + nodeIdProvider.getNodeId());
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
return consumerBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> getTransportApiRequestConsumer() {
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(coreSettings.getTopic());
responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
//TODO 2.5
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
return responseBuilder.build();
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(transportApiSettings.getRequestsTopic());
consumerBuilder.clientId("consumer-transport-api-" + nodeIdProvider.getNodeId());
consumerBuilder.groupId("transport-api-consumer-node-" + nodeIdProvider.getNodeId());
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiResponseProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("transport-api-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.clientId("transport-api-producer-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
return requestBuilder.build();
}
}

5
common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java

@ -21,8 +21,6 @@ import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueCoreSettings;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.discovery.PartitionChangeEvent;
import org.thingsboard.server.discovery.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@ -48,7 +46,7 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());
@ -93,7 +91,6 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider {
responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
//TODO 2.5
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
return responseBuilder.build();
}

2
common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbRuleEngineQueueProvider.java

@ -46,7 +46,7 @@ public class KafkaTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());

39
common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java

@ -19,9 +19,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueCoreSettings;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.TbQueueRuleEngineSettings;
import org.thingsboard.server.TbQueueTransportApiSettings;
import org.thingsboard.server.TbQueueTransportNotificationSettings;
import org.thingsboard.server.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -29,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
@ -41,31 +45,42 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider {
private final TbKafkaSettings kafkaSettings;
private final TbNodeIdProvider nodeIdProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
public KafkaTransportQueueProvider(TbKafkaSettings kafkaSettings, TbNodeIdProvider nodeIdProvider, TbQueueTransportApiSettings transportApiSettings) {
public KafkaTransportQueueProvider(TbKafkaSettings kafkaSettings,
TbNodeIdProvider nodeIdProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.kafkaSettings = kafkaSettings;
this.nodeIdProvider = nodeIdProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
}
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiRequestTemplate() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());
requestBuilder.clientId("producer-transport-api-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiSettings.getResponsesTopic());
responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("consumer-transport-api-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("transport-node-" + nodeIdProvider.getNodeId());
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
templateBuilder.requestTemplate(requestBuilder.build());
templateBuilder.responseTemplate(responseBuilder.build());
templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
@ -78,7 +93,7 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());
requestBuilder.clientId("producer-rule-engine-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
return requestBuilder.build();
}
@ -87,13 +102,19 @@ public class KafkaTransportQueueProvider implements TransportQueueProvider {
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-" + nodeIdProvider.getNodeId());
requestBuilder.clientId("producer-tb-core-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsConsumer() {
return null;
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("consumer-transport-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
return responseBuilder.build();
}
}

2
common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java

@ -37,7 +37,7 @@ public interface TbCoreQueueProvider {
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer();
/**
* Used to push messages to instances of TB RuleEngine Service

2
common/queue/src/main/java/org/thingsboard/server/provider/TbRuleEngineQueueProvider.java

@ -35,7 +35,7 @@ public interface TbRuleEngineQueueProvider {
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer();
/**
* Used to push messages to instances of TB RuleEngine Service

1
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -119,6 +119,7 @@ public class DefaultTransportService implements TransportService {
ruleEngineMsgProducer = queueProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = queueProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.getTransportNotificationsConsumer();
transportNotificationsConsumer.subscribe();
transportApiRequestTemplate.init();
mainConsumerExecutor.execute(() -> {
while (!stopped) {

Loading…
Cancel
Save