From f416e4677bfc1c6f97b0392fd0b64cd8ce255199 Mon Sep 17 00:00:00 2001 From: VIacheslavKlimov Date: Wed, 6 Aug 2025 16:28:11 +0300 Subject: [PATCH] Refactoring for KafkaAdmin --- .../server/queue/kafka/KafkaAdmin.java | 49 +++++++++---------- .../thingsboard/common/util/CachedValue.java | 4 +- 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java index 124df2789e..6261e81497 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; @@ -29,6 +28,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.common.util.CachedValue; @@ -50,6 +50,7 @@ import java.util.stream.Collectors; @Component @Slf4j public class KafkaAdmin { + /* * TODO: Get rid of per consumer/producer TbKafkaAdmin, * use single KafkaAdmin instance that accepts topicConfigs. @@ -57,6 +58,11 @@ public class KafkaAdmin { private final TbKafkaSettings settings; + @Value("${queue.kafka.request.timeout.ms:30000}") + private int requestTimeoutMs; + @Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default + private int topicsCacheTtlMs; + private final LazyInitializer adminClient; private final CachedValue> topics; @@ -69,13 +75,13 @@ public class KafkaAdmin { Set topics = ConcurrentHashMap.newKeySet(); topics.addAll(listTopics()); return topics; - }, TimeUnit.MINUTES.toMillis(5)); + }, topicsCacheTtlMs); } public void createTopicIfNotExists(String topic, Map properties, boolean force) { Set topics = getTopics(); if (!force && topics.contains(topic)) { - log.trace("Topic {} already exists", topic); + log.trace("Topic {} already present in cache", topic); return; } @@ -85,7 +91,7 @@ public class KafkaAdmin { NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); try { - createTopic(newTopic).values().get(topic).get(); + getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); topics.add(topic); } catch (ExecutionException ee) { log.trace("Failed to create topic {} with properties {}", topic, properties, ee); @@ -104,7 +110,7 @@ public class KafkaAdmin { public void deleteTopic(String topic) { log.debug("Deleting topic {}", topic); try { - getClient().deleteTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS); + getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("Failed to delete kafka topic [{}].", topic, e); } @@ -116,19 +122,15 @@ public class KafkaAdmin { public Set listTopics() { try { - Set topics = getClient().listTopics().names().get(); + Set topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS); log.trace("Listed topics: {}", topics); return topics; - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { log.error("Failed to get all topics.", e); return Collections.emptySet(); } } - public CreateTopicsResult createTopic(NewTopic topic) { - return getClient().createTopics(Collections.singletonList(topic)); - } - public Map getTotalLagForGroupsBulk(Set groupIds) { Map result = new HashMap<>(); for (String groupId : groupIds) { @@ -148,8 +150,7 @@ public class KafkaAdmin { .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); Map endOffsets = - getClient().listOffsets(latestOffsetsSpec) - .all().get(10, TimeUnit.SECONDS); + getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); return committedOffsets.entrySet().stream() .mapToLong(entry -> { @@ -168,7 +169,7 @@ public class KafkaAdmin { @SneakyThrows public Map getConsumerGroupOffsets(String groupId) { - return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS); } /** @@ -211,7 +212,7 @@ public class KafkaAdmin { } else { log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); } - getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); + getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); log.info("[{}] altered new consumer groupId {}", tp, newGroupId); break; } @@ -228,23 +229,19 @@ public class KafkaAdmin { return true; } - List allPartitions = getClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() + List allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS) + .entrySet().stream() .flatMap(entry -> { String topic = entry.getKey(); - TopicDescription topicDescription; - try { - topicDescription = entry.getValue().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + TopicDescription topicDescription = entry.getValue(); return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); }) .toList(); Map beginningOffsets = getClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); Map endOffsets = getClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); for (TopicPartition partition : allPartitions) { long beginningOffset = beginningOffsets.get(partition).offset(); @@ -256,7 +253,7 @@ public class KafkaAdmin { } } return true; - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { log.error("Failed to check if topics [{}] empty.", topics, e); return false; } @@ -264,7 +261,7 @@ public class KafkaAdmin { public void deleteConsumerGroup(String consumerGroupId) { try { - getClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId)); + getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("Failed to delete consumer group {}", consumerGroupId, e); } diff --git a/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java index 99c80695b4..b0a41c2a42 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java +++ b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java @@ -23,8 +23,6 @@ import java.util.function.Supplier; public class CachedValue { - private static final Object KEY = new Object(); - private final LoadingCache cache; public CachedValue(Supplier supplier, long valueTtlMs) { @@ -34,7 +32,7 @@ public class CachedValue { } public V get() { - return cache.get(KEY); + return cache.get(this); } }