|
|
|
@ -66,7 +66,7 @@ import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; |
|
|
|
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; |
|
|
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; |
|
|
|
import org.thingsboard.server.service.ws.notification.cmd.WsCmd; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd; |
|
|
|
@ -149,8 +149,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
private ScheduledExecutorService pingExecutor; |
|
|
|
private String serviceId; |
|
|
|
|
|
|
|
private List<WsCmdListHandler<TelemetryPluginCmdsWrapper, ?>> telemetryCmdsHandlers; |
|
|
|
private List<WsCmdHandler<NotificationCmdsWrapper, ? extends WsCmd>> notificationCmdsHandlers; |
|
|
|
private List<WsCmdsHandler<? extends WsCmd>> cmdsHandlers; |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
@ -160,25 +159,23 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); |
|
|
|
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
telemetryCmdsHandlers = List.of( |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(TelemetryPluginCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd) |
|
|
|
); |
|
|
|
notificationCmdsHandlers = List.of( |
|
|
|
newCmdHandler(NotificationCmdsWrapper::getUnreadSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), |
|
|
|
newCmdHandler(NotificationCmdsWrapper::getUnreadCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), |
|
|
|
newCmdHandler(NotificationCmdsWrapper::getMarkAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), |
|
|
|
newCmdHandler(NotificationCmdsWrapper::getMarkAllAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), |
|
|
|
newCmdHandler(NotificationCmdsWrapper::getUnsubCmd, notificationCmdsHandler::handleUnsubCmd) |
|
|
|
cmdsHandlers = List.of( |
|
|
|
newCmdsHandler(WsCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdsHandler(WsCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), |
|
|
|
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), |
|
|
|
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), |
|
|
|
newCmdHandler(WsCmdsWrapper::getMarkNotificationAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), |
|
|
|
newCmdHandler(WsCmdsWrapper::getMarkAllNotificationsAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), |
|
|
|
newCmdHandler(WsCmdsWrapper::getNotificationsUnsubCmd, notificationCmdsHandler::handleUnsubCmd) |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
@ -221,8 +218,8 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
|
|
|
|
try { |
|
|
|
switch (sessionRef.getSessionType()) { |
|
|
|
case TELEMETRY: |
|
|
|
processTelemetryCmds(sessionRef, msg); |
|
|
|
case GENERAL: |
|
|
|
processCmds(sessionRef, msg); |
|
|
|
break; |
|
|
|
case NOTIFICATIONS: |
|
|
|
processNotificationCmds(sessionRef, msg); |
|
|
|
@ -234,25 +231,27 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void processTelemetryCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { |
|
|
|
TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, TelemetryPluginCmdsWrapper.class); |
|
|
|
if (cmdsWrapper == null) { |
|
|
|
return; |
|
|
|
} |
|
|
|
for (WsCmdListHandler<TelemetryPluginCmdsWrapper, ?> cmdHandler : telemetryCmdsHandlers) { |
|
|
|
List<?> cmds = cmdHandler.extractCmds(cmdsWrapper); |
|
|
|
if (cmds != null) { |
|
|
|
cmdHandler.handle(sessionRef, cmds); |
|
|
|
} |
|
|
|
} |
|
|
|
private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { |
|
|
|
WsCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCmdsWrapper.class); |
|
|
|
processCmds(sessionRef, cmdsWrapper); |
|
|
|
} |
|
|
|
|
|
|
|
private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException { |
|
|
|
NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class); |
|
|
|
for (WsCmdHandler<NotificationCmdsWrapper, ? extends WsCmd> cmdHandler : notificationCmdsHandlers) { |
|
|
|
WsCmd cmd = cmdHandler.extractCmd(cmdsWrapper); |
|
|
|
if (cmd != null) { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper()); |
|
|
|
} |
|
|
|
|
|
|
|
private void processCmds(WebSocketSessionRef sessionRef, WsCmdsWrapper cmdsWrapper) { |
|
|
|
if (cmdsWrapper == null) { |
|
|
|
return; |
|
|
|
} |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
for (WsCmdsHandler<? extends WsCmd> cmdHandler : cmdsHandlers) { |
|
|
|
List<? extends WsCmd> cmds = cmdHandler.extract(cmdsWrapper); |
|
|
|
if (cmds == null) { |
|
|
|
continue; |
|
|
|
} |
|
|
|
for (WsCmd cmd : cmds) { |
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) { |
|
|
|
try { |
|
|
|
cmdHandler.handle(sessionRef, cmd); |
|
|
|
@ -269,8 +268,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) |
|
|
|
&& validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
entityDataSubService.handleCmd(sessionRef, cmd); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -279,8 +277,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) |
|
|
|
&& validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
entityDataSubService.handleCmd(sessionRef, cmd); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -289,8 +286,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) |
|
|
|
&& validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
entityDataSubService.handleCmd(sessionRef, cmd); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -298,18 +294,14 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) { |
|
|
|
entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd); |
|
|
|
} |
|
|
|
entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd); |
|
|
|
} |
|
|
|
|
|
|
|
private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) |
|
|
|
&& validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
entityDataSubService.handleCmd(sessionRef, cmd); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -457,19 +449,17 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd, sessionId)) { |
|
|
|
if (cmd.isUnsubscribe()) { |
|
|
|
unsubscribe(sessionRef, cmd, sessionId); |
|
|
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); |
|
|
|
log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId); |
|
|
|
Optional<Set<String>> keysOptional = getKeys(cmd); |
|
|
|
if (keysOptional.isPresent()) { |
|
|
|
List<String> keys = new ArrayList<>(keysOptional.get()); |
|
|
|
handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys); |
|
|
|
} else { |
|
|
|
handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId); |
|
|
|
} |
|
|
|
if (cmd.isUnsubscribe()) { |
|
|
|
unsubscribe(sessionRef, cmd, sessionId); |
|
|
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); |
|
|
|
log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId); |
|
|
|
Optional<Set<String>> keysOptional = getKeys(cmd); |
|
|
|
if (keysOptional.isPresent()) { |
|
|
|
List<String> keys = new ArrayList<>(keysOptional.get()); |
|
|
|
handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys); |
|
|
|
} else { |
|
|
|
handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -511,7 +501,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
.build(); |
|
|
|
|
|
|
|
subLock.lock(); |
|
|
|
try{ |
|
|
|
try { |
|
|
|
oldSubService.addSubscription(sub); |
|
|
|
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); |
|
|
|
} finally { |
|
|
|
@ -543,15 +533,6 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
} |
|
|
|
|
|
|
|
private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); |
|
|
|
if (sessionMD == null) { |
|
|
|
log.warn("[{}] Session meta data not found. ", sessionId); |
|
|
|
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, |
|
|
|
SESSION_META_DATA_NOT_FOUND); |
|
|
|
sendWsMsg(sessionRef, update); |
|
|
|
return; |
|
|
|
} |
|
|
|
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { |
|
|
|
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, |
|
|
|
"Device id is empty!"); |
|
|
|
@ -662,18 +643,16 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
String sessionId = sessionRef.getSessionId(); |
|
|
|
log.debug("[{}] Processing: {}", sessionId, cmd); |
|
|
|
|
|
|
|
if (validateSessionMetadata(sessionRef, cmd, sessionId)) { |
|
|
|
if (cmd.isUnsubscribe()) { |
|
|
|
unsubscribe(sessionRef, cmd, sessionId); |
|
|
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); |
|
|
|
Optional<Set<String>> keysOptional = getKeys(cmd); |
|
|
|
if (cmd.isUnsubscribe()) { |
|
|
|
unsubscribe(sessionRef, cmd, sessionId); |
|
|
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); |
|
|
|
Optional<Set<String>> keysOptional = getKeys(cmd); |
|
|
|
|
|
|
|
if (keysOptional.isPresent()) { |
|
|
|
handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); |
|
|
|
} else { |
|
|
|
handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); |
|
|
|
} |
|
|
|
if (keysOptional.isPresent()) { |
|
|
|
handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); |
|
|
|
} else { |
|
|
|
handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -787,7 +766,7 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
.build(); |
|
|
|
|
|
|
|
subLock.lock(); |
|
|
|
try{ |
|
|
|
try { |
|
|
|
oldSubService.addSubscription(sub); |
|
|
|
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); |
|
|
|
} finally { |
|
|
|
@ -1056,23 +1035,23 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public static <W, C> WsCmdHandler<W, C> newCmdHandler(java.util.function.Function<W, C> cmdExtractor, |
|
|
|
BiConsumer<WebSocketSessionRef, C> handler) { |
|
|
|
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor, |
|
|
|
BiConsumer<WebSocketSessionRef, C> handler) { |
|
|
|
return new WsCmdHandler<>(cmdExtractor, handler); |
|
|
|
} |
|
|
|
|
|
|
|
public static <W, C> WsCmdListHandler<W, C> newCmdsHandler(java.util.function.Function<W, List<C>> cmdsExtractor, |
|
|
|
BiConsumer<WebSocketSessionRef, C> handler) { |
|
|
|
return new WsCmdListHandler<>(cmdsExtractor, handler); |
|
|
|
public static <C extends WsCmd> WsCmdsHandler<C> newCmdsHandler(java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor, |
|
|
|
BiConsumer<WebSocketSessionRef, C> handler) { |
|
|
|
return new WsCmdsHandler<>(cmdsExtractor, handler); |
|
|
|
} |
|
|
|
|
|
|
|
@RequiredArgsConstructor |
|
|
|
public static class WsCmdHandler<W, C> { |
|
|
|
private final java.util.function.Function<W, C> cmdExtractor; |
|
|
|
private final BiConsumer<WebSocketSessionRef, C> handler; |
|
|
|
public static class WsCmdsHandler<C extends WsCmd> { |
|
|
|
private final java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor; |
|
|
|
protected final BiConsumer<WebSocketSessionRef, C> handler; |
|
|
|
|
|
|
|
public C extractCmd(W cmdsWrapper) { |
|
|
|
return cmdExtractor.apply(cmdsWrapper); |
|
|
|
public List<C> extract(WsCmdsWrapper cmdsWrapper) { |
|
|
|
return cmdsExtractor.apply(cmdsWrapper); |
|
|
|
} |
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
@ -1081,20 +1060,12 @@ public class DefaultWebSocketService implements WebSocketService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@RequiredArgsConstructor |
|
|
|
public static class WsCmdListHandler<W, C> { |
|
|
|
private final java.util.function.Function<W, List<C>> cmdsExtractor; |
|
|
|
private final BiConsumer<WebSocketSessionRef, C> handler; |
|
|
|
|
|
|
|
public List<C> extractCmds(W cmdsWrapper) { |
|
|
|
return cmdsExtractor.apply(cmdsWrapper); |
|
|
|
} |
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
public void handle(WebSocketSessionRef sessionRef, List<?> cmds) { |
|
|
|
cmds.forEach(cmd -> { |
|
|
|
handler.accept(sessionRef, (C) cmd); |
|
|
|
}); |
|
|
|
public static class WsCmdHandler<C extends WsCmd> extends WsCmdsHandler<C> { |
|
|
|
public WsCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor, BiConsumer<WebSocketSessionRef, C> handler) { |
|
|
|
super(cmdsWrapper -> { |
|
|
|
C cmd = cmdExtractor.apply(cmdsWrapper); |
|
|
|
return cmd != null ? List.of(cmd) : null; |
|
|
|
}, handler); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|