From dc7b86fa1da0b0be125be22f0698bce41ea35d4b Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Wed, 18 Mar 2026 11:00:35 +0200 Subject: [PATCH] Address PR review comments for edge session refactoring - Deduplicate forEach iteration over both session maps (HashSet) - Use null-safe getEdgeId() instead of getEdge().getId() in remove() - Add defensive null checks on getState() in onConfigurationUpdate and destroyAndMarkAsZombieIfFailed - Add null check on pendingMsgsMap.get() in onDownlinkResponse - Move scheduleAtFixedRate to @PostConstruct in DefaultZombieSessionCleanupService - Fix misleading "kafka sessions" log message to "zombie sessions" - Chain IOException in GrpcServer RuntimeException - Guard against race condition on removeByEdgeId in onEdgeDisconnect - Rename shadowed parameter in shutdownExecutorSafely --- .../server/service/edge/rpc/GrpcServer.java | 2 +- .../edge/rpc/service/EdgeGrpcService.java | 12 +++++---- .../DefaultZombieSessionCleanupService.java | 7 +++++- .../edge/rpc/session/EdgeGrpcSession.java | 2 +- .../edge/rpc/session/EdgeSessionsHolder.java | 10 +++++--- .../AbstractEdgeGrpcSessionManager.java | 25 +++++++++++++------ 6 files changed, 39 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java index c5c2e40f5b..0b19f10aa4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java @@ -85,7 +85,7 @@ public class GrpcServer { server.start(); } catch (IOException e) { log.error("Failed to start Edge RPC server!", e); - throw new RuntimeException("Failed to start Edge RPC server!"); + throw new RuntimeException("Failed to start Edge RPC server!", e); } log.info("Edge RPC server initialized!"); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java index a41ff78110..b50d643634 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java @@ -221,8 +221,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i EdgeGrpcSessionManager current = sessions.getByEdgeId(edgeId); if (current != null && current.getState().getSessionId().equals(sessionId)) { EdgeGrpcSessionManager toRemove = sessions.removeByEdgeId(edgeId); - toRemove.onEdgeDisconnect(); - toRemove.destroyAndMarkAsZombieIfFailed(); + if (toRemove != null) { + toRemove.onEdgeDisconnect(); + toRemove.destroyAndMarkAsZombieIfFailed(); + } sessions.removeBySessionId(sessionId); save(tenantId, edgeId, ACTIVITY_STATE, false); long lastDisconnectTs = System.currentTimeMillis(); @@ -382,9 +384,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void shutdownExecutorSafely(ExecutorService sendDownlinkExecutorService) { - if (sendDownlinkExecutorService != null && !sendDownlinkExecutorService.isShutdown()) { - sendDownlinkExecutorService.shutdown(); + private void shutdownExecutorSafely(ExecutorService e) { + if (e != null && !e.isShutdown()) { + e.shutdown(); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java index 76d554eab9..780dcbe196 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.edge.rpc.session; +import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -50,6 +51,10 @@ public class DefaultZombieSessionCleanupService implements ZombieSessionCleanupS public DefaultZombieSessionCleanupService() { this.zombieSessions = new ConcurrentLinkedQueue<>(); this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); + } + + @PostConstruct + public void init() { this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS); } @@ -86,7 +91,7 @@ public class DefaultZombieSessionCleanupService implements ZombieSessionCleanupS } }); } catch (Exception e) { - log.warn("Failed to cleanup kafka sessions", e); + log.warn("Failed to cleanup zombie sessions", e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java index dc2296f1d5..73a3b53e4c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java @@ -334,7 +334,7 @@ public class EdgeGrpcSession implements EdgeSession { log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", getTenantId(), getEdgeId(), getSessionId(), msg.getDownlinkMsgId(), msg.getErrorMsg()); DownlinkMsg downlinkMsg = state.getPendingMsgsMap().get(msg.getDownlinkMsgId()); // if NOT timeseries or attributes failures - ack failed downlink - if (downlinkMsg.getEntityDataCount() == 0) { + if (downlinkMsg != null && downlinkMsg.getEntityDataCount() == 0) { state.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java index 7cc27214a7..2d7c3dd8e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java @@ -24,6 +24,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.EdgeSessionState; import org.thingsboard.server.service.edge.rpc.session.manager.EdgeGrpcSessionManager; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -40,8 +42,10 @@ public class EdgeSessionsHolder { private final ConcurrentMap sessionsById = new ConcurrentHashMap<>(); public void forEach(Consumer consumer) { - sessions.values().forEach(consumer); - sessionsById.values().forEach(consumer); + Set unique = new HashSet<>(sessions.values()); + unique.addAll(sessionsById.values()); + + unique.forEach(consumer); } public void put(EdgeGrpcSessionManager session) { @@ -73,7 +77,7 @@ public class EdgeSessionsHolder { return; } EdgeSessionState sessionState = session.getState(); - removeByEdgeId(sessionState.getEdge().getId()); // todo: react to warnings + removeByEdgeId(sessionState.getEdgeId()); removeBySessionId(sessionState.getSessionId()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java index 1634d2fab4..0df1af1ada 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java @@ -101,11 +101,12 @@ public abstract class AbstractEdgeGrpcSessionManager extends EdgeGrpcSessionDele @Override public void onConfigurationUpdate(Edge edge) { EdgeSessionState state = getState(); - CustomerId stateCustomerId = null; - if (state != null) { - stateCustomerId = state.getEdge().getCustomerId(); - state.setEdge(edge); + if (state == null) { + log.warn("Session state is null, skipping configuration update for edge [{}]", edge.getId()); + return; } + CustomerId stateCustomerId = state.getEdge().getCustomerId(); + state.setEdge(edge); if (stateCustomerId != null && !stateCustomerId.equals(edge.getCustomerId())) { // do not send edge configuration message on customer update // message send by separate flow from assign_to or unassing_from customer @@ -128,13 +129,21 @@ public abstract class AbstractEdgeGrpcSessionManager extends EdgeGrpcSessionDele try (finalSession) { if (!destroy()) { - log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", - state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId()); + if (state != null) { + log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", + state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId()); + } else { + log.warn("Session destroy failed (state is null). Adding to zombie queue for later cleanup."); + } zombieSessionCleanupService.add(this); } } catch (Exception e) { - log.warn("[{}][{}] Exception during session destroy for edge [{}] with session id [{}]", - state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId(), e); + if (state != null) { + log.warn("[{}][{}] Exception during session destroy for edge [{}] with session id [{}]", + state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId(), e); + } else { + log.warn("Exception during session destroy (state is null)", e); + } } } }