diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index bf5555817d..a5a0427606 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -16,7 +16,6 @@ package org.thingsboard.server.queue.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.errors.TopicExistsException; @@ -35,23 +34,17 @@ import java.util.concurrent.ExecutionException; @Slf4j public class TbKafkaAdmin implements TbQueueAdmin { - private final AdminClient client; + private final TbKafkaSettings settings; private final Map topicConfigs; - private final Set topics = ConcurrentHashMap.newKeySet(); private final int numPartitions; + private volatile Set topics; private final short replicationFactor; public TbKafkaAdmin(TbKafkaSettings settings, Map topicConfigs) { - client = AdminClient.create(settings.toAdminProps()); + this.settings = settings; this.topicConfigs = topicConfigs; - try { - topics.addAll(client.listTopics().names().get()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); if (numPartitionsStr != null) { numPartitions = Integer.parseInt(numPartitionsStr); @@ -64,6 +57,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { @Override public void createTopicIfNotExists(String topic, String properties) { + Set topics = getTopics(); if (topics.contains(topic)) { return; } @@ -86,12 +80,13 @@ public class TbKafkaAdmin implements TbQueueAdmin { @Override public void deleteTopic(String topic) { + Set topics = getTopics(); if (topics.contains(topic)) { - client.deleteTopics(Collections.singletonList(topic)); + settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); } else { try { - if (client.listTopics().names().get().contains(topic)) { - client.deleteTopics(Collections.singletonList(topic)); + if (settings.getAdminClient().listTopics().names().get().contains(topic)) { + settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); } else { log.warn("Kafka topic [{}] does not exist.", topic); } @@ -101,14 +96,28 @@ public class TbKafkaAdmin implements TbQueueAdmin { } } - @Override - public void destroy() { - if (client != null) { - client.close(); + private Set getTopics() { + if (topics == null) { + synchronized (this) { + if (topics == null) { + topics = ConcurrentHashMap.newKeySet(); + try { + topics.addAll(settings.getAdminClient().listTopics().names().get()); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to get all topics.", e); + } + } + } } + return topics; } public CreateTopicsResult createTopic(NewTopic topic) { - return client.createTopics(Collections.singletonList(topic)); + return settings.getAdminClient().createTopics(Collections.singletonList(topic)); + } + + @Override + public void destroy() { } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java index dbc1b7b7c4..56440cf4a5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -15,11 +15,12 @@ */ package org.thingsboard.server.queue.kafka; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.Builder; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -35,8 +36,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.queue.discovery.PartitionService; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -62,7 +61,6 @@ public class TbKafkaConsumerStatsService { @Autowired private PartitionService partitionService; - private AdminClient adminClient; private Consumer consumer; private ScheduledExecutorService statsPrintScheduler; @@ -71,7 +69,6 @@ public class TbKafkaConsumerStatsService { if (!statsConfig.getEnabled()) { return; } - this.adminClient = AdminClient.create(kafkaSettings.toAdminProps()); this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats")); Properties consumerProps = kafkaSettings.toConsumerProps(null); @@ -90,7 +87,7 @@ public class TbKafkaConsumerStatsService { } for (String groupId : monitoredGroups) { try { - Map groupOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() + Map groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS); Map endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration); @@ -157,9 +154,6 @@ public class TbKafkaConsumerStatsService { if (statsPrintScheduler != null) { statsPrintScheduler.shutdownNow(); } - if (adminClient != null) { - adminClient.close(); - } if (consumer != null) { consumer.close(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 96fa7afb7f..ebbc6fac7e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -19,6 +19,7 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -34,6 +35,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.TbProperty; import org.thingsboard.server.queue.util.PropertyUtils; +import javax.annotation.PreDestroy; import java.util.Collections; import java.util.List; import java.util.Map; @@ -143,13 +145,7 @@ public class TbKafkaSettings { @Setter private Map> consumerPropertiesPerTopic = Collections.emptyMap(); - public Properties toAdminProps() { - Properties props = toProps(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); - props.put(AdminClientConfig.RETRIES_CONFIG, retries); - - return props; - } + private volatile AdminClient adminClient; public Properties toConsumerProps(String topic) { Properties props = toProps(); @@ -221,4 +217,29 @@ public class TbKafkaSettings { } } + public AdminClient getAdminClient() { + if (adminClient == null) { + synchronized (this) { + if (adminClient == null) { + adminClient = AdminClient.create(toAdminProps()); + } + } + } + return adminClient; + } + + private Properties toAdminProps() { + Properties props = toProps(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(AdminClientConfig.RETRIES_CONFIG, retries); + return props; + } + + @PreDestroy + private void destroy() { + if (adminClient != null) { + adminClient.close(); + } + } + }