From c992ebac9d9bb5addcafd0972b774e91c220bd9f Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 28 Nov 2023 18:25:37 +0200 Subject: [PATCH] Use single WebSocket session --- .../service/ws/DefaultWebSocketService.java | 191 ++++++++---------- .../service/ws/WebSocketSessionType.java | 4 +- .../cmd/NotificationCmdsWrapper.java | 17 ++ ...ginCmdsWrapper.java => WsCmdsWrapper.java} | 17 +- .../telemetry/cmd/v1/TelemetryPluginCmd.java | 4 +- .../service/ws/telemetry/cmd/v2/DataCmd.java | 3 +- .../ws/telemetry/cmd/v2/UnsubscribeCmd.java | 4 +- .../controller/TbTestWebSocketClient.java | 10 +- .../notification/NotificationApiWsClient.java | 22 +- .../lwm2m/AbstractLwM2MIntegrationTest.java | 4 +- 10 files changed, 142 insertions(+), 134 deletions(-) rename application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/{TelemetryPluginCmdsWrapper.java => WsCmdsWrapper.java} (74%) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index d0334f90b6..47478b3fdb 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -66,7 +66,7 @@ import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.WsCmd; -import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd; @@ -149,8 +149,7 @@ public class DefaultWebSocketService implements WebSocketService { private ScheduledExecutorService pingExecutor; private String serviceId; - private List> telemetryCmdsHandlers; - private List> notificationCmdsHandlers; + private List> cmdsHandlers; @PostConstruct public void init() { @@ -160,25 +159,23 @@ public class DefaultWebSocketService implements WebSocketService { pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); - telemetryCmdsHandlers = List.of( - newCmdsHandler(TelemetryPluginCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd) - ); - notificationCmdsHandlers = List.of( - newCmdHandler(NotificationCmdsWrapper::getUnreadSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), - newCmdHandler(NotificationCmdsWrapper::getUnreadCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), - newCmdHandler(NotificationCmdsWrapper::getMarkAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), - newCmdHandler(NotificationCmdsWrapper::getMarkAllAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), - newCmdHandler(NotificationCmdsWrapper::getUnsubCmd, notificationCmdsHandler::handleUnsubCmd) + cmdsHandlers = List.of( + newCmdsHandler(WsCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), + newCmdsHandler(WsCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), + newCmdsHandler(WsCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), + newCmdsHandler(WsCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), + newCmdsHandler(WsCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), + newCmdsHandler(WsCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), + newCmdsHandler(WsCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), + newCmdsHandler(WsCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdHandler(WsCmdsWrapper::getUnreadNotificationsSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), + newCmdHandler(WsCmdsWrapper::getUnreadNotificationsCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), + newCmdHandler(WsCmdsWrapper::getMarkNotificationAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), + newCmdHandler(WsCmdsWrapper::getMarkAllNotificationsAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), + newCmdHandler(WsCmdsWrapper::getNotificationsUnsubCmd, notificationCmdsHandler::handleUnsubCmd) ); } @@ -221,8 +218,8 @@ public class DefaultWebSocketService implements WebSocketService { try { switch (sessionRef.getSessionType()) { - case TELEMETRY: - processTelemetryCmds(sessionRef, msg); + case GENERAL: + processCmds(sessionRef, msg); break; case NOTIFICATIONS: processNotificationCmds(sessionRef, msg); @@ -234,25 +231,27 @@ public class DefaultWebSocketService implements WebSocketService { } } - private void processTelemetryCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { - TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, TelemetryPluginCmdsWrapper.class); - if (cmdsWrapper == null) { - return; - } - for (WsCmdListHandler cmdHandler : telemetryCmdsHandlers) { - List cmds = cmdHandler.extractCmds(cmdsWrapper); - if (cmds != null) { - cmdHandler.handle(sessionRef, cmds); - } - } + private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { + WsCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCmdsWrapper.class); + processCmds(sessionRef, cmdsWrapper); } private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException { NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class); - for (WsCmdHandler cmdHandler : notificationCmdsHandlers) { - WsCmd cmd = cmdHandler.extractCmd(cmdsWrapper); - if (cmd != null) { - String sessionId = sessionRef.getSessionId(); + processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper()); + } + + private void processCmds(WebSocketSessionRef sessionRef, WsCmdsWrapper cmdsWrapper) { + if (cmdsWrapper == null) { + return; + } + String sessionId = sessionRef.getSessionId(); + for (WsCmdsHandler cmdHandler : cmdsHandlers) { + List cmds = cmdHandler.extract(cmdsWrapper); + if (cmds == null) { + continue; + } + for (WsCmd cmd : cmds) { if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) { try { cmdHandler.handle(sessionRef, cmd); @@ -269,8 +268,7 @@ public class DefaultWebSocketService implements WebSocketService { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) - && validateSubscriptionCmd(sessionRef, cmd)) { + if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } @@ -279,8 +277,7 @@ public class DefaultWebSocketService implements WebSocketService { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) - && validateSubscriptionCmd(sessionRef, cmd)) { + if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } @@ -289,8 +286,7 @@ public class DefaultWebSocketService implements WebSocketService { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) - && validateSubscriptionCmd(sessionRef, cmd)) { + if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } @@ -298,18 +294,14 @@ public class DefaultWebSocketService implements WebSocketService { private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) { - entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd); - } + entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd); } private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) - && validateSubscriptionCmd(sessionRef, cmd)) { + if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } @@ -457,19 +449,17 @@ public class DefaultWebSocketService implements WebSocketService { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd, sessionId)) { - if (cmd.isUnsubscribe()) { - unsubscribe(sessionRef, cmd, sessionId); - } else if (validateSubscriptionCmd(sessionRef, cmd)) { - EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); - log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId); - Optional> keysOptional = getKeys(cmd); - if (keysOptional.isPresent()) { - List keys = new ArrayList<>(keysOptional.get()); - handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys); - } else { - handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId); - } + if (cmd.isUnsubscribe()) { + unsubscribe(sessionRef, cmd, sessionId); + } else if (validateSubscriptionCmd(sessionRef, cmd)) { + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); + log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId); + Optional> keysOptional = getKeys(cmd); + if (keysOptional.isPresent()) { + List keys = new ArrayList<>(keysOptional.get()); + handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys); + } else { + handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId); } } } @@ -511,7 +501,7 @@ public class DefaultWebSocketService implements WebSocketService { .build(); subLock.lock(); - try{ + try { oldSubService.addSubscription(sub); sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); } finally { @@ -543,15 +533,6 @@ public class DefaultWebSocketService implements WebSocketService { } private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) { - String sessionId = sessionRef.getSessionId(); - WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); - if (sessionMD == null) { - log.warn("[{}] Session meta data not found. ", sessionId); - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - SESSION_META_DATA_NOT_FOUND); - sendWsMsg(sessionRef, update); - return; - } if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Device id is empty!"); @@ -662,18 +643,16 @@ public class DefaultWebSocketService implements WebSocketService { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSessionMetadata(sessionRef, cmd, sessionId)) { - if (cmd.isUnsubscribe()) { - unsubscribe(sessionRef, cmd, sessionId); - } else if (validateSubscriptionCmd(sessionRef, cmd)) { - EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); - Optional> keysOptional = getKeys(cmd); + if (cmd.isUnsubscribe()) { + unsubscribe(sessionRef, cmd, sessionId); + } else if (validateSubscriptionCmd(sessionRef, cmd)) { + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); + Optional> keysOptional = getKeys(cmd); - if (keysOptional.isPresent()) { - handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); - } else { - handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); - } + if (keysOptional.isPresent()) { + handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); + } else { + handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); } } } @@ -787,7 +766,7 @@ public class DefaultWebSocketService implements WebSocketService { .build(); subLock.lock(); - try{ + try { oldSubService.addSubscription(sub); sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); } finally { @@ -1056,23 +1035,23 @@ public class DefaultWebSocketService implements WebSocketService { } - public static WsCmdHandler newCmdHandler(java.util.function.Function cmdExtractor, - BiConsumer handler) { + public static WsCmdHandler newCmdHandler(java.util.function.Function cmdExtractor, + BiConsumer handler) { return new WsCmdHandler<>(cmdExtractor, handler); } - public static WsCmdListHandler newCmdsHandler(java.util.function.Function> cmdsExtractor, - BiConsumer handler) { - return new WsCmdListHandler<>(cmdsExtractor, handler); + public static WsCmdsHandler newCmdsHandler(java.util.function.Function> cmdsExtractor, + BiConsumer handler) { + return new WsCmdsHandler<>(cmdsExtractor, handler); } @RequiredArgsConstructor - public static class WsCmdHandler { - private final java.util.function.Function cmdExtractor; - private final BiConsumer handler; + public static class WsCmdsHandler { + private final java.util.function.Function> cmdsExtractor; + protected final BiConsumer handler; - public C extractCmd(W cmdsWrapper) { - return cmdExtractor.apply(cmdsWrapper); + public List extract(WsCmdsWrapper cmdsWrapper) { + return cmdsExtractor.apply(cmdsWrapper); } @SuppressWarnings("unchecked") @@ -1081,20 +1060,12 @@ public class DefaultWebSocketService implements WebSocketService { } } - @RequiredArgsConstructor - public static class WsCmdListHandler { - private final java.util.function.Function> cmdsExtractor; - private final BiConsumer handler; - - public List extractCmds(W cmdsWrapper) { - return cmdsExtractor.apply(cmdsWrapper); - } - - @SuppressWarnings("unchecked") - public void handle(WebSocketSessionRef sessionRef, List cmds) { - cmds.forEach(cmd -> { - handler.accept(sessionRef, (C) cmd); - }); + public static class WsCmdHandler extends WsCmdsHandler { + public WsCmdHandler(java.util.function.Function cmdExtractor, BiConsumer handler) { + super(cmdsWrapper -> { + C cmd = cmdExtractor.apply(cmdsWrapper); + return cmd != null ? List.of(cmd) : null; + }, handler); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java index 5c1a155c4b..5547ef3359 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java @@ -24,8 +24,8 @@ import java.util.Optional; @RequiredArgsConstructor @Getter public enum WebSocketSessionType { - TELEMETRY("telemetry"), - NOTIFICATIONS("notifications"); + GENERAL("telemetry"), + NOTIFICATIONS("notifications"); // deprecated private final String name; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java index 88613eb966..de7a6adab7 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java @@ -15,9 +15,15 @@ */ package org.thingsboard.server.service.ws.notification.cmd; +import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +/** + * @deprecated Use {@link org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper}. This class is left for backward compatibility + * */ @Data +@Deprecated public class NotificationCmdsWrapper { private NotificationsCountSubCmd unreadCountSubCmd; @@ -30,4 +36,15 @@ public class NotificationCmdsWrapper { private NotificationsUnsubCmd unsubCmd; + @JsonIgnore + public WsCmdsWrapper toCommonCmdsWrapper() { + WsCmdsWrapper wrapper = new WsCmdsWrapper(); + wrapper.setUnreadNotificationsCountSubCmd(unreadCountSubCmd); + wrapper.setUnreadNotificationsSubCmd(unreadSubCmd); + wrapper.setMarkNotificationAsReadCmd(markAsReadCmd); + wrapper.setMarkAllNotificationsAsReadCmd(markAllAsReadCmd); + wrapper.setNotificationsUnsubCmd(unsubCmd); + return wrapper; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryPluginCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java similarity index 74% rename from application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryPluginCmdsWrapper.java rename to application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java index 5b2c12afcd..40eb672030 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryPluginCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java @@ -16,6 +16,11 @@ package org.thingsboard.server.service.ws.telemetry.cmd; import lombok.Data; +import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; +import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsUnsubCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd; @@ -34,7 +39,7 @@ import java.util.List; * @author Andrew Shvayka */ @Data -public class TelemetryPluginCmdsWrapper { +public class WsCmdsWrapper { private List attrSubCmds; @@ -58,4 +63,14 @@ public class TelemetryPluginCmdsWrapper { private List alarmCountUnsubscribeCmds; + private NotificationsCountSubCmd unreadNotificationsCountSubCmd; + + private NotificationsSubCmd unreadNotificationsSubCmd; + + private MarkNotificationsAsReadCmd markNotificationAsReadCmd; + + private MarkAllNotificationsAsReadCmd markAllNotificationsAsReadCmd; + + private NotificationsUnsubCmd notificationsUnsubCmd; + } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java index dc9d3a48c2..5081e71d0a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.service.ws.telemetry.cmd.v1; +import org.thingsboard.server.service.ws.notification.cmd.WsCmd; + /** * @author Andrew Shvayka */ -public interface TelemetryPluginCmd { +public interface TelemetryPluginCmd extends WsCmd { int getCmdId(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java index c09cbc03d1..282e95f511 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java @@ -17,9 +17,10 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; import lombok.Getter; +import org.thingsboard.server.service.ws.notification.cmd.WsCmd; @Data -public class DataCmd { +public class DataCmd implements WsCmd { @Getter private final int cmdId; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java index 81288c76da..3bdc70b503 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.service.ws.telemetry.cmd.v2; -public interface UnsubscribeCmd { +import org.thingsboard.server.service.ws.notification.cmd.WsCmd; + +public interface UnsubscribeCmd extends WsCmd { int getCmdId(); diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index ede6c69fd4..43afcfc845 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityFilter; import org.thingsboard.server.common.data.query.EntityKey; -import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; @@ -106,19 +106,19 @@ public class TbTestWebSocketClient extends WebSocketClient { } public void send(EntityDataCmd cmd) throws NotYetConnectedException { - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + WsCmdsWrapper wrapper = new WsCmdsWrapper(); wrapper.setEntityDataCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } public void send(EntityCountCmd cmd) throws NotYetConnectedException { - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + WsCmdsWrapper wrapper = new WsCmdsWrapper(); wrapper.setEntityCountCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } public void send(AlarmCountCmd cmd) throws NotYetConnectedException { - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + WsCmdsWrapper wrapper = new WsCmdsWrapper(); wrapper.setAlarmCountCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } @@ -240,7 +240,7 @@ public class TbTestWebSocketClient extends WebSocketClient { cmd.setEntityId(entityId.getId().toString()); cmd.setScope(scope); cmd.setKeys(String.join(",", keys)); - TelemetryPluginCmdsWrapper cmdsWrapper = new TelemetryPluginCmdsWrapper(); + WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); cmdsWrapper.setAttrSubCmds(List.of(cmd)); JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper); ((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type"); diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java index 664ff7bd54..31023fe258 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java @@ -24,11 +24,11 @@ import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.controller.TbTestWebSocketClient; import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd; -import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd; import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType; import java.net.URI; @@ -50,37 +50,37 @@ public class NotificationApiWsClient extends TbTestWebSocketClient { private List notifications; public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException { - super(new URI(wsUrl + "/api/ws/plugins/notifications?token=" + token)); + super(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + token)); } public NotificationApiWsClient subscribeForUnreadNotifications(int limit) { - NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper(); - cmdsWrapper.setUnreadSubCmd(new NotificationsSubCmd(1, limit)); + WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); + cmdsWrapper.setUnreadNotificationsSubCmd(new NotificationsSubCmd(1, limit)); sendCmd(cmdsWrapper); this.limit = limit; return this; } public NotificationApiWsClient subscribeForUnreadNotificationsCount() { - NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper(); - cmdsWrapper.setUnreadCountSubCmd(new NotificationsCountSubCmd(2)); + WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); + cmdsWrapper.setUnreadNotificationsCountSubCmd(new NotificationsCountSubCmd(2)); sendCmd(cmdsWrapper); return this; } public void markNotificationAsRead(UUID... notifications) { - NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper(); - cmdsWrapper.setMarkAsReadCmd(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))); + WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); + cmdsWrapper.setMarkNotificationAsReadCmd(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))); sendCmd(cmdsWrapper); } public void markAllNotificationsAsRead() { - NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper(); - cmdsWrapper.setMarkAllAsReadCmd(new MarkAllNotificationsAsReadCmd(newCmdId())); + WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); + cmdsWrapper.setMarkAllNotificationsAsReadCmd(new MarkAllNotificationsAsReadCmd(newCmdId())); sendCmd(cmdsWrapper); } - public void sendCmd(NotificationCmdsWrapper cmdsWrapper) { + public void sendCmd(WsCmdsWrapper cmdsWrapper) { String cmd = JacksonUtil.toString(cmdsWrapper); send(cmd); } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java index 64382d5d51..3dd011ac78 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java @@ -63,7 +63,7 @@ import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; -import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; @@ -229,7 +229,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + WsCmdsWrapper wrapper = new WsCmdsWrapper(); wrapper.setEntityDataCmds(Collections.singletonList(cmd)); getWsClient().send(JacksonUtil.toString(wrapper));