From 11b8d3c41ecad4871e8f6f44b2adaea774d00515 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Fri, 22 Mar 2024 13:06:58 +0200 Subject: [PATCH] added global queue prefix for pubsub queue factory --- .../provider/PubSubMonolithQueueFactory.java | 30 +++++++++---------- .../provider/PubSubTbCoreQueueFactory.java | 24 +++++++-------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index c7c59974cd..61c2614fc7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -109,40 +109,40 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic())); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic())); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic())); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic())); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic())); } @Override public TbQueueConsumer> createToVersionControlMsgConsumer() { - return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, topicService.buildTopicName(vcSettings.getTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders()) ); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration) { - return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(), + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(configuration.getTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @@ -155,7 +155,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @@ -168,13 +168,13 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(), + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic()); + return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getResponsesTopic())); } @Override @@ -202,29 +202,29 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @Override public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToOtaPackageStateServiceMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createToOtaPackageStateServiceMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic())); } @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @Override public TbQueueProducer> createVersionControlMsgProducer() { - return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic()); + return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, topicService.buildTopicName(vcSettings.getTopic())); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index a6b9e44022..63975e3116 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -100,32 +100,32 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic())); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic())); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic())); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic())); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic())); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @@ -138,13 +138,13 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(), + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic()); + return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getResponsesTopic())); } @Override @@ -172,24 +172,24 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { @Override public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToOtaPackageStateServiceMsgConsumer() { - return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createToOtaPackageStateServiceMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic())); } @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @Override