From cffd2e96cc79185a70a14a4ac0740f3a0411933a Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Thu, 26 Mar 2026 16:27:50 +0200 Subject: [PATCH 1/3] fixed WS limit handling for "Sessions per public user maximum number" --- .../controller/plugin/TbWebSocketHandler.java | 6 +- .../service/ws/DefaultWebSocketService.java | 6 +- .../plugin/TbWebSocketHandlerTest.java | 78 +++++++++++++++++++ 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 73315be73f..133584201c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -115,7 +115,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke private final ConcurrentMap> tenantSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> customerSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> regularUserSessionsMap = new ConcurrentHashMap<>(); - private final ConcurrentMap> publicUserSessionsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> publicUserSessionsMap = new ConcurrentHashMap<>(); private Cache pendingSessions; @@ -611,7 +611,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { limitAllowed = publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser(); if (limitAllowed) { @@ -655,7 +655,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { publicUserSessions.remove(sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 4b1a81dd2d..09651a9264 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -144,7 +144,7 @@ public class DefaultWebSocketService implements WebSocketService { private final ConcurrentMap> tenantSubscriptionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> customerSubscriptionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> regularUserSubscriptionsMap = new ConcurrentHashMap<>(); - private final ConcurrentMap> publicUserSubscriptionsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> publicUserSubscriptionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> sessionCmdMap = new ConcurrentHashMap<>(); private ExecutorService executor; @@ -340,7 +340,7 @@ public class DefaultWebSocketService implements WebSocketService { } } if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { publicUserSessions.removeIf(subId -> subId.startsWith(sessionId)); } @@ -401,7 +401,7 @@ public class DefaultWebSocketService implements WebSocketService { } } if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) { publicUserSessions.add(subId); diff --git a/application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java b/application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java index 053cb6808f..4dc637725f 100644 --- a/application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java @@ -25,16 +25,28 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.adapter.NativeWebSocketSession; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.model.UserPrincipal; import org.thingsboard.server.service.ws.WebSocketSessionRef; import java.io.IOException; +import java.lang.reflect.Method; import java.util.Collection; import java.util.Deque; import java.util.List; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; @@ -184,4 +196,70 @@ class TbWebSocketHandlerTest { assertThat(msgs).map(Integer::parseInt).doesNotHaveDuplicates().hasSize(100); } + // Regression test for the bug where publicUserSessionsMap was keyed by UserId(NULL_UUID), + // making maxWsSessionsPerPublicUser a global limit shared across all tenants. + // The limit is now scoped per-tenant. + @Test + void checkLimits_publicUserSessions_limitIsPerTenantNotGlobal() throws Exception { + TbTenantProfileCache tenantProfileCache = mock(TbTenantProfileCache.class); + ReflectionTestUtils.setField(wsHandler, "tenantProfileCache", tenantProfileCache); + + int maxPublicSessions = 2; + + TenantId tenant1 = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile1 = new TenantProfile(); + profile1.createDefaultTenantProfileData(); + profile1.getDefaultProfileConfiguration().setMaxWsSessionsPerPublicUser(maxPublicSessions); + willReturn(profile1).given(tenantProfileCache).get(tenant1); + + TenantId tenant2 = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile2 = new TenantProfile(); + profile2.createDefaultTenantProfileData(); + profile2.getDefaultProfileConfiguration().setMaxWsSessionsPerPublicUser(maxPublicSessions); + willReturn(profile2).given(tenantProfileCache).get(tenant2); + + Method checkLimits = TbWebSocketHandler.class.getDeclaredMethod( + "checkLimits", WebSocketSession.class, WebSocketSessionRef.class); + checkLimits.setAccessible(true); + + // tenant1 fills up its limit + for (int i = 0; i < maxPublicSessions; i++) { + assertThat((boolean) checkLimits.invoke(wsHandler, mockWsSession("t1-" + i), mockPublicSessionRef(tenant1))).isTrue(); + } + + // tenant2 must get its own independent quota — this was the bug: with NULL_UUID as key + // all tenants shared one global counter, so tenant2 would be blocked here + for (int i = 0; i < maxPublicSessions; i++) { + assertThat((boolean) checkLimits.invoke(wsHandler, mockWsSession("t2-" + i), mockPublicSessionRef(tenant2))) + .as("tenant2 session %d should not be affected by tenant1's sessions", i + 1) + .isTrue(); + } + + // tenant1's (maxPublicSessions + 1)-th session must be rejected + NativeWebSocketSession overLimit = mockWsSession("t1-over"); + assertThat((boolean) checkLimits.invoke(wsHandler, overLimit, mockPublicSessionRef(tenant1))).isFalse(); + verify(overLimit).close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached")); + } + + private NativeWebSocketSession mockWsSession(String id) { + NativeWebSocketSession s = mock(NativeWebSocketSession.class); + willReturn(id).given(s).getId(); + return s; + } + + private WebSocketSessionRef mockPublicSessionRef(TenantId tenantId) { + CustomerId customerId = new CustomerId(UUID.randomUUID()); + SecurityUser securityUser = mock(SecurityUser.class); + willReturn(tenantId).given(securityUser).getTenantId(); + willReturn(customerId).given(securityUser).getCustomerId(); + willReturn(new UserId(EntityId.NULL_UUID)).given(securityUser).getId(); + willReturn(true).given(securityUser).isCustomerUser(); + willReturn(new UserPrincipal(UserPrincipal.Type.PUBLIC_ID, customerId.toString())).given(securityUser).getUserPrincipal(); + + WebSocketSessionRef ref = mock(WebSocketSessionRef.class); + willReturn(securityUser).given(ref).getSecurityCtx(); + willReturn(UUID.randomUUID().toString()).given(ref).getSessionId(); + return ref; + } + } From a75c008f50813e8d15628571403782d2d22cff84 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 30 Mar 2026 11:05:17 +0300 Subject: [PATCH 2/3] added tests for DefaultWebSocketService.processSubscription --- .../service/ws/DefaultWebSocketService.java | 2 +- .../ws/DefaultWebSocketServiceTest.java | 170 ++++++++++++++++++ 2 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 09651a9264..a4d7fe81cf 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -349,7 +349,7 @@ public class DefaultWebSocketService implements WebSocketService { } } - private boolean processSubscription(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) { + boolean processSubscription(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) { var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); if (tenantProfileConfiguration == null) return true; diff --git a/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java new file mode 100644 index 0000000000..69f918ece7 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java @@ -0,0 +1,170 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ws; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.model.UserPrincipal; +import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService; +import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; +import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; +import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; + +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +class DefaultWebSocketServiceTest { + + DefaultWebSocketService service; + TbTenantProfileCache tenantProfileCache; + WebSocketMsgEndpoint msgEndpoint; + + @BeforeEach + void setUp() { + tenantProfileCache = mock(TbTenantProfileCache.class); + msgEndpoint = mock(WebSocketMsgEndpoint.class); + + service = new DefaultWebSocketService( + mock(TbLocalSubscriptionService.class), + mock(TbEntityDataSubscriptionService.class), + mock(NotificationCommandsHandler.class), + msgEndpoint, + mock(AccessValidator.class), + mock(AttributesService.class), + mock(TimeseriesService.class), + mock(TbServiceInfoProvider.class), + tenantProfileCache + ); + } + + // Regression test: publicUserSubscriptionsMap must be keyed by TenantId, not UserId(NULL_UUID). + // With the old UserId(NULL_UUID) key, all tenants shared one global subscription counter. + @Test + void processSubscription_publicUserSubscriptionsMap_isPerTenantNotGlobal() throws Exception { + int maxPublicSubscriptions = 2; + + TenantId tenant1 = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile1 = new TenantProfile(); + profile1.createDefaultTenantProfileData(); + profile1.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile1).given(tenantProfileCache).get(tenant1); + + TenantId tenant2 = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile2 = new TenantProfile(); + profile2.createDefaultTenantProfileData(); + profile2.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile2).given(tenantProfileCache).get(tenant2); + + // tenant1 fills up its quota + for (int i = 0; i < maxPublicSubscriptions; i++) { + assertThat(service.processSubscription(mockPublicSessionRef(tenant1, "t1-session-" + i), subscriptionCmd(i))) + .as("tenant1 subscription %d should be accepted", i + 1) + .isTrue(); + } + + // tenant2 must have its own independent quota — this was the bug: + // with UserId(NULL_UUID) as key all tenants shared one counter, so tenant2 would be blocked here + for (int i = 0; i < maxPublicSubscriptions; i++) { + assertThat(service.processSubscription(mockPublicSessionRef(tenant2, "t2-session-" + i), subscriptionCmd(i))) + .as("tenant2 subscription %d should not be affected by tenant1's subscriptions", i + 1) + .isTrue(); + } + + // tenant1's (maxPublicSubscriptions + 1)-th subscription must be rejected + assertThat(service.processSubscription(mockPublicSessionRef(tenant1, "t1-session-over"), subscriptionCmd(99))) + .as("tenant1 should be rejected after exceeding its limit") + .isFalse(); + + // Verify that publicUserSubscriptionsMap has separate entries per tenant + @SuppressWarnings("unchecked") + ConcurrentMap> publicUserSubscriptionsMap = + (ConcurrentMap>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap"); + + assertThat(publicUserSubscriptionsMap).as("map should contain tenant1").containsKey(tenant1); + assertThat(publicUserSubscriptionsMap).as("map should contain tenant2").containsKey(tenant2); + assertThat(publicUserSubscriptionsMap).as("map must not have a single NULL_UUID entry for all tenants") + .doesNotContainKey(new TenantId(EntityId.NULL_UUID)); + + assertThat(publicUserSubscriptionsMap.get(tenant1)) + .as("tenant1 should have exactly %d subscriptions", maxPublicSubscriptions) + .hasSize(maxPublicSubscriptions); + assertThat(publicUserSubscriptionsMap.get(tenant2)) + .as("tenant2 should have exactly %d subscriptions", maxPublicSubscriptions) + .hasSize(maxPublicSubscriptions); + } + + @Test + void processSubscription_publicUserSubscriptionsMap_subscriptionIdFormat() { + int maxPublicSubscriptions = 5; + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile = new TenantProfile(); + profile.createDefaultTenantProfileData(); + profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile).given(tenantProfileCache).get(tenantId); + + String sessionId = "my-session-id"; + int cmdId = 42; + WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId); + service.processSubscription(sessionRef, subscriptionCmd(cmdId)); + + @SuppressWarnings("unchecked") + ConcurrentMap> publicUserSubscriptionsMap = + (ConcurrentMap>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap"); + + Set subs = publicUserSubscriptionsMap.get(tenantId); + assertThat(subs).hasSize(1); + assertThat(subs.iterator().next()).isEqualTo("[" + sessionId + "]:[" + cmdId + "]"); + } + + private WebSocketSessionRef mockPublicSessionRef(TenantId tenantId, String sessionId) { + CustomerId customerId = new CustomerId(UUID.randomUUID()); + SecurityUser securityUser = mock(SecurityUser.class); + willReturn(tenantId).given(securityUser).getTenantId(); + willReturn(customerId).given(securityUser).getCustomerId(); + willReturn(new UserId(EntityId.NULL_UUID)).given(securityUser).getId(); + willReturn(true).given(securityUser).isCustomerUser(); + willReturn(new UserPrincipal(UserPrincipal.Type.PUBLIC_ID, customerId.toString())).given(securityUser).getUserPrincipal(); + + WebSocketSessionRef ref = mock(WebSocketSessionRef.class); + willReturn(securityUser).given(ref).getSecurityCtx(); + willReturn(sessionId).given(ref).getSessionId(); + return ref; + } + + private AttributesSubscriptionCmd subscriptionCmd(int cmdId) { + AttributesSubscriptionCmd cmd = new AttributesSubscriptionCmd(); + cmd.setCmdId(cmdId); + return cmd; + } + +} From bf307a41bd94a7da1f6731ec669774c2c45af758 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 30 Mar 2026 11:17:02 +0300 Subject: [PATCH 3/3] added tests for DefaultWebSocketService --- .../service/ws/DefaultWebSocketService.java | 6 +- .../ws/DefaultWebSocketServiceTest.java | 107 ++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index a4d7fe81cf..27a8ca275c 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -315,7 +315,7 @@ public class DefaultWebSocketService implements WebSocketService { } } - private void processSessionClose(WebSocketSessionRef sessionRef) { + void processSessionClose(WebSocketSessionRef sessionRef) { var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); if (tenantProfileConfiguration != null) { String sessionId = "[" + sessionRef.getSessionId() + "]"; @@ -403,7 +403,9 @@ public class DefaultWebSocketService implements WebSocketService { if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { - if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) { + if (cmd.isUnsubscribe()) { + publicUserSessions.remove(subId); + } else if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) { publicUserSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached" diff --git a/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java index 69f918ece7..a533e0369f 100644 --- a/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java @@ -146,6 +146,113 @@ class DefaultWebSocketServiceTest { assertThat(subs.iterator().next()).isEqualTo("[" + sessionId + "]:[" + cmdId + "]"); } + @Test + void processSubscription_unsubscribe_removesEntryFromPublicUserSubscriptionsMap() { + int maxPublicSubscriptions = 5; + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile = new TenantProfile(); + profile.createDefaultTenantProfileData(); + profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile).given(tenantProfileCache).get(tenantId); + + String sessionId = "session-1"; + int cmdId = 1; + WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId); + + service.processSubscription(sessionRef, subscriptionCmd(cmdId)); + + @SuppressWarnings("unchecked") + ConcurrentMap> publicUserSubscriptionsMap = + (ConcurrentMap>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap"); + assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(1); + + AttributesSubscriptionCmd unsubCmd = subscriptionCmd(cmdId); + unsubCmd.setUnsubscribe(true); + service.processSubscription(sessionRef, unsubCmd); + + assertThat(publicUserSubscriptionsMap.get(tenantId)).isEmpty(); + } + + @Test + void processSubscription_unsubscribe_freesSlotForNewSubscription() { + int maxPublicSubscriptions = 1; + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile = new TenantProfile(); + profile.createDefaultTenantProfileData(); + profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile).given(tenantProfileCache).get(tenantId); + + WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, "session-1"); + service.processSubscription(sessionRef, subscriptionCmd(1)); + + // slot is full — second subscription on same session should be rejected + assertThat(service.processSubscription(sessionRef, subscriptionCmd(2))).isFalse(); + + // unsubscribe cmd 1 to free the slot + AttributesSubscriptionCmd unsubCmd = subscriptionCmd(1); + unsubCmd.setUnsubscribe(true); + service.processSubscription(sessionRef, unsubCmd); + + // now a new subscription should succeed + assertThat(service.processSubscription(sessionRef, subscriptionCmd(3))) + .as("new subscription should succeed after unsubscribe freed the slot") + .isTrue(); + } + + @Test + void processSessionClose_removesAllSessionSubscriptionsFromPublicUserSubscriptionsMap() { + int maxPublicSubscriptions = 10; + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile = new TenantProfile(); + profile.createDefaultTenantProfileData(); + profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile).given(tenantProfileCache).get(tenantId); + + String sessionId = "closing-session"; + WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId); + + service.processSubscription(sessionRef, subscriptionCmd(1)); + service.processSubscription(sessionRef, subscriptionCmd(2)); + service.processSubscription(sessionRef, subscriptionCmd(3)); + + @SuppressWarnings("unchecked") + ConcurrentMap> publicUserSubscriptionsMap = + (ConcurrentMap>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap"); + assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(3); + + service.processSessionClose(sessionRef); + + assertThat(publicUserSubscriptionsMap.get(tenantId)).isEmpty(); + } + + @Test + void processSessionClose_onlyRemovesClosedSessionSubscriptions() { + int maxPublicSubscriptions = 10; + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TenantProfile profile = new TenantProfile(); + profile.createDefaultTenantProfileData(); + profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions); + willReturn(profile).given(tenantProfileCache).get(tenantId); + + WebSocketSessionRef session1 = mockPublicSessionRef(tenantId, "session-1"); + WebSocketSessionRef session2 = mockPublicSessionRef(tenantId, "session-2"); + + service.processSubscription(session1, subscriptionCmd(1)); + service.processSubscription(session1, subscriptionCmd(2)); + service.processSubscription(session2, subscriptionCmd(1)); + + @SuppressWarnings("unchecked") + ConcurrentMap> publicUserSubscriptionsMap = + (ConcurrentMap>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap"); + assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(3); + + service.processSessionClose(session1); + + Set remaining = publicUserSubscriptionsMap.get(tenantId); + assertThat(remaining).hasSize(1); + assertThat(remaining).allMatch(subId -> subId.startsWith("[session-2]")); + } + private WebSocketSessionRef mockPublicSessionRef(TenantId tenantId, String sessionId) { CustomerId customerId = new CustomerId(UUID.randomUUID()); SecurityUser securityUser = mock(SecurityUser.class);