Browse Source

Fix for Transport API subscriptions

pull/2811/head
Andrii Shvaika 6 years ago
parent
commit
676868b4d3
  1. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  2. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -200,7 +200,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(transportApiSettings.getRequestsTopic());
consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-transport-api-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(transportApiAdmin);
return consumerBuilder.build();

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

@ -170,7 +170,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(transportApiSettings.getRequestsTopic());
consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-transport-api-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(transportApiAdmin);
return consumerBuilder.build();

Loading…
Cancel
Save