From eeb72de9395c4cb1a44c19c55e0a6ec863f88cad Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 22 May 2024 15:33:13 +0300 Subject: [PATCH] Fix memory leak with old WS subscriptions --- .../service/ws/DefaultWebSocketService.java | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 2a53e9a772..d7091bbad2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -83,6 +83,7 @@ import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpda import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -143,6 +144,7 @@ public class DefaultWebSocketService implements WebSocketService { private final ConcurrentMap> customerSubscriptionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> regularUserSubscriptionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> publicUserSubscriptionsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> sessionCmdMap = new ConcurrentHashMap<>(); private ExecutorService executor; private ScheduledExecutorService pingExecutor; @@ -201,9 +203,7 @@ public class DefaultWebSocketService implements WebSocketService { event.getError().orElse(new RuntimeException("No error specified"))); break; case CLOSED: - wsSessionsMap.remove(sessionId); - oldSubService.cancelAllSessionSubscriptions(sessionId); - entityDataSubService.cancelAllSessionSubscriptions(sessionId); + cleanupSessionById(sessionId); processSessionClose(sessionRef); break; } @@ -299,9 +299,7 @@ public class DefaultWebSocketService implements WebSocketService { public void cleanupIfStale(String sessionId) { if (!msgEndpoint.isOpen(sessionId)) { log.info("[{}] Cleaning up stale session ", sessionId); - wsSessionsMap.remove(sessionId); - oldSubService.cancelAllSessionSubscriptions(sessionId); - entityDataSubService.cancelAllSessionSubscriptions(sessionId); + cleanupSessionById(sessionId); } } @@ -451,7 +449,7 @@ public class DefaultWebSocketService implements WebSocketService { TbAttributeSubscription sub = TbAttributeSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) - .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet()) + .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .queryTs(queryTs) @@ -500,6 +498,13 @@ public class DefaultWebSocketService implements WebSocketService { } } + private int registerNewSessionSubId(String sessionId, WebSocketSessionRef sessionRef, int cmdId) { + var cmdMap = sessionCmdMap.computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>()); + var subId = sessionRef.getSessionSubIdSeq().incrementAndGet(); + cmdMap.put(cmdId, subId); + return subId; + } + private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) { if (!validateCmd(sessionRef, cmd, () -> { if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { @@ -557,7 +562,7 @@ public class DefaultWebSocketService implements WebSocketService { TbAttributeSubscription sub = TbAttributeSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) - .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet()) + .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .queryTs(queryTs) @@ -655,7 +660,7 @@ public class DefaultWebSocketService implements WebSocketService { TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) - .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet()) + .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .updateProcessor((subscription, update) -> { @@ -710,7 +715,7 @@ public class DefaultWebSocketService implements WebSocketService { TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionId) - .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet()) + .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) .updateProcessor((subscription, update) -> { @@ -749,12 +754,25 @@ public class DefaultWebSocketService implements WebSocketService { private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { - oldSubService.cancelAllSessionSubscriptions(sessionId); + log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId()); + cleanupSessionById(sessionId); } else { - oldSubService.cancelSubscription(sessionId, cmd.getCmdId()); + Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId()); + if (subId == null) { + log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId()); + subId = cmd.getCmdId(); + } + oldSubService.cancelSubscription(sessionId, subId); } } + private void cleanupSessionById(String sessionId) { + wsSessionsMap.remove(sessionId); + oldSubService.cancelAllSessionSubscriptions(sessionId); + sessionCmdMap.remove(sessionId); + entityDataSubService.cancelAllSessionSubscriptions(sessionId); + } + private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) { return validateCmd(sessionRef, cmd, () -> { if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {