Browse Source

Merge pull request #15313 from dashevchenko/publicUserWsSessionsLimitFix

Fixed WS sessions limit handling for public users
pull/15334/head
Viacheslav Klimov 2 months ago
committed by GitHub
parent
commit
c7cae283fa
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 6
      application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
  2. 14
      application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java
  3. 78
      application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java
  4. 277
      application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java

6
application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java

@ -115,7 +115,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
private final ConcurrentMap<TenantId, Set<String>> tenantSessionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<CustomerId, Set<String>> customerSessionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> regularUserSessionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> publicUserSessionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, Set<String>> publicUserSessionsMap = new ConcurrentHashMap<>();
private Cache<String, SessionMetaData> pendingSessions;
@ -611,7 +611,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
}
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
&& UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
limitAllowed = publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser();
if (limitAllowed) {
@ -655,7 +655,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
}
}
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
publicUserSessions.remove(sessionId);
}

14
application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java

@ -144,7 +144,7 @@ public class DefaultWebSocketService implements WebSocketService {
private final ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Map<Integer, Integer>> sessionCmdMap = new ConcurrentHashMap<>();
private ExecutorService executor;
@ -315,7 +315,7 @@ public class DefaultWebSocketService implements WebSocketService {
}
}
private void processSessionClose(WebSocketSessionRef sessionRef) {
void processSessionClose(WebSocketSessionRef sessionRef) {
var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef);
if (tenantProfileConfiguration != null) {
String sessionId = "[" + sessionRef.getSessionId() + "]";
@ -340,7 +340,7 @@ public class DefaultWebSocketService implements WebSocketService {
}
}
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
publicUserSessions.removeIf(subId -> subId.startsWith(sessionId));
}
@ -349,7 +349,7 @@ public class DefaultWebSocketService implements WebSocketService {
}
}
private boolean processSubscription(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
boolean processSubscription(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef);
if (tenantProfileConfiguration == null) return true;
@ -401,9 +401,11 @@ public class DefaultWebSocketService implements WebSocketService {
}
}
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
if (cmd.isUnsubscribe()) {
publicUserSessions.remove(subId);
} else if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
publicUserSessions.add(subId);
} else {
log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached"

78
application/src/test/java/org/thingsboard/server/controller/plugin/TbWebSocketHandlerTest.java

@ -25,16 +25,28 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@ -184,4 +196,70 @@ class TbWebSocketHandlerTest {
assertThat(msgs).map(Integer::parseInt).doesNotHaveDuplicates().hasSize(100);
}
// Regression test for the bug where publicUserSessionsMap was keyed by UserId(NULL_UUID),
// making maxWsSessionsPerPublicUser a global limit shared across all tenants.
// The limit is now scoped per-tenant.
@Test
void checkLimits_publicUserSessions_limitIsPerTenantNotGlobal() throws Exception {
TbTenantProfileCache tenantProfileCache = mock(TbTenantProfileCache.class);
ReflectionTestUtils.setField(wsHandler, "tenantProfileCache", tenantProfileCache);
int maxPublicSessions = 2;
TenantId tenant1 = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile1 = new TenantProfile();
profile1.createDefaultTenantProfileData();
profile1.getDefaultProfileConfiguration().setMaxWsSessionsPerPublicUser(maxPublicSessions);
willReturn(profile1).given(tenantProfileCache).get(tenant1);
TenantId tenant2 = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile2 = new TenantProfile();
profile2.createDefaultTenantProfileData();
profile2.getDefaultProfileConfiguration().setMaxWsSessionsPerPublicUser(maxPublicSessions);
willReturn(profile2).given(tenantProfileCache).get(tenant2);
Method checkLimits = TbWebSocketHandler.class.getDeclaredMethod(
"checkLimits", WebSocketSession.class, WebSocketSessionRef.class);
checkLimits.setAccessible(true);
// tenant1 fills up its limit
for (int i = 0; i < maxPublicSessions; i++) {
assertThat((boolean) checkLimits.invoke(wsHandler, mockWsSession("t1-" + i), mockPublicSessionRef(tenant1))).isTrue();
}
// tenant2 must get its own independent quota — this was the bug: with NULL_UUID as key
// all tenants shared one global counter, so tenant2 would be blocked here
for (int i = 0; i < maxPublicSessions; i++) {
assertThat((boolean) checkLimits.invoke(wsHandler, mockWsSession("t2-" + i), mockPublicSessionRef(tenant2)))
.as("tenant2 session %d should not be affected by tenant1's sessions", i + 1)
.isTrue();
}
// tenant1's (maxPublicSessions + 1)-th session must be rejected
NativeWebSocketSession overLimit = mockWsSession("t1-over");
assertThat((boolean) checkLimits.invoke(wsHandler, overLimit, mockPublicSessionRef(tenant1))).isFalse();
verify(overLimit).close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
}
private NativeWebSocketSession mockWsSession(String id) {
NativeWebSocketSession s = mock(NativeWebSocketSession.class);
willReturn(id).given(s).getId();
return s;
}
private WebSocketSessionRef mockPublicSessionRef(TenantId tenantId) {
CustomerId customerId = new CustomerId(UUID.randomUUID());
SecurityUser securityUser = mock(SecurityUser.class);
willReturn(tenantId).given(securityUser).getTenantId();
willReturn(customerId).given(securityUser).getCustomerId();
willReturn(new UserId(EntityId.NULL_UUID)).given(securityUser).getId();
willReturn(true).given(securityUser).isCustomerUser();
willReturn(new UserPrincipal(UserPrincipal.Type.PUBLIC_ID, customerId.toString())).given(securityUser).getUserPrincipal();
WebSocketSessionRef ref = mock(WebSocketSessionRef.class);
willReturn(securityUser).given(ref).getSecurityCtx();
willReturn(UUID.randomUUID().toString()).given(ref).getSessionId();
return ref;
}
}

277
application/src/test/java/org/thingsboard/server/service/ws/DefaultWebSocketServiceTest.java

@ -0,0 +1,277 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ws;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
class DefaultWebSocketServiceTest {
DefaultWebSocketService service;
TbTenantProfileCache tenantProfileCache;
WebSocketMsgEndpoint msgEndpoint;
@BeforeEach
void setUp() {
tenantProfileCache = mock(TbTenantProfileCache.class);
msgEndpoint = mock(WebSocketMsgEndpoint.class);
service = new DefaultWebSocketService(
mock(TbLocalSubscriptionService.class),
mock(TbEntityDataSubscriptionService.class),
mock(NotificationCommandsHandler.class),
msgEndpoint,
mock(AccessValidator.class),
mock(AttributesService.class),
mock(TimeseriesService.class),
mock(TbServiceInfoProvider.class),
tenantProfileCache
);
}
// Regression test: publicUserSubscriptionsMap must be keyed by TenantId, not UserId(NULL_UUID).
// With the old UserId(NULL_UUID) key, all tenants shared one global subscription counter.
@Test
void processSubscription_publicUserSubscriptionsMap_isPerTenantNotGlobal() throws Exception {
int maxPublicSubscriptions = 2;
TenantId tenant1 = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile1 = new TenantProfile();
profile1.createDefaultTenantProfileData();
profile1.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile1).given(tenantProfileCache).get(tenant1);
TenantId tenant2 = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile2 = new TenantProfile();
profile2.createDefaultTenantProfileData();
profile2.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile2).given(tenantProfileCache).get(tenant2);
// tenant1 fills up its quota
for (int i = 0; i < maxPublicSubscriptions; i++) {
assertThat(service.processSubscription(mockPublicSessionRef(tenant1, "t1-session-" + i), subscriptionCmd(i)))
.as("tenant1 subscription %d should be accepted", i + 1)
.isTrue();
}
// tenant2 must have its own independent quota — this was the bug:
// with UserId(NULL_UUID) as key all tenants shared one counter, so tenant2 would be blocked here
for (int i = 0; i < maxPublicSubscriptions; i++) {
assertThat(service.processSubscription(mockPublicSessionRef(tenant2, "t2-session-" + i), subscriptionCmd(i)))
.as("tenant2 subscription %d should not be affected by tenant1's subscriptions", i + 1)
.isTrue();
}
// tenant1's (maxPublicSubscriptions + 1)-th subscription must be rejected
assertThat(service.processSubscription(mockPublicSessionRef(tenant1, "t1-session-over"), subscriptionCmd(99)))
.as("tenant1 should be rejected after exceeding its limit")
.isFalse();
// Verify that publicUserSubscriptionsMap has separate entries per tenant
@SuppressWarnings("unchecked")
ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap =
(ConcurrentMap<TenantId, Set<String>>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap");
assertThat(publicUserSubscriptionsMap).as("map should contain tenant1").containsKey(tenant1);
assertThat(publicUserSubscriptionsMap).as("map should contain tenant2").containsKey(tenant2);
assertThat(publicUserSubscriptionsMap).as("map must not have a single NULL_UUID entry for all tenants")
.doesNotContainKey(new TenantId(EntityId.NULL_UUID));
assertThat(publicUserSubscriptionsMap.get(tenant1))
.as("tenant1 should have exactly %d subscriptions", maxPublicSubscriptions)
.hasSize(maxPublicSubscriptions);
assertThat(publicUserSubscriptionsMap.get(tenant2))
.as("tenant2 should have exactly %d subscriptions", maxPublicSubscriptions)
.hasSize(maxPublicSubscriptions);
}
@Test
void processSubscription_publicUserSubscriptionsMap_subscriptionIdFormat() {
int maxPublicSubscriptions = 5;
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile = new TenantProfile();
profile.createDefaultTenantProfileData();
profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile).given(tenantProfileCache).get(tenantId);
String sessionId = "my-session-id";
int cmdId = 42;
WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId);
service.processSubscription(sessionRef, subscriptionCmd(cmdId));
@SuppressWarnings("unchecked")
ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap =
(ConcurrentMap<TenantId, Set<String>>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap");
Set<String> subs = publicUserSubscriptionsMap.get(tenantId);
assertThat(subs).hasSize(1);
assertThat(subs.iterator().next()).isEqualTo("[" + sessionId + "]:[" + cmdId + "]");
}
@Test
void processSubscription_unsubscribe_removesEntryFromPublicUserSubscriptionsMap() {
int maxPublicSubscriptions = 5;
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile = new TenantProfile();
profile.createDefaultTenantProfileData();
profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile).given(tenantProfileCache).get(tenantId);
String sessionId = "session-1";
int cmdId = 1;
WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId);
service.processSubscription(sessionRef, subscriptionCmd(cmdId));
@SuppressWarnings("unchecked")
ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap =
(ConcurrentMap<TenantId, Set<String>>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap");
assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(1);
AttributesSubscriptionCmd unsubCmd = subscriptionCmd(cmdId);
unsubCmd.setUnsubscribe(true);
service.processSubscription(sessionRef, unsubCmd);
assertThat(publicUserSubscriptionsMap.get(tenantId)).isEmpty();
}
@Test
void processSubscription_unsubscribe_freesSlotForNewSubscription() {
int maxPublicSubscriptions = 1;
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile = new TenantProfile();
profile.createDefaultTenantProfileData();
profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile).given(tenantProfileCache).get(tenantId);
WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, "session-1");
service.processSubscription(sessionRef, subscriptionCmd(1));
// slot is full — second subscription on same session should be rejected
assertThat(service.processSubscription(sessionRef, subscriptionCmd(2))).isFalse();
// unsubscribe cmd 1 to free the slot
AttributesSubscriptionCmd unsubCmd = subscriptionCmd(1);
unsubCmd.setUnsubscribe(true);
service.processSubscription(sessionRef, unsubCmd);
// now a new subscription should succeed
assertThat(service.processSubscription(sessionRef, subscriptionCmd(3)))
.as("new subscription should succeed after unsubscribe freed the slot")
.isTrue();
}
@Test
void processSessionClose_removesAllSessionSubscriptionsFromPublicUserSubscriptionsMap() {
int maxPublicSubscriptions = 10;
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile = new TenantProfile();
profile.createDefaultTenantProfileData();
profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile).given(tenantProfileCache).get(tenantId);
String sessionId = "closing-session";
WebSocketSessionRef sessionRef = mockPublicSessionRef(tenantId, sessionId);
service.processSubscription(sessionRef, subscriptionCmd(1));
service.processSubscription(sessionRef, subscriptionCmd(2));
service.processSubscription(sessionRef, subscriptionCmd(3));
@SuppressWarnings("unchecked")
ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap =
(ConcurrentMap<TenantId, Set<String>>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap");
assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(3);
service.processSessionClose(sessionRef);
assertThat(publicUserSubscriptionsMap.get(tenantId)).isEmpty();
}
@Test
void processSessionClose_onlyRemovesClosedSessionSubscriptions() {
int maxPublicSubscriptions = 10;
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TenantProfile profile = new TenantProfile();
profile.createDefaultTenantProfileData();
profile.getDefaultProfileConfiguration().setMaxWsSubscriptionsPerPublicUser(maxPublicSubscriptions);
willReturn(profile).given(tenantProfileCache).get(tenantId);
WebSocketSessionRef session1 = mockPublicSessionRef(tenantId, "session-1");
WebSocketSessionRef session2 = mockPublicSessionRef(tenantId, "session-2");
service.processSubscription(session1, subscriptionCmd(1));
service.processSubscription(session1, subscriptionCmd(2));
service.processSubscription(session2, subscriptionCmd(1));
@SuppressWarnings("unchecked")
ConcurrentMap<TenantId, Set<String>> publicUserSubscriptionsMap =
(ConcurrentMap<TenantId, Set<String>>) ReflectionTestUtils.getField(service, "publicUserSubscriptionsMap");
assertThat(publicUserSubscriptionsMap.get(tenantId)).hasSize(3);
service.processSessionClose(session1);
Set<String> remaining = publicUserSubscriptionsMap.get(tenantId);
assertThat(remaining).hasSize(1);
assertThat(remaining).allMatch(subId -> subId.startsWith("[session-2]"));
}
private WebSocketSessionRef mockPublicSessionRef(TenantId tenantId, String sessionId) {
CustomerId customerId = new CustomerId(UUID.randomUUID());
SecurityUser securityUser = mock(SecurityUser.class);
willReturn(tenantId).given(securityUser).getTenantId();
willReturn(customerId).given(securityUser).getCustomerId();
willReturn(new UserId(EntityId.NULL_UUID)).given(securityUser).getId();
willReturn(true).given(securityUser).isCustomerUser();
willReturn(new UserPrincipal(UserPrincipal.Type.PUBLIC_ID, customerId.toString())).given(securityUser).getUserPrincipal();
WebSocketSessionRef ref = mock(WebSocketSessionRef.class);
willReturn(securityUser).given(ref).getSecurityCtx();
willReturn(sessionId).given(ref).getSessionId();
return ref;
}
private AttributesSubscriptionCmd subscriptionCmd(int cmdId) {
AttributesSubscriptionCmd cmd = new AttributesSubscriptionCmd();
cmd.setCmdId(cmdId);
return cmd;
}
}
Loading…
Cancel
Save