diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f29cc55197..4ee08c7095 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1644,6 +1644,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -1688,7 +1690,6 @@ queue: enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}" # Statistics printing interval for Housekeeper print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}" - vc: # Default topic name topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" @@ -1720,6 +1721,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine @@ -1743,6 +1746,8 @@ queue: edge: # Default topic name topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" # Poll interval for topics related to Edge services diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 927c311a2d..254d0a95d6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,6 +32,18 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; + @Value("${queue.core.notifications-topic:tb_core.notifications}") + private String tbCoreNotificationsTopic; + + @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") + private String tbRuleEngineNotificationsTopic; + + @Value("${queue.transport.notifications-topics:tb_transport.notifications}") + private String tbTransportNotificationsTopic; + + @Value("${queue.edge.notifications-topic:tb_edge.notifications}") + private String tbEdgeNotificationsTopic; + private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbRuleEngineNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbEdgeNotificationTopics = new ConcurrentHashMap<>(); @@ -47,19 +59,28 @@ public class TopicService { public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) { return switch (serviceType) { case TB_CORE -> tbCoreNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); + id -> buildNotificationsTopicPartitionInfo(tbCoreNotificationsTopic, serviceId)); case TB_RULE_ENGINE -> tbRuleEngineNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); - default -> buildNotificationsTopicPartitionInfo(serviceType, serviceId); + id -> buildNotificationsTopicPartitionInfo(tbRuleEngineNotificationsTopic, serviceId)); + case TB_TRANSPORT -> buildNotificationsTopicPartitionInfo(tbTransportNotificationsTopic, serviceId); + default -> throw new IllegalStateException("Unexpected service type: " + serviceType); }; } + private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String topic, String serviceId) { + return buildTopicPartitionInfo(buildNotificationTopicName(topic, serviceId), null, null, false); + } + + public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { + return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); + } + public TopicPartitionInfo getEdgeNotificationsTopic(String serviceId) { return tbEdgeNotificationTopics.computeIfAbsent(serviceId, id -> buildEdgeNotificationsTopicPartitionInfo(serviceId)); } private TopicPartitionInfo buildEdgeNotificationsTopicPartitionInfo(String serviceId) { - return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false); + return buildTopicPartitionInfo(buildNotificationTopicName(tbEdgeNotificationsTopic, serviceId), null, null, false); } public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) { @@ -70,18 +91,14 @@ public class TopicService { return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false); } - private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) { - return buildTopicPartitionInfo(serviceType.name().toLowerCase() + ".notifications." + serviceId, null, null, false); - } - - public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { - return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); - } - public String buildTopicName(String topic) { return prefix.isBlank() ? topic : prefix + "." + topic; } + private String buildNotificationTopicName(String topic, String serviceId) { + return topic + "." + serviceId; + } + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { return this.buildTopicName( servicePrefix + queueName diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index cba83e5ff7..595e514907 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -151,6 +151,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 66ab703a54..794eaed047 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -351,6 +351,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -381,6 +383,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index be91e4fe67..860059b1ee 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -296,6 +296,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -326,6 +328,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 7bdd68baf1..824d998836 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -397,6 +397,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -427,6 +429,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 8d9a60a319..4b0adab7d1 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -330,6 +330,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -360,6 +362,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 4c11bd0018..e6fe253e79 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -283,6 +283,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -313,6 +315,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine