|
|
|
@ -325,13 +325,11 @@ public class HashPartitionService implements PartitionService { |
|
|
|
.forEach(removed::add); |
|
|
|
} |
|
|
|
removed.forEach(queueKey -> { |
|
|
|
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); |
|
|
|
changedPartitionsMap.put(queueKey, Collections.emptySet()); |
|
|
|
}); |
|
|
|
|
|
|
|
myPartitions.forEach((queueKey, partitions) -> { |
|
|
|
if (!partitions.equals(oldPartitions.get(queueKey))) { |
|
|
|
log.info("[{}] NEW PARTITIONS: {}", queueKey, partitions); |
|
|
|
Set<TopicPartitionInfo> tpiList = partitions.stream() |
|
|
|
.map(partition -> buildTopicPartitionInfo(queueKey, partition)) |
|
|
|
.collect(Collectors.toSet()); |
|
|
|
@ -377,14 +375,11 @@ public class HashPartitionService implements PartitionService { |
|
|
|
} |
|
|
|
|
|
|
|
private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) { |
|
|
|
if (log.isDebugEnabled()) { |
|
|
|
log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() + |
|
|
|
partitionsMap.entrySet().stream() |
|
|
|
.map(entry -> entry.getKey() + " - " + entry.getValue().stream() |
|
|
|
.map(TopicPartitionInfo::getFullTopicName).sorted() |
|
|
|
.collect(Collectors.toList())) |
|
|
|
.collect(Collectors.joining(System.lineSeparator()))); |
|
|
|
} |
|
|
|
log.info("Partitions changed: {}", System.lineSeparator() + partitionsMap.entrySet().stream() |
|
|
|
.map(entry -> "[" + entry.getKey() + "] - [" + entry.getValue().stream() |
|
|
|
.map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted() |
|
|
|
.collect(Collectors.joining(", ")) + "]") |
|
|
|
.collect(Collectors.joining(System.lineSeparator()))); |
|
|
|
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); |
|
|
|
} |
|
|
|
|
|
|
|
@ -489,7 +484,7 @@ public class HashPartitionService implements PartitionService { |
|
|
|
} |
|
|
|
|
|
|
|
private void logServiceInfo(TransportProtos.ServiceInfo server) { |
|
|
|
log.info("[{}] Found common server: [{}]", server.getServiceId(), server.getServiceTypesList()); |
|
|
|
log.info("[{}] Found common server: {}", server.getServiceId(), server.getServiceTypesList()); |
|
|
|
} |
|
|
|
|
|
|
|
private void addNode(Map<QueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) { |
|
|
|
|