From e874a91df0fa1fb4341eec659621ae4aef0e2248 Mon Sep 17 00:00:00 2001 From: Oleksandra Matviienko Date: Tue, 19 May 2026 15:25:43 +0200 Subject: [PATCH] Technical cleanup of deprecated Curator API in ZooKeeper discovery --- .../queue/discovery/ZkDiscoveryService.java | 48 ++++++++++--------- .../discovery/ZkDiscoveryServiceTest.java | 31 ++++++------ 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index ccc242a1a3..47c2761020 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -26,9 +26,8 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryForever; @@ -54,12 +53,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED; +import static org.apache.curator.framework.recipes.cache.CuratorCacheAccessor.parentPathFilter; @Service @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false) @Slf4j -public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener { +public class ZkDiscoveryService implements DiscoveryService { @Value("${zk.url}") private String zkUrl; @@ -84,7 +83,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private ScheduledExecutorService zkExecutorService; @Getter private CuratorFramework client; - private PathChildrenCache cache; + private CuratorCache cache; private String nodePath; private String zkNodesDir; @@ -117,7 +116,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Override public List getOtherServers() { - return cache.getCurrentData().stream() + return cache.stream() + .filter(parentPathFilter(zkNodesDir)) .filter(cd -> !cd.getPath().equals(nodePath)) .map(cd -> { try { @@ -241,7 +241,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval)); client.start(); client.blockUntilConnected(); - cache = new PathChildrenCache(client, zkNodesDir, true); + client.createContainers(zkNodesDir); + cache = CuratorCache.builder(client, zkNodesDir).build(); cache.start(); stopped = false; log.info("ZK client connected"); @@ -254,7 +255,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } private void subscribeToEvents() { - cache.getListenable().addListener(this); + cache.listenable().addListener(this::onCacheEvent); } private void unpublishCurrentServer() { @@ -286,29 +287,32 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi return "The " + propertyName + " property need to be set!"; } - @Override - public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { + @SneakyThrows + void onCacheEvent(CuratorCacheListener.Type type, ChildData oldData, ChildData newData) { if (stopped) { - log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent); + log.debug("Ignoring {}. Service is stopped.", type); return; } if (client.getState() != CuratorFrameworkState.STARTED) { - log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState()); + log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", type, client.getState()); return; } - ChildData data = pathChildrenCacheEvent.getData(); + ChildData data = type == CuratorCacheListener.Type.NODE_DELETED ? oldData : newData; if (data == null) { - log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent); + log.debug("Ignoring {} due to empty child data", type); return; } else if (data.getData() == null) { - log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent); + log.debug("Ignoring {} due to empty child's data", type); + return; + } else if (zkNodesDir.equals(data.getPath())) { + log.debug("Ignoring event about parent node {}", data.getPath()); return; } else if (nodePath != null && nodePath.equals(data.getPath())) { - if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) { + if (type == CuratorCacheListener.Type.NODE_DELETED) { log.info("ZK node for current instance is somehow deleted."); publishCurrentServer(); } - log.debug("Ignoring event about current server {}", pathChildrenCacheEvent); + log.debug("Ignoring event about current server {}", data.getPath()); return; } TransportProtos.ServiceInfo instance; @@ -322,9 +326,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi String serviceId = instance.getServiceId(); ProtocolStringList serviceTypesList = instance.getServiceTypesList(); - log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId); - switch (pathChildrenCacheEvent.getType()) { - case CHILD_ADDED: + log.trace("Processing [{}] event for [{}]", type, serviceId); + switch (type) { + case NODE_CREATED: ScheduledFuture task = delayedTasks.remove(serviceId); if (task != null) { if (task.cancel(false)) { @@ -341,7 +345,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi recalculatePartitions(); } break; - case CHILD_REMOVED: + case NODE_DELETED: zkExecutorService.submit(() -> applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList))); ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index 74bf30dca6..a7be216f66 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -18,8 +18,8 @@ package org.thingsboard.server.queue.discovery; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -36,8 +36,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED; -import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -63,12 +63,10 @@ public class ZkDiscoveryServiceTest { private CuratorFramework client; @Mock - private PathChildrenCache cache; - - @Mock - private CuratorFramework curatorFramework; + private CuratorCache cache; private ZkDiscoveryService zkDiscoveryService; + private List dataList; private static final long RECALCULATE_DELAY = 100L; @@ -89,12 +87,13 @@ public class ZkDiscoveryServiceTest { ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY); ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); + ReflectionTestUtils.setField(zkDiscoveryService, "zkNodesDir", "/thingsboard/nodes"); when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); - List dataList = new ArrayList<>(); + dataList = new ArrayList<>(); dataList.add(currentData); - when(cache.getCurrentData()).thenReturn(dataList); + when(cache.stream()).thenAnswer(inv -> dataList.stream()); } @Test @@ -178,14 +177,14 @@ public class ZkDiscoveryServiceTest { verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo))); } - private void startNode(ChildData data) throws Exception { - cache.getCurrentData().add(data); - zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data)); + private void startNode(ChildData data) { + dataList.add(data); + zkDiscoveryService.onCacheEvent(NODE_CREATED, null, data); } - private void stopNode(ChildData data) throws Exception { - cache.getCurrentData().remove(data); - zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data)); + private void stopNode(ChildData data) { + dataList.remove(data); + zkDiscoveryService.onCacheEvent(NODE_DELETED, data, null); } }