Browse Source

Refactoring for KafkaAdmin

pull/13799/head
VIacheslavKlimov 10 months ago
parent
commit
f416e4677b
  1. 49
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java
  2. 4
      common/util/src/main/java/org/thingsboard/common/util/CachedValue.java

49
common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java

@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
@ -29,6 +28,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.CachedValue;
@ -50,6 +50,7 @@ import java.util.stream.Collectors;
@Component
@Slf4j
public class KafkaAdmin {
/*
* TODO: Get rid of per consumer/producer TbKafkaAdmin,
* use single KafkaAdmin instance that accepts topicConfigs.
@ -57,6 +58,11 @@ public class KafkaAdmin {
private final TbKafkaSettings settings;
@Value("${queue.kafka.request.timeout.ms:30000}")
private int requestTimeoutMs;
@Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default
private int topicsCacheTtlMs;
private final LazyInitializer<AdminClient> adminClient;
private final CachedValue<Set<String>> topics;
@ -69,13 +75,13 @@ public class KafkaAdmin {
Set<String> topics = ConcurrentHashMap.newKeySet();
topics.addAll(listTopics());
return topics;
}, TimeUnit.MINUTES.toMillis(5));
}, topicsCacheTtlMs);
}
public void createTopicIfNotExists(String topic, Map<String, String> properties, boolean force) {
Set<String> topics = getTopics();
if (!force && topics.contains(topic)) {
log.trace("Topic {} already exists", topic);
log.trace("Topic {} already present in cache", topic);
return;
}
@ -85,7 +91,7 @@ public class KafkaAdmin {
NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties);
try {
createTopic(newTopic).values().get(topic).get();
getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
topics.add(topic);
} catch (ExecutionException ee) {
log.trace("Failed to create topic {} with properties {}", topic, properties, ee);
@ -104,7 +110,7 @@ public class KafkaAdmin {
public void deleteTopic(String topic) {
log.debug("Deleting topic {}", topic);
try {
getClient().deleteTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
@ -116,19 +122,15 @@ public class KafkaAdmin {
public Set<String> listTopics() {
try {
Set<String> topics = getClient().listTopics().names().get();
Set<String> topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.trace("Listed topics: {}", topics);
return topics;
} catch (InterruptedException | ExecutionException e) {
} catch (Exception e) {
log.error("Failed to get all topics.", e);
return Collections.emptySet();
}
}
public CreateTopicsResult createTopic(NewTopic topic) {
return getClient().createTopics(Collections.singletonList(topic));
}
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
Map<String, Long> result = new HashMap<>();
for (String groupId : groupIds) {
@ -148,8 +150,7 @@ public class KafkaAdmin {
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
getClient().listOffsets(latestOffsetsSpec)
.all().get(10, TimeUnit.SECONDS);
getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
return committedOffsets.entrySet().stream()
.mapToLong(entry -> {
@ -168,7 +169,7 @@ public class KafkaAdmin {
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
}
/**
@ -211,7 +212,7 @@ public class KafkaAdmin {
} else {
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
@ -228,23 +229,19 @@ public class KafkaAdmin {
return true;
}
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).topicNameValues().entrySet().stream()
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
.flatMap(entry -> {
String topic = entry.getKey();
TopicDescription topicDescription;
try {
topicDescription = entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
TopicDescription topicDescription = entry.getValue();
return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
})
.toList();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get();
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get();
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
for (TopicPartition partition : allPartitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
@ -256,7 +253,7 @@ public class KafkaAdmin {
}
}
return true;
} catch (InterruptedException | ExecutionException e) {
} catch (Exception e) {
log.error("Failed to check if topics [{}] empty.", topics, e);
return false;
}
@ -264,7 +261,7 @@ public class KafkaAdmin {
public void deleteConsumerGroup(String consumerGroupId) {
try {
getClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId));
getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
}

4
common/util/src/main/java/org/thingsboard/common/util/CachedValue.java

@ -23,8 +23,6 @@ import java.util.function.Supplier;
public class CachedValue<V> {
private static final Object KEY = new Object();
private final LoadingCache<Object, V> cache;
public CachedValue(Supplier<V> supplier, long valueTtlMs) {
@ -34,7 +32,7 @@ public class CachedValue<V> {
}
public V get() {
return cache.get(KEY);
return cache.get(this);
}
}

Loading…
Cancel
Save