Browse Source

Technical cleanup of deprecated Curator API in ZooKeeper discovery

pull/15663/head
Oleksandra Matviienko 3 weeks ago
parent
commit
e874a91df0
  1. 48
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java
  2. 31
      common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java

48
common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java

@ -26,9 +26,8 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
@ -54,12 +53,12 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
import static org.apache.curator.framework.recipes.cache.CuratorCacheAccessor.parentPathFilter;
@Service
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
@Slf4j
public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
public class ZkDiscoveryService implements DiscoveryService {
@Value("${zk.url}")
private String zkUrl;
@ -84,7 +83,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private ScheduledExecutorService zkExecutorService;
@Getter
private CuratorFramework client;
private PathChildrenCache cache;
private CuratorCache cache;
private String nodePath;
private String zkNodesDir;
@ -117,7 +116,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Override
public List<TransportProtos.ServiceInfo> getOtherServers() {
return cache.getCurrentData().stream()
return cache.stream()
.filter(parentPathFilter(zkNodesDir))
.filter(cd -> !cd.getPath().equals(nodePath))
.map(cd -> {
try {
@ -241,7 +241,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval));
client.start();
client.blockUntilConnected();
cache = new PathChildrenCache(client, zkNodesDir, true);
client.createContainers(zkNodesDir);
cache = CuratorCache.builder(client, zkNodesDir).build();
cache.start();
stopped = false;
log.info("ZK client connected");
@ -254,7 +255,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
private void subscribeToEvents() {
cache.getListenable().addListener(this);
cache.listenable().addListener(this::onCacheEvent);
}
private void unpublishCurrentServer() {
@ -286,29 +287,32 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
return "The " + propertyName + " property need to be set!";
}
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
@SneakyThrows
void onCacheEvent(CuratorCacheListener.Type type, ChildData oldData, ChildData newData) {
if (stopped) {
log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
log.debug("Ignoring {}. Service is stopped.", type);
return;
}
if (client.getState() != CuratorFrameworkState.STARTED) {
log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", type, client.getState());
return;
}
ChildData data = pathChildrenCacheEvent.getData();
ChildData data = type == CuratorCacheListener.Type.NODE_DELETED ? oldData : newData;
if (data == null) {
log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
log.debug("Ignoring {} due to empty child data", type);
return;
} else if (data.getData() == null) {
log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
log.debug("Ignoring {} due to empty child's data", type);
return;
} else if (zkNodesDir.equals(data.getPath())) {
log.debug("Ignoring event about parent node {}", data.getPath());
return;
} else if (nodePath != null && nodePath.equals(data.getPath())) {
if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
if (type == CuratorCacheListener.Type.NODE_DELETED) {
log.info("ZK node for current instance is somehow deleted.");
publishCurrentServer();
}
log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
log.debug("Ignoring event about current server {}", data.getPath());
return;
}
TransportProtos.ServiceInfo instance;
@ -322,9 +326,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
String serviceId = instance.getServiceId();
ProtocolStringList serviceTypesList = instance.getServiceTypesList();
log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId);
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
log.trace("Processing [{}] event for [{}]", type, serviceId);
switch (type) {
case NODE_CREATED:
ScheduledFuture<?> task = delayedTasks.remove(serviceId);
if (task != null) {
if (task.cancel(false)) {
@ -341,7 +345,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
recalculatePartitions();
}
break;
case CHILD_REMOVED:
case NODE_DELETED:
zkExecutorService.submit(() -> applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList)));
ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
log.debug("[{}] Going to recalculate partitions due to removed node [{}]",

31
common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java

@ -18,8 +18,8 @@ package org.thingsboard.server.queue.discovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -36,8 +36,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -63,12 +63,10 @@ public class ZkDiscoveryServiceTest {
private CuratorFramework client;
@Mock
private PathChildrenCache cache;
@Mock
private CuratorFramework curatorFramework;
private CuratorCache cache;
private ZkDiscoveryService zkDiscoveryService;
private List<ChildData> dataList;
private static final long RECALCULATE_DELAY = 100L;
@ -89,12 +87,13 @@ public class ZkDiscoveryServiceTest {
ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService);
ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY);
ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard");
ReflectionTestUtils.setField(zkDiscoveryService, "zkNodesDir", "/thingsboard/nodes");
when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo);
List<ChildData> dataList = new ArrayList<>();
dataList = new ArrayList<>();
dataList.add(currentData);
when(cache.getCurrentData()).thenReturn(dataList);
when(cache.stream()).thenAnswer(inv -> dataList.stream());
}
@Test
@ -178,14 +177,14 @@ public class ZkDiscoveryServiceTest {
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo)));
}
private void startNode(ChildData data) throws Exception {
cache.getCurrentData().add(data);
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data));
private void startNode(ChildData data) {
dataList.add(data);
zkDiscoveryService.onCacheEvent(NODE_CREATED, null, data);
}
private void stopNode(ChildData data) throws Exception {
cache.getCurrentData().remove(data);
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data));
private void stopNode(ChildData data) {
dataList.remove(data);
zkDiscoveryService.onCacheEvent(NODE_DELETED, data, null);
}
}

Loading…
Cancel
Save