|
|
|
@ -58,15 +58,12 @@ public class KafkaAdmin { |
|
|
|
|
|
|
|
private final TbKafkaSettings settings; |
|
|
|
|
|
|
|
@Value("${queue.kafka.request.timeout.ms:30000}") |
|
|
|
private int requestTimeoutMs; |
|
|
|
@Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default
|
|
|
|
private int topicsCacheTtlMs; |
|
|
|
|
|
|
|
private final LazyInitializer<AdminClient> adminClient; |
|
|
|
private final CachedValue<Set<String>> topics; |
|
|
|
|
|
|
|
public KafkaAdmin(@Lazy TbKafkaSettings settings) { |
|
|
|
public KafkaAdmin(@Lazy TbKafkaSettings settings, |
|
|
|
@Value("${queue.kafka.topics_cache_ttl_ms:300000}") |
|
|
|
int topicsCacheTtlMs) { |
|
|
|
this.settings = settings; |
|
|
|
this.adminClient = LazyInitializer.<AdminClient>builder() |
|
|
|
.setInitializer(() -> AdminClient.create(settings.toAdminProps())) |
|
|
|
@ -91,7 +88,7 @@ public class KafkaAdmin { |
|
|
|
NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); |
|
|
|
|
|
|
|
try { |
|
|
|
getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
getClient().createTopics(List.of(newTopic)).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
topics.add(topic); |
|
|
|
} catch (ExecutionException ee) { |
|
|
|
log.trace("Failed to create topic {} with properties {}", topic, properties, ee); |
|
|
|
@ -110,7 +107,7 @@ public class KafkaAdmin { |
|
|
|
public void deleteTopic(String topic) { |
|
|
|
log.debug("Deleting topic {}", topic); |
|
|
|
try { |
|
|
|
getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
getClient().deleteTopics(List.of(topic)).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Failed to delete kafka topic [{}].", topic, e); |
|
|
|
} |
|
|
|
@ -122,7 +119,7 @@ public class KafkaAdmin { |
|
|
|
|
|
|
|
public Set<String> listTopics() { |
|
|
|
try { |
|
|
|
Set<String> topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
Set<String> topics = getClient().listTopics().names().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
log.trace("Listed topics: {}", topics); |
|
|
|
return topics; |
|
|
|
} catch (Exception e) { |
|
|
|
@ -150,7 +147,7 @@ public class KafkaAdmin { |
|
|
|
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); |
|
|
|
|
|
|
|
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = |
|
|
|
getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
getClient().listOffsets(latestOffsetsSpec).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
return committedOffsets.entrySet().stream() |
|
|
|
.mapToLong(entry -> { |
|
|
|
@ -169,7 +166,7 @@ public class KafkaAdmin { |
|
|
|
|
|
|
|
@SneakyThrows |
|
|
|
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) { |
|
|
|
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
@ -212,7 +209,7 @@ public class KafkaAdmin { |
|
|
|
} else { |
|
|
|
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); |
|
|
|
} |
|
|
|
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
log.info("[{}] altered new consumer groupId {}", tp, newGroupId); |
|
|
|
break; |
|
|
|
} |
|
|
|
@ -229,7 +226,7 @@ public class KafkaAdmin { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS) |
|
|
|
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS) |
|
|
|
.entrySet().stream() |
|
|
|
.flatMap(entry -> { |
|
|
|
String topic = entry.getKey(); |
|
|
|
@ -239,9 +236,9 @@ public class KafkaAdmin { |
|
|
|
.toList(); |
|
|
|
|
|
|
|
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = getClient().listOffsets(allPartitions.stream() |
|
|
|
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getClient().listOffsets(allPartitions.stream() |
|
|
|
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
for (TopicPartition partition : allPartitions) { |
|
|
|
long beginningOffset = beginningOffsets.get(partition).offset(); |
|
|
|
@ -261,7 +258,7 @@ public class KafkaAdmin { |
|
|
|
|
|
|
|
public void deleteConsumerGroup(String consumerGroupId) { |
|
|
|
try { |
|
|
|
getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(settings.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("Failed to delete consumer group {}", consumerGroupId, e); |
|
|
|
} |
|
|
|
|