Browse Source

Refactor TopicService. Change ymls to be able to configure notification topic names

pull/12588/head
Andrii Landiak 1 year ago
parent
commit
e3a72fec09
  1. 7
      application/src/main/resources/thingsboard.yml
  2. 41
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java
  3. 2
      msa/vc-executor/src/main/resources/tb-vc-executor.yml
  4. 4
      transport/coap/src/main/resources/tb-coap-transport.yml
  5. 4
      transport/http/src/main/resources/tb-http-transport.yml
  6. 4
      transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml
  7. 4
      transport/mqtt/src/main/resources/tb-mqtt-transport.yml
  8. 4
      transport/snmp/src/main/resources/tb-snmp-transport.yml

7
application/src/main/resources/thingsboard.yml

@ -1644,6 +1644,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -1688,7 +1690,6 @@ queue:
enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}"
# Statistics printing interval for Housekeeper
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
vc:
# Default topic name
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
@ -1720,6 +1721,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine
@ -1743,6 +1746,8 @@ queue:
edge:
# Default topic name
topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}"
# Amount of partitions used by Edge services
partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}"
# Poll interval for topics related to Edge services

41
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java

@ -32,6 +32,18 @@ public class TopicService {
@Value("${queue.prefix:}")
private String prefix;
@Value("${queue.core.notifications-topic:tb_core.notifications}")
private String tbCoreNotificationsTopic;
@Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}")
private String tbRuleEngineNotificationsTopic;
@Value("${queue.transport.notifications-topics:tb_transport.notifications}")
private String tbTransportNotificationsTopic;
@Value("${queue.edge.notifications-topic:tb_edge.notifications}")
private String tbEdgeNotificationsTopic;
private final ConcurrentMap<String, TopicPartitionInfo> tbCoreNotificationTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TopicPartitionInfo> tbEdgeNotificationTopics = new ConcurrentHashMap<>();
@ -47,19 +59,28 @@ public class TopicService {
public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) {
return switch (serviceType) {
case TB_CORE -> tbCoreNotificationTopics.computeIfAbsent(serviceId,
id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId));
id -> buildNotificationsTopicPartitionInfo(tbCoreNotificationsTopic, serviceId));
case TB_RULE_ENGINE -> tbRuleEngineNotificationTopics.computeIfAbsent(serviceId,
id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId));
default -> buildNotificationsTopicPartitionInfo(serviceType, serviceId);
id -> buildNotificationsTopicPartitionInfo(tbRuleEngineNotificationsTopic, serviceId));
case TB_TRANSPORT -> buildNotificationsTopicPartitionInfo(tbTransportNotificationsTopic, serviceId);
default -> throw new IllegalStateException("Unexpected service type: " + serviceType);
};
}
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String topic, String serviceId) {
return buildTopicPartitionInfo(buildNotificationTopicName(topic, serviceId), null, null, false);
}
public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) {
return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition);
}
public TopicPartitionInfo getEdgeNotificationsTopic(String serviceId) {
return tbEdgeNotificationTopics.computeIfAbsent(serviceId, id -> buildEdgeNotificationsTopicPartitionInfo(serviceId));
}
private TopicPartitionInfo buildEdgeNotificationsTopicPartitionInfo(String serviceId) {
return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false);
return buildTopicPartitionInfo(buildNotificationTopicName(tbEdgeNotificationsTopic, serviceId), null, null, false);
}
public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) {
@ -70,18 +91,14 @@ public class TopicService {
return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false);
}
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) {
return buildTopicPartitionInfo(serviceType.name().toLowerCase() + ".notifications." + serviceId, null, null, false);
}
public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) {
return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition);
}
public String buildTopicName(String topic) {
return prefix.isBlank() ? topic : prefix + "." + topic;
}
private String buildNotificationTopicName(String topic, String serviceId) {
return topic + "." + serviceId;
}
public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
return this.buildTopicName(
servicePrefix + queueName

2
msa/vc-executor/src/main/resources/tb-vc-executor.yml

@ -151,6 +151,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices

4
transport/coap/src/main/resources/tb-coap-transport.yml

@ -351,6 +351,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -381,6 +383,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

4
transport/http/src/main/resources/tb-http-transport.yml

@ -296,6 +296,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -326,6 +328,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

4
transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml

@ -397,6 +397,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -427,6 +429,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

4
transport/mqtt/src/main/resources/tb-mqtt-transport.yml

@ -330,6 +330,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -360,6 +362,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

4
transport/snmp/src/main/resources/tb-snmp-transport.yml

@ -283,6 +283,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -313,6 +315,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

Loading…
Cancel
Save