From e7580f6093333bc13ea7266ca220d9080c16f66c Mon Sep 17 00:00:00 2001 From: VIacheslavKlimov Date: Wed, 6 Aug 2025 14:43:34 +0300 Subject: [PATCH] Short-lived cache for Kafka topics --- .../ttl/KafkaEdgeTopicsCleanUpService.java | 4 +- .../server/queue/kafka/KafkaAdmin.java | 78 ++++++++----------- .../queue/kafka/TbKafkaSettingsTest.java | 2 +- .../thingsboard/common/util/CachedValue.java | 40 ++++++++++ 4 files changed, 77 insertions(+), 47 deletions(-) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/CachedValue.java diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index f9dc3e736d..3354c63f9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -80,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { return; } - Set topics = kafkaAdmin.getAllTopics(); - if (topics == null || topics.isEmpty()) { + Set topics = kafkaAdmin.listTopics(); + if (topics.isEmpty()) { return; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java index 7171dc608b..124df2789e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java @@ -29,7 +29,9 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.CachedValue; import org.thingsboard.server.queue.util.TbKafkaComponent; import java.util.Collections; @@ -49,37 +51,44 @@ import java.util.stream.Collectors; @Slf4j public class KafkaAdmin { /* - * TODO: Get rid of per consumer/producer TbKafkaAdmin, - * use single KafkaAdmin instance that accepts topicConfigs. + * TODO: Get rid of per consumer/producer TbKafkaAdmin, + * use single KafkaAdmin instance that accepts topicConfigs. * */ private final TbKafkaSettings settings; - private final LazyInitializer adminClient; - private volatile Set topics; + private final LazyInitializer adminClient; + private final CachedValue> topics; - public KafkaAdmin(TbKafkaSettings settings) { + public KafkaAdmin(@Lazy TbKafkaSettings settings) { this.settings = settings; this.adminClient = LazyInitializer.builder() .setInitializer(() -> AdminClient.create(settings.toAdminProps())) .get(); + this.topics = new CachedValue<>(() -> { + Set topics = ConcurrentHashMap.newKeySet(); + topics.addAll(listTopics()); + return topics; + }, TimeUnit.MINUTES.toMillis(5)); } public void createTopicIfNotExists(String topic, Map properties, boolean force) { - if (!force) { - Set topics = getTopics(); - if (topics.contains(topic)) { - return; - } + Set topics = getTopics(); + if (!force && topics.contains(topic)) { + log.trace("Topic {} already exists", topic); + return; } - try { - String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); - int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1; - NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); + log.debug("Creating topic {} with properties {}", topic, properties); + String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); + int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1; + NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); + + try { createTopic(newTopic).values().get(topic).get(); topics.add(topic); } catch (ExecutionException ee) { + log.trace("Failed to create topic {} with properties {}", topic, properties, ee); if (ee.getCause() instanceof TopicExistsException) { //do nothing } else { @@ -93,48 +102,29 @@ public class KafkaAdmin { } public void deleteTopic(String topic) { - Set topics = getTopics(); - if (topics.remove(topic)) { - getClient().deleteTopics(Collections.singletonList(topic)); - } else { - try { - if (getClient().listTopics().names().get().contains(topic)) { - getClient().deleteTopics(Collections.singletonList(topic)); - } else { - log.warn("Kafka topic [{}] does not exist.", topic); - } - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to delete kafka topic [{}].", topic, e); - } + log.debug("Deleting topic {}", topic); + try { + getClient().deleteTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Failed to delete kafka topic [{}].", topic, e); } } private Set getTopics() { - if (topics == null) { - synchronized (this) { - if (topics == null) { - topics = ConcurrentHashMap.newKeySet(); - try { - topics.addAll(getClient().listTopics().names().get()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - } - } - } - return topics; + return topics.get(); } - public Set getAllTopics() { + public Set listTopics() { try { - return getClient().listTopics().names().get(); + Set topics = getClient().listTopics().names().get(); + log.trace("Listed topics: {}", topics); + return topics; } catch (InterruptedException | ExecutionException e) { log.error("Failed to get all topics.", e); + return Collections.emptySet(); } - return null; } - public CreateTopicsResult createTopic(NewTopic topic) { return getClient().createTopics(Collections.singletonList(topic)); } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java index ad026c63aa..bc37982245 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java @@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; -@SpringBootTest(classes = TbKafkaSettings.class) +@SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class}) @TestPropertySource(properties = { "queue.type=kafka", "queue.kafka.bootstrap.servers=localhost:9092", diff --git a/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java new file mode 100644 index 0000000000..99c80695b4 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class CachedValue { + + private static final Object KEY = new Object(); + + private final LoadingCache cache; + + public CachedValue(Supplier supplier, long valueTtlMs) { + this.cache = Caffeine.newBuilder() + .expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS) + .build(__ -> supplier.get()); + } + + public V get() { + return cache.get(KEY); + } + +}