Browse Source

Improvements for Kafka admin creation

pull/10801/head
ViacheslavKlimov 2 years ago
parent
commit
e8d14f37cb
  1. 45
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java
  2. 12
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java
  3. 35
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java

45
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java

@ -16,7 +16,6 @@
package org.thingsboard.server.queue.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
@ -35,23 +34,17 @@ import java.util.concurrent.ExecutionException;
@Slf4j
public class TbKafkaAdmin implements TbQueueAdmin {
private final AdminClient client;
private final TbKafkaSettings settings;
private final Map<String, String> topicConfigs;
private final Set<String> topics = ConcurrentHashMap.newKeySet();
private final int numPartitions;
private volatile Set<String> topics;
private final short replicationFactor;
public TbKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) {
client = AdminClient.create(settings.toAdminProps());
this.settings = settings;
this.topicConfigs = topicConfigs;
try {
topics.addAll(client.listTopics().names().get());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
if (numPartitionsStr != null) {
numPartitions = Integer.parseInt(numPartitionsStr);
@ -64,6 +57,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
@Override
public void createTopicIfNotExists(String topic, String properties) {
Set<String> topics = getTopics();
if (topics.contains(topic)) {
return;
}
@ -86,12 +80,13 @@ public class TbKafkaAdmin implements TbQueueAdmin {
@Override
public void deleteTopic(String topic) {
Set<String> topics = getTopics();
if (topics.contains(topic)) {
client.deleteTopics(Collections.singletonList(topic));
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
try {
if (client.listTopics().names().get().contains(topic)) {
client.deleteTopics(Collections.singletonList(topic));
if (settings.getAdminClient().listTopics().names().get().contains(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
log.warn("Kafka topic [{}] does not exist.", topic);
}
@ -101,14 +96,28 @@ public class TbKafkaAdmin implements TbQueueAdmin {
}
}
@Override
public void destroy() {
if (client != null) {
client.close();
private Set<String> getTopics() {
if (topics == null) {
synchronized (this) {
if (topics == null) {
topics = ConcurrentHashMap.newKeySet();
try {
topics.addAll(settings.getAdminClient().listTopics().names().get());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
}
}
}
return topics;
}
public CreateTopicsResult createTopic(NewTopic topic) {
return client.createTopics(Collections.singletonList(topic));
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
}
@Override
public void destroy() {
}
}

12
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java

@ -15,11 +15,12 @@
*/
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -35,8 +36,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.queue.discovery.PartitionService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -62,7 +61,6 @@ public class TbKafkaConsumerStatsService {
@Autowired
private PartitionService partitionService;
private AdminClient adminClient;
private Consumer<String, byte[]> consumer;
private ScheduledExecutorService statsPrintScheduler;
@ -71,7 +69,6 @@ public class TbKafkaConsumerStatsService {
if (!statsConfig.getEnabled()) {
return;
}
this.adminClient = AdminClient.create(kafkaSettings.toAdminProps());
this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
Properties consumerProps = kafkaSettings.toConsumerProps(null);
@ -90,7 +87,7 @@ public class TbKafkaConsumerStatsService {
}
for (String groupId : monitoredGroups) {
try {
Map<TopicPartition, OffsetAndMetadata> groupOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
Map<TopicPartition, OffsetAndMetadata> groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
.get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration);
@ -157,9 +154,6 @@ public class TbKafkaConsumerStatsService {
if (statsPrintScheduler != null) {
statsPrintScheduler.shutdownNow();
}
if (adminClient != null) {
adminClient.close();
}
if (consumer != null) {
consumer.close();
}

35
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java

@ -19,6 +19,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -34,6 +35,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TbProperty;
import org.thingsboard.server.queue.util.PropertyUtils;
import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -143,13 +145,7 @@ public class TbKafkaSettings {
@Setter
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = Collections.emptyMap();
public Properties toAdminProps() {
Properties props = toProps();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
return props;
}
private volatile AdminClient adminClient;
public Properties toConsumerProps(String topic) {
Properties props = toProps();
@ -221,4 +217,29 @@ public class TbKafkaSettings {
}
}
public AdminClient getAdminClient() {
if (adminClient == null) {
synchronized (this) {
if (adminClient == null) {
adminClient = AdminClient.create(toAdminProps());
}
}
}
return adminClient;
}
private Properties toAdminProps() {
Properties props = toProps();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
return props;
}
@PreDestroy
private void destroy() {
if (adminClient != null) {
adminClient.close();
}
}
}

Loading…
Cancel
Save