From 9ddd22edcf9e5e73cc7a527c0fbdfe6e367291b1 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 20 Mar 2025 16:12:42 +0200 Subject: [PATCH 1/4] EDQS: don't use Kafka manual partitions assignment --- .../server/service/edqs/EdqsSyncService.java | 3 ++ .../service/edqs/KafkaEdqsSyncService.java | 10 +++++- .../src/main/resources/thingsboard.yml | 12 +++---- .../server/edqs/processor/EdqsProcessor.java | 7 ++-- .../server/edqs/processor/EdqsProducer.java | 12 +++---- .../edqs/state/KafkaEdqsStateService.java | 10 +++++- .../common/DefaultTbQueueRequestTemplate.java | 1 - .../server/queue/kafka/TbKafkaAdmin.java | 33 ++++++++++++++----- edqs/src/main/resources/edqs.yml | 12 +++---- 9 files changed, 63 insertions(+), 37 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 79e0e60983..6ea2f959b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.relation.RelationRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; +import org.thingsboard.server.queue.edqs.EdqsConfig; import java.util.List; import java.util.Map; @@ -75,6 +76,8 @@ public abstract class EdqsSyncService { @Autowired @Lazy private DefaultEdqsService edqsService; + @Autowired + protected EdqsConfig edqsConfig; private final ConcurrentHashMap entityInfoMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap keys = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 4ef552521b..201964c955 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -17,11 +17,14 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'") @@ -31,7 +34,12 @@ public class KafkaEdqsSyncService extends EdqsSyncService { public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); - this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic()); + this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build().getFullTopicName()) + .collect(Collectors.toSet())); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c61a993d56..eac787d52b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1645,12 +1645,12 @@ queue: calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Calculated Field State topics calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" - # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 7ddc9147df..f2360617ee 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -164,13 +164,10 @@ public class EdqsProcessor implements TbQueueHandler, } try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - Set partitions = newPartitions.stream() - .map(tpi -> tpi.withUseInternalPartition(true)) - .collect(Collectors.toSet()); - stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic())); + stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic())); // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template + responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index be1f0481be..970ee76381 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -70,17 +70,13 @@ public class EdqsProducer { log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); } }; + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic(topic) + .partition(partitionService.resolvePartition(tenantId)) + .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .partition(partitionService.resolvePartition(tenantId)) - .useInternalPartition(true) - .build(); kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction } else { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .build(); producer.send(tpi, new TbProtoQueueMsg<>(null, msg), callback); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 85b8e92387..067c730bfe 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -45,6 +45,8 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @RequiredArgsConstructor @@ -151,7 +153,13 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void process(Set partitions) { if (queueStateService.getPartitions().isEmpty()) { - eventsToBackupConsumer.subscribe(); + Set allPartitions = IntStream.range(0, config.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build()) + .collect(Collectors.toSet()); + eventsToBackupConsumer.subscribe(allPartitions); eventsToBackupConsumer.launch(); } queueStateService.update(new QueueKey(ServiceType.EDQS), partitions); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 4efb297491..1beb505595 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -263,7 +263,6 @@ public class DefaultTbQueueRequestTemplate topics) { try { - if (!getTopics().contains(topic)) { + List existingTopics = getTopics().stream().filter(topics::contains).toList(); + if (existingTopics.isEmpty()) { return true; } - TopicDescription topicDescription = settings.getAdminClient().describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get(); - List partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).toList(); - Map beginningOffsets = settings.getAdminClient().listOffsets(partitions.stream() + List allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() + .flatMap(entry -> { + String topic = entry.getKey(); + TopicDescription topicDescription; + try { + topicDescription = entry.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); + }) + .toList(); + + Map beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); - - Map endOffsets = settings.getAdminClient().listOffsets(partitions.stream() + Map endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); - for (TopicPartition partition : partitions) { + for (TopicPartition partition : allPartitions) { long beginningOffset = beginningOffsets.get(partition).offset(); long endOffset = endOffsets.get(partition).offset(); if (beginningOffset != endOffset) { - log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), topic); + log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); return false; } } return true; } catch (InterruptedException | ExecutionException e) { - log.error("Failed to check if topic [{}] is empty.", topic, e); + log.error("Failed to check if topics [{}] empty.", topics, e); return false; } } diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index f7d0eda841..c101eff68e 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -148,12 +148,12 @@ queue: # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: - # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" From 0a14ce3f12399054eb22668e24903858bc8f823e Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 20 Mar 2025 16:53:28 +0200 Subject: [PATCH 2/4] Add EDQS monitoring --- .../monitoring/data/Latencies.java | 1 + .../monitoring/data/MonitoredServiceKey.java | 1 + .../service/BaseMonitoringService.java | 68 +++++++++++++++++++ .../src/main/resources/tb-monitoring.yml | 3 + 4 files changed, 73 insertions(+) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index a190b7b791..b42e9138e4 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -20,6 +20,7 @@ public class Latencies { public static final String WS_CONNECT = "wsConnect"; public static final String WS_SUBSCRIBE = "wsSubscribe"; public static final String LOG_IN = "logIn"; + public static final String EDQS_QUERY = "edqsQuery"; public static String request(String key) { return String.format("%sRequest", key); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java index 342ee121ef..9c3ee5b786 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java @@ -18,5 +18,6 @@ package org.thingsboard.monitoring.data; public class MonitoredServiceKey { public static final String GENERAL = "Monitoring"; + public static final String EDQS = "*EDQS*"; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java index 9157e9f31c..7219e4f9aa 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java @@ -15,10 +15,13 @@ */ package org.thingsboard.monitoring.service; +import com.google.common.collect.Sets; import jakarta.annotation.PostConstruct; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.client.WsClient; @@ -27,13 +30,26 @@ import org.thingsboard.monitoring.config.MonitoringConfig; import org.thingsboard.monitoring.config.MonitoringTarget; import org.thingsboard.monitoring.data.Latencies; import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.util.TbStopWatch; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityDataSortOrder; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityTypeFilter; +import org.thingsboard.server.common.data.query.TsValue; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -41,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; @Slf4j public abstract class BaseMonitoringService, T extends MonitoringTarget> { @@ -61,6 +78,9 @@ public abstract class BaseMonitoringService, T ext @Autowired protected ApplicationContext applicationContext; + @Value("${monitoring.edqs.enabled:false}") + private boolean edqsMonitoringEnabled; + @PostConstruct private void init() { if (configs == null || configs.isEmpty()) { @@ -108,6 +128,21 @@ public abstract class BaseMonitoringService, T ext check(healthChecker, wsClient); } } + + if (edqsMonitoringEnabled) { + try { + stopWatch.start(); + checkEdqs(); + reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime()); + + reporter.serviceIsOk(MonitoredServiceKey.EDQS); + } catch (ServiceFailureException e) { + reporter.serviceFailure(MonitoredServiceKey.EDQS, e); + } catch (Exception e) { + reporter.serviceFailure(MonitoredServiceKey.GENERAL, e); + } + } + reporter.reportLatencies(tbClient); log.debug("Finished {}", getName()); } catch (Throwable error) { @@ -149,6 +184,39 @@ public abstract class BaseMonitoringService, T ext } } + private void checkEdqs() { + EntityTypeFilter entityTypeFilter = new EntityTypeFilter(); + entityTypeFilter.setEntityType(EntityType.DEVICE); + EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"))); + EntityDataQuery entityDataQuery = new EntityDataQuery(entityTypeFilter, pageLink, + List.of(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "type")), + List.of(new EntityKey(EntityKeyType.TIME_SERIES, "testData")), + Collections.emptyList()); + + PageData result = tbClient.findEntityDataByQuery(entityDataQuery); + Set devices = result.getData().stream() + .map(entityData -> entityData.getEntityId().getId()) + .collect(Collectors.toSet()); + Set missing = Sets.difference(new HashSet<>(this.devices), devices); + if (!missing.isEmpty()) { + throw new ServiceFailureException("Missing devices in the response: " + missing); + } + + result.getData().stream() + .filter(entityData -> this.devices.contains(entityData.getEntityId().getId())) + .forEach(entityData -> { + Map values = new HashMap<>(entityData.getLatest().get(EntityKeyType.ENTITY_FIELD)); + values.putAll(entityData.getLatest().get(EntityKeyType.TIME_SERIES)); + + Stream.of("name", "type", "testData").forEach(key -> { + TsValue value = values.get(key); + if (value == null || StringUtils.isBlank(value.getValue())) { + throw new ServiceFailureException("Missing " + key + " for device " + entityData.getEntityId()); + } + }); + }); + } + @SneakyThrows private Set getAssociatedUrls(String baseUrl) { URI url = new URI(baseUrl); diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index ae0d265ed6..6cc2e79cb4 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -105,6 +105,9 @@ monitoring: # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. + edqs: + enabled: "${EDQS_MONITORING_ENABLED:false}" + notifications: message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}' slack: From b7604a8d0af67826d06af9e2b2cea78a9cfd624a Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 20 Mar 2025 17:17:08 +0200 Subject: [PATCH 3/4] Fix NONE partitioning strategy --- .../service/edqs/DefaultEdqsApiService.java | 5 +++-- .../server/edqs/processor/EdqsProcessor.java | 15 ++++++++++----- .../server/edqs/processor/EdqsProducer.java | 2 +- .../server/edqs/state/EdqsPartitionService.java | 7 +++++-- .../queue/discovery/HashPartitionService.java | 17 ++++++++++++++++- .../queue/discovery/PartitionService.java | 2 ++ 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java index 51c963ed2f..c7e17b62ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java @@ -76,8 +76,9 @@ public class DefaultEdqsApiService implements EdqsApiService { requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits()); } - Integer partition = edqsPartitionService.resolvePartition(tenantId); - ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition); + UUID key = UUID.randomUUID(); + Integer partition = edqsPartitionService.resolvePartition(tenantId, key); + ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(key, requestMsg.build()), partition); return Futures.transform(resultFuture, msg -> { TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg(); return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index f2360617ee..fb15c3fc73 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -173,13 +173,18 @@ public class EdqsProcessor implements TbQueueHandler, if (CollectionsUtil.isNotEmpty(oldPartitions)) { Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); - if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { + if (removedPartitions.isEmpty()) { + return; + } + + if (config.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { + repository.clearIf(tenantId -> { + Integer partition = partitionService.resolvePartition(tenantId, null); + return removedPartitions.contains(partition); + }); + } else { log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); } - repository.clearIf(tenantId -> { - Integer partition = partitionService.resolvePartition(tenantId); - return partition != null && removedPartitions.contains(partition); - }); } } catch (Throwable t) { log.error("Failed to handle partition change event {}", event, t); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index 970ee76381..a06836339f 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -72,7 +72,7 @@ public class EdqsProducer { }; TopicPartitionInfo tpi = TopicPartitionInfo.builder() .topic(topic) - .partition(partitionService.resolvePartition(tenantId)) + .partition(partitionService.resolvePartition(tenantId, key)) .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java index 94e9437650..e2fbf9a981 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java @@ -29,11 +29,14 @@ public class EdqsPartitionService { private final HashPartitionService hashPartitionService; private final EdqsConfig edqsConfig; - public Integer resolvePartition(TenantId tenantId) { + public Integer resolvePartition(TenantId tenantId, Object key) { if (edqsConfig.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { return hashPartitionService.resolvePartitionIndex(tenantId.getId(), edqsConfig.getPartitions()); } else { - return null; + if (key == null) { + throw new IllegalArgumentException("Partitioning key is missing but partitioning strategy is not TENANT"); + } + return hashPartitionService.resolvePartitionIndex(key.toString(), edqsConfig.getPartitions()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 3a76d50825..7186bf7055 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; import org.thingsboard.server.queue.util.AfterStartUp; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -559,7 +560,15 @@ public class HashPartitionService implements PartitionService { @Override public int resolvePartitionIndex(UUID entityId, int partitions) { - int hash = hash(entityId); + return resolvePartitionIndex(hash(entityId), partitions); + } + + @Override + public int resolvePartitionIndex(String key, int partitions) { + return resolvePartitionIndex(hash(key), partitions); + } + + private int resolvePartitionIndex(int hash, int partitions) { return Math.abs(hash % partitions); } @@ -725,6 +734,12 @@ public class HashPartitionService implements PartitionService { .hash().asInt(); } + private int hash(String key) { + return hashFunction.newHasher() + .putString(key, StandardCharsets.UTF_8) + .hash().asInt(); + } + public static HashFunction forName(String name) { return switch (name) { case "murmur3_32" -> Hashing.murmur3_32(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 7abd68e25f..5dda413f17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -79,4 +79,6 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); + int resolvePartitionIndex(String key, int partitions); + } From 0caa6ad86e506c40d235bb778010a734e8dfb93d Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 21 Mar 2025 14:25:54 +0200 Subject: [PATCH 4/4] Refactoring for EDQS --- .../service/edqs/DefaultEdqsService.java | 4 +-- .../server/service/edqs/EdqsSyncService.java | 3 -- .../service/edqs/KafkaEdqsSyncService.java | 3 +- .../server/edqs/processor/EdqsProducer.java | 36 ++++++------------- .../edqs/state/KafkaEdqsStateService.java | 4 +-- .../queue/edqs/KafkaEdqsQueueFactory.java | 1 + .../provider/KafkaMonolithQueueFactory.java | 1 + .../provider/KafkaTbCoreQueueFactory.java | 1 + .../KafkaTbRuleEngineQueueFactory.java | 1 + 9 files changed, 19 insertions(+), 35 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index e823dee4e7..7d5a0cb0fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -96,10 +96,8 @@ public class DefaultEdqsService implements EdqsService { private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); eventsProducer = EdqsProducer.builder() - .queue(EdqsQueue.EVENTS) - .partitionService(edqsPartitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS)) + .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 6ea2f959b7..79e0e60983 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -43,7 +43,6 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.relation.RelationRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; -import org.thingsboard.server.queue.edqs.EdqsConfig; import java.util.List; import java.util.Map; @@ -76,8 +75,6 @@ public abstract class EdqsSyncService { @Autowired @Lazy private DefaultEdqsService edqsService; - @Autowired - protected EdqsConfig edqsConfig; private final ConcurrentHashMap entityInfoMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap keys = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 201964c955..ad7b7b970d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -32,7 +33,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) { + public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index a06836339f..cc4f913d38 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -16,6 +16,7 @@ package org.thingsboard.server.edqs.processor; import lombok.Builder; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.RecordTooLargeException; import org.thingsboard.server.common.data.ObjectType; @@ -27,53 +28,38 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; @Slf4j +@Builder +@RequiredArgsConstructor public class EdqsProducer { - private final EdqsQueue queue; - private final EdqsPartitionService partitionService; - private final TopicService topicService; - private final TbQueueProducer> producer; - - @Builder - public EdqsProducer(EdqsQueue queue, - EdqsPartitionService partitionService, - TopicService topicService, - TbQueueProducer> producer) { - this.queue = queue; - this.partitionService = partitionService; - this.topicService = topicService; - this.producer = producer; - } + private final EdqsPartitionService partitionService; public void send(TenantId tenantId, ObjectType type, String key, ToEdqsMsg msg) { - String topic = topicService.buildTopicName(queue.getTopic()); + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic(producer.getDefaultTopic()) + .partition(partitionService.resolvePartition(tenantId, key)) + .build(); TbQueueCallback callback = new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg); + log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, tpi, msg); } @Override public void onFailure(Throwable t) { if (t instanceof RecordTooLargeException) { if (!log.isDebugEnabled()) { - log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message + log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, tpi, t); // not logging the whole message return; } } - log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); + log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, tpi, msg, t); } }; - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .partition(partitionService.resolvePartition(tenantId, key)) - .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction } else { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 067c730bfe..c59707c9c3 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -143,10 +143,8 @@ public class KafkaEdqsStateService implements EdqsStateService { .build(); stateProducer = EdqsProducer.builder() - .queue(EdqsQueue.STATE) - .partitionService(partitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE)) + .partitionService(partitionService) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index e985696040..a322cc5434 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -95,6 +95,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index f07bd9dcbb..1a8ee1dcee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -593,6 +593,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 3c6d144a0c..2a1dc4171f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -483,6 +483,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index d0e6c2f123..f5300badbe 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -378,6 +378,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build();