|
|
|
@ -228,7 +228,7 @@ public class HashPartitionService implements PartitionService { |
|
|
|
}); |
|
|
|
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { |
|
|
|
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream() |
|
|
|
.collect(Collectors.toMap(k -> k, k -> Collections.emptySet())), Collections.emptyMap()); |
|
|
|
.collect(Collectors.toMap(k -> k, k -> Collections.emptySet()))); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -408,7 +408,6 @@ public class HashPartitionService implements PartitionService { |
|
|
|
myPartitions = newPartitions; |
|
|
|
|
|
|
|
Map<QueueKey, Set<TopicPartitionInfo>> changedPartitionsMap = new HashMap<>(); |
|
|
|
Map<QueueKey, Set<TopicPartitionInfo>> oldPartitionsMap = new HashMap<>(); |
|
|
|
|
|
|
|
Set<QueueKey> removed = new HashSet<>(); |
|
|
|
oldPartitions.forEach((queueKey, partitions) -> { |
|
|
|
@ -429,16 +428,16 @@ public class HashPartitionService implements PartitionService { |
|
|
|
|
|
|
|
myPartitions.forEach((queueKey, partitions) -> { |
|
|
|
if (!partitions.equals(oldPartitions.get(queueKey))) { |
|
|
|
changedPartitionsMap.put(queueKey, toTpiList(queueKey, partitions)); |
|
|
|
oldPartitionsMap.put(queueKey, toTpiList(queueKey, oldPartitions.get(queueKey))); |
|
|
|
Set<TopicPartitionInfo> tpiList = partitions.stream() |
|
|
|
.map(partition -> buildTopicPartitionInfo(queueKey, partition)) |
|
|
|
.collect(Collectors.toSet()); |
|
|
|
changedPartitionsMap.put(queueKey, tpiList); |
|
|
|
} |
|
|
|
}); |
|
|
|
if (!changedPartitionsMap.isEmpty()) { |
|
|
|
changedPartitionsMap.entrySet().stream() |
|
|
|
.collect(Collectors.groupingBy(entry -> entry.getKey().getType(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) |
|
|
|
.forEach((serviceType, partitionsMap) -> { |
|
|
|
publishPartitionChangeEvent(serviceType, partitionsMap, oldPartitionsMap); |
|
|
|
}); |
|
|
|
.forEach(this::publishPartitionChangeEvent); |
|
|
|
} |
|
|
|
|
|
|
|
if (currentOtherServices == null) { |
|
|
|
@ -470,15 +469,13 @@ public class HashPartitionService implements PartitionService { |
|
|
|
applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService)); |
|
|
|
} |
|
|
|
|
|
|
|
private void publishPartitionChangeEvent(ServiceType serviceType, |
|
|
|
Map<QueueKey, Set<TopicPartitionInfo>> newPartitions, |
|
|
|
Map<QueueKey, Set<TopicPartitionInfo>> oldPartitions) { |
|
|
|
log.info("Partitions changed: {}", System.lineSeparator() + newPartitions.entrySet().stream() |
|
|
|
private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) { |
|
|
|
log.info("Partitions changed: {}", System.lineSeparator() + partitionsMap.entrySet().stream() |
|
|
|
.map(entry -> "[" + entry.getKey() + "] - [" + entry.getValue().stream() |
|
|
|
.map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted() |
|
|
|
.collect(Collectors.joining(", ")) + "]") |
|
|
|
.collect(Collectors.joining(System.lineSeparator()))); |
|
|
|
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, newPartitions); |
|
|
|
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap); |
|
|
|
try { |
|
|
|
applicationEventPublisher.publishEvent(event); |
|
|
|
} catch (Exception e) { |
|
|
|
@ -486,15 +483,6 @@ public class HashPartitionService implements PartitionService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Set<TopicPartitionInfo> toTpiList(QueueKey queueKey, List<Integer> partitions) { |
|
|
|
if (partitions == null) { |
|
|
|
return Collections.emptySet(); |
|
|
|
} |
|
|
|
return partitions.stream() |
|
|
|
.map(partition -> buildTopicPartitionInfo(queueKey, partition)) |
|
|
|
.collect(Collectors.toSet()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public Set<String> getAllServiceIds(ServiceType serviceType) { |
|
|
|
return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet()); |
|
|
|
|