From 6c618a7db35ecc1a757f09869fcb16465f7de465 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Sun, 23 Sep 2018 16:22:04 +0300 Subject: [PATCH] Fixed issue with Zookeeper reconnect --- .../cluster/discovery/ZkDiscoveryService.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java index c3ffbabbea..6f51ceed26 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java @@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryForever; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; @@ -127,12 +129,38 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress())); log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath); + client.getConnectionStateListenable().addListener(checkReconnect(self)); } catch (Exception e) { log.error("Failed to create ZK node", e); throw new RuntimeException(e); } } + private ConnectionStateListener checkReconnect(ServerInstance self) { + return (client, newState) -> { + log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState); + if (newState == ConnectionState.LOST) { + reconnect(); + } + }; + } + + private boolean reconnectInProgress = false; + + private synchronized void reconnect() { + if (!reconnectInProgress) { + reconnectInProgress = true; + try { + client.blockUntilConnected(); + publishCurrentServer(); + } catch (InterruptedException e) { + log.error("Failed to reconnect to ZK: {}", e.getMessage(), e); + } finally { + reconnectInProgress = false; + } + } + } + @Override public void unpublishCurrentServer() { try { @@ -156,7 +184,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi .filter(cd -> !cd.getPath().equals(nodePath)) .map(cd -> { try { - return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData())); + return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData())); } catch (NoSuchElementException e) { log.error("Failed to decode ZK node", e); throw new RuntimeException(e); @@ -198,7 +226,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } ServerInstance instance; try { - ServerAddress serverAddress = SerializationUtils.deserialize(data.getData()); + ServerAddress serverAddress = SerializationUtils.deserialize(data.getData()); instance = new ServerInstance(serverAddress); } catch (SerializationException e) { log.error("Failed to decode server instance for node {}", data.getPath(), e);