@ -21,8 +21,10 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import com.google.common.util.concurrent.MoreExecutors ;
import lombok.Getter ;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections.CollectionUtils ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.stereotype.Service ;
import org.springframework.web.socket.CloseStatus ;
@ -64,9 +66,6 @@ import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionServi
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService ;
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription ;
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler ;
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper ;
import org.thingsboard.server.service.ws.notification.cmd.WsCmd ;
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper ;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd ;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd ;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd ;
@ -87,6 +86,7 @@ import javax.annotation.PreDestroy;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.EnumMap ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.List ;
@ -123,7 +123,6 @@ public class DefaultWebSocketService implements WebSocketService {
private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!" ;
private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!" ;
private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!" ;
private static final String FAILED_TO_PARSE_WS_COMMAND = "Failed to parse websocket command!" ;
private final ConcurrentMap < String , WsSessionMetaData > wsSessionsMap = new ConcurrentHashMap < > ( ) ;
@ -149,8 +148,7 @@ public class DefaultWebSocketService implements WebSocketService {
private ScheduledExecutorService pingExecutor ;
private String serviceId ;
private List < WsCmdListHandler < TelemetryPluginCmdsWrapper , ? > > telemetryCmdsHandlers ;
private List < WsCmdHandler < NotificationCmdsWrapper , ? extends WsCmd > > notificationCmdsHandlers ;
private Map < WsCmdType , WsCmdHandler < ? extends WsCmd > > cmdsHandlers ;
@PostConstruct
public void init ( ) {
@ -160,26 +158,23 @@ public class DefaultWebSocketService implements WebSocketService {
pingExecutor = Executors . newSingleThreadScheduledExecutor ( ThingsBoardThreadFactory . forName ( "telemetry-web-socket-ping" ) ) ;
pingExecutor . scheduleWithFixedDelay ( this : : sendPing , pingTimeout / NUMBER_OF_PING_ATTEMPTS , pingTimeout / NUMBER_OF_PING_ATTEMPTS , TimeUnit . MILLISECONDS ) ;
telemetryCmdsHandlers = List . of (
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getAttrSubCmds , this : : handleWsAttributesSubscriptionCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getTsSubCmds , this : : handleWsTimeseriesSubscriptionCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getHistoryCmds , this : : handleWsHistoryCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getEntityDataCmds , this : : handleWsEntityDataCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getAlarmDataCmds , this : : handleWsAlarmDataCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getEntityCountCmds , this : : handleWsEntityCountCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getAlarmCountCmds , this : : handleWsAlarmCountCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getEntityDataUnsubscribeCmds , this : : handleWsDataUnsubscribeCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getAlarmDataUnsubscribeCmds , this : : handleWsDataUnsubscribeCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getEntityCountUnsubscribeCmds , this : : handleWsDataUnsubscribeCmd ) ,
newCmdsHandler ( TelemetryPluginCmdsWrapper : : getAlarmCountUnsubscribeCmds , this : : handleWsDataUnsubscribeCmd )
) ;
notificationCmdsHandlers = List . of (
newCmdHandler ( NotificationCmdsWrapper : : getUnreadSubCmd , notificationCmdsHandler : : handleUnreadNotificationsSubCmd ) ,
newCmdHandler ( NotificationCmdsWrapper : : getUnreadCountSubCmd , notificationCmdsHandler : : handleUnreadNotificationsCountSubCmd ) ,
newCmdHandler ( NotificationCmdsWrapper : : getMarkAsReadCmd , notificationCmdsHandler : : handleMarkAsReadCmd ) ,
newCmdHandler ( NotificationCmdsWrapper : : getMarkAllAsReadCmd , notificationCmdsHandler : : handleMarkAllAsReadCmd ) ,
newCmdHandler ( NotificationCmdsWrapper : : getUnsubCmd , notificationCmdsHandler : : handleUnsubCmd )
) ;
cmdsHandlers = new EnumMap < > ( WsCmdType . class ) ;
cmdsHandlers . put ( WsCmdType . ATTRIBUTES , newCmdHandler ( this : : handleWsAttributesSubscriptionCmd ) ) ;
cmdsHandlers . put ( WsCmdType . TIMESERIES , newCmdHandler ( this : : handleWsTimeseriesSubscriptionCmd ) ) ;
cmdsHandlers . put ( WsCmdType . TIMESERIES_HISTORY , newCmdHandler ( this : : handleWsHistoryCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ENTITY_DATA , newCmdHandler ( this : : handleWsEntityDataCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ALARM_DATA , newCmdHandler ( this : : handleWsAlarmDataCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ENTITY_COUNT , newCmdHandler ( this : : handleWsEntityCountCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ALARM_COUNT , newCmdHandler ( this : : handleWsAlarmCountCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ENTITY_DATA_UNSUBSCRIBE , newCmdHandler ( this : : handleWsDataUnsubscribeCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ALARM_DATA_UNSUBSCRIBE , newCmdHandler ( this : : handleWsDataUnsubscribeCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ENTITY_COUNT_UNSUBSCRIBE , newCmdHandler ( this : : handleWsDataUnsubscribeCmd ) ) ;
cmdsHandlers . put ( WsCmdType . ALARM_COUNT_UNSUBSCRIBE , newCmdHandler ( this : : handleWsDataUnsubscribeCmd ) ) ;
cmdsHandlers . put ( WsCmdType . NOTIFICATIONS , newCmdHandler ( notificationCmdsHandler : : handleUnreadNotificationsSubCmd ) ) ;
cmdsHandlers . put ( WsCmdType . NOTIFICATIONS_COUNT , newCmdHandler ( notificationCmdsHandler : : handleUnreadNotificationsCountSubCmd ) ) ;
cmdsHandlers . put ( WsCmdType . MARK_NOTIFICATIONS_AS_READ , newCmdHandler ( notificationCmdsHandler : : handleMarkAsReadCmd ) ) ;
cmdsHandlers . put ( WsCmdType . MARK_ALL_NOTIFICATIONS_AS_READ , newCmdHandler ( notificationCmdsHandler : : handleMarkAllAsReadCmd ) ) ;
cmdsHandlers . put ( WsCmdType . NOTIFICATIONS_UNSUBSCRIBE , newCmdHandler ( notificationCmdsHandler : : handleUnsubCmd ) ) ;
}
@PreDestroy
@ -194,7 +189,7 @@ public class DefaultWebSocketService implements WebSocketService {
}
@Override
public void handleWebSocket SessionEvent ( WebSocketSessionRef sessionRef , SessionEvent event ) {
public void handleSessionEvent ( WebSocketSessionRef sessionRef , SessionEvent event ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( PROCESSING_MSG , sessionId , event ) ;
switch ( event . getEventType ( ) ) {
@ -214,120 +209,75 @@ public class DefaultWebSocketService implements WebSocketService {
}
@Override
public void handleWebSocketMsg ( WebSocketSessionRef sessionRef , String msg ) {
if ( log . isTraceEnabled ( ) ) {
log . trace ( "[{}] Processing: {}" , sessionRef . getSessionId ( ) , msg ) ;
}
try {
switch ( sessionRef . getSessionType ( ) ) {
case TELEMETRY :
processTelemetryCmds ( sessionRef , msg ) ;
break ;
case NOTIFICATIONS :
processNotificationCmds ( sessionRef , msg ) ;
break ;
}
} catch ( IOException e ) {
log . warn ( "Failed to decode subscription cmd: {}" , e . getMessage ( ) , e ) ;
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( UNKNOWN_SUBSCRIPTION_ID , SubscriptionErrorCode . BAD_REQUEST , FAILED_TO_PARSE_WS_COMMAND ) ) ;
}
}
private void processTelemetryCmds ( WebSocketSessionRef sessionRef , String msg ) throws JsonProcessingException {
TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil . fromString ( msg , TelemetryPluginCmdsWrapper . class ) ;
if ( cmdsWrapper = = null ) {
public void handleCommands ( WebSocketSessionRef sessionRef , WsCommandsWrapper commandsWrapper ) {
if ( commandsWrapper = = null | | CollectionUtils . isEmpty ( commandsWrapper . getCmds ( ) ) ) {
return ;
}
for ( WsCmdListHandler < TelemetryPluginCmdsWrapper , ? > cmdHandler : telemetryCmdsHandlers ) {
List < ? > cmds = cmdHandler . extractCmds ( cmdsWrapper ) ;
if ( cmds ! = null ) {
cmdHandler . handle ( sessionRef , cmds ) ;
}
String sessionId = sessionRef . getSessionId ( ) ;
if ( ! validateSessionMetadata ( sessionRef , UNKNOWN_SUBSCRIPTION_ID , sessionId ) ) {
return ;
}
}
private void processNotificationCmds ( WebSocketSessionRef sessionRef , String msg ) throws IOException {
NotificationCmdsWrapper cmdsWrapper = JacksonUtil . fromString ( msg , NotificationCmdsWrapper . class ) ;
for ( WsCmdHandler < NotificationCmdsWrapper , ? extends WsCmd > cmdHandler : notificationCmdsHandlers ) {
WsCmd cmd = cmdHandler . extractCmd ( cmdsWrapper ) ;
if ( cmd ! = null ) {
String sessionId = sessionRef . getSessionId ( ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId ) ) {
try {
cmdHandler . handle ( sessionRef , cmd ) ;
} catch ( Exception e ) {
log . error ( "[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}" , sessionId ,
sessionRef . getSecurityCtx ( ) . getTenantId ( ) , sessionRef . getSecurityCtx ( ) . getId ( ) , cmd , e ) ;
}
}
for ( WsCmd cmd : commandsWrapper . getCmds ( ) ) {
log . debug ( "[{}][{}][{}] Processing cmd: {}" , sessionId , cmd . getType ( ) , cmd . getCmdId ( ) , cmd ) ;
try {
Optional . ofNullable ( cmdsHandlers . get ( cmd . getType ( ) ) )
. ifPresent ( cmdHandler - > cmdHandler . handle ( sessionRef , cmd ) ) ;
} catch ( Exception e ) {
log . error ( "[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}" , sessionId ,
sessionRef . getSecurityCtx ( ) . getTenantId ( ) , sessionRef . getSecurityCtx ( ) . getId ( ) , cmd , e ) ;
}
}
}
private void handleWsEntityDataCmd ( WebSocketSessionRef sessionRef , EntityDataCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId )
& & validateSubscriptionCmd ( sessionRef , cmd ) ) {
if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
entityDataSubService . handleCmd ( sessionRef , cmd ) ;
}
}
private void handleWsEntityCountCmd ( WebSocketSessionRef sessionRef , EntityCountCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId )
& & validateSubscriptionCmd ( sessionRef , cmd ) ) {
if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
entityDataSubService . handleCmd ( sessionRef , cmd ) ;
}
}
private void handleWsAlarmDataCmd ( WebSocketSessionRef sessionRef , AlarmDataCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId )
& & validateSubscriptionCmd ( sessionRef , cmd ) ) {
if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
entityDataSubService . handleCmd ( sessionRef , cmd ) ;
}
}
private void handleWsDataUnsubscribeCmd ( WebSocketSessionRef sessionRef , UnsubscribeCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId ) ) {
entityDataSubService . cancelSubscription ( sessionRef . getSessionId ( ) , cmd ) ;
}
entityDataSubService . cancelSubscription ( sessionRef . getSessionId ( ) , cmd ) ;
}
private void handleWsAlarmCountCmd ( WebSocketSessionRef sessionRef , AlarmCountCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId )
& & validateSubscriptionCmd ( sessionRef , cmd ) ) {
if ( validateCmd ( sessionRef , cmd ) ) {
entityDataSubService . handleCmd ( sessionRef , cmd ) ;
}
}
@Override
public void sendWsMsg ( String sessionId , TelemetrySubscriptionUpdate update ) {
sendWsMsg ( sessionId , update . getSubscriptionId ( ) , update ) ;
public void sendUpdate ( String sessionId , TelemetrySubscriptionUpdate update ) {
sendUpdate ( sessionId , update . getSubscriptionId ( ) , update ) ;
}
@Override
public void sendUpdate ( String sessionId , CmdUpdate update ) {
sendUpdate ( sessionId , update . getCmdId ( ) , update ) ;
}
@Override
public void sendWsMsg ( String sessionId , CmdUpdate update ) {
sendWsMsg ( sessionId , update . getCmdId ( ) , update ) ;
public void sendError ( WebSocketSessionRef sessionRef , int subId , SubscriptionErrorCode errorCode , String errorMsg ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( subId , errorCode , errorMsg ) ;
sendUpdate ( sessionRef , update ) ;
}
private < T > void sendWsMsg ( String sessionId , int cmdId , T update ) {
private < T > void sendUpdate ( String sessionId , int cmdId , T update ) {
WsSessionMetaData md = wsSessionsMap . get ( sessionId ) ;
if ( md ! = null ) {
sendWsMsg ( md . getSessionRef ( ) , cmdId , update ) ;
sendUpdate ( md . getSessionRef ( ) , cmdId , update ) ;
}
}
@ -455,21 +405,17 @@ public class DefaultWebSocketService implements WebSocketService {
}
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd , sessionId ) ) {
if ( cmd . isUnsubscribe ( ) ) {
unsubscribe ( sessionRef , cmd , sessionId ) ;
} else if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
EntityId entityId = EntityIdFactory . getByTypeAndId ( cmd . getEntityType ( ) , cmd . getEntityId ( ) ) ;
log . debug ( "[{}] fetching latest attributes ({}) values for device: {}" , sessionId , cmd . getKeys ( ) , entityId ) ;
Optional < Set < String > > keysOptional = getKeys ( cmd ) ;
if ( keysOptional . isPresent ( ) ) {
List < String > keys = new ArrayList < > ( keysOptional . get ( ) ) ;
handleWsAttributesSubscriptionByKeys ( sessionRef , cmd , sessionId , entityId , keys ) ;
} else {
handleWsAttributesSubscription ( sessionRef , cmd , sessionId , entityId ) ;
}
if ( cmd . isUnsubscribe ( ) ) {
unsubscribe ( sessionRef , cmd , sessionId ) ;
} else if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
EntityId entityId = EntityIdFactory . getByTypeAndId ( cmd . getEntityType ( ) , cmd . getEntityId ( ) ) ;
log . debug ( "[{}] fetching latest attributes ({}) values for device: {}" , sessionId , cmd . getKeys ( ) , entityId ) ;
Optional < Set < String > > keysOptional = getKeys ( cmd ) ;
if ( keysOptional . isPresent ( ) ) {
List < String > keys = new ArrayList < > ( keysOptional . get ( ) ) ;
handleWsAttributesSubscriptionByKeys ( sessionRef , cmd , sessionId , entityId , keys ) ;
} else {
handleWsAttributesSubscription ( sessionRef , cmd , sessionId , entityId ) ;
}
}
}
@ -503,7 +449,7 @@ public class DefaultWebSocketService implements WebSocketService {
. updateProcessor ( ( subscription , update ) - > {
subLock . lock ( ) ;
try {
sendWsMsg ( subscription . getSessionId ( ) , update ) ;
sendUpdate ( subscription . getSessionId ( ) , update ) ;
} finally {
subLock . unlock ( ) ;
}
@ -511,9 +457,9 @@ public class DefaultWebSocketService implements WebSocketService {
. build ( ) ;
subLock . lock ( ) ;
try {
try {
oldSubService . addSubscription ( sub ) ;
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , attributesData ) ) ;
sendUpdate ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , attributesData ) ) ;
} finally {
subLock . unlock ( ) ;
}
@ -531,7 +477,7 @@ public class DefaultWebSocketService implements WebSocketService {
update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
FAILED_TO_FETCH_ATTRIBUTES ) ;
}
sendWsMsg ( sessionRef , update ) ;
sendUpdate ( sessionRef , update ) ;
}
} ;
@ -543,27 +489,15 @@ public class DefaultWebSocketService implements WebSocketService {
}
private void handleWsHistoryCmd ( WebSocketSessionRef sessionRef , GetHistoryCmd cmd ) {
String sessionId = sessionRef . getSessionId ( ) ;
WsSessionMetaData sessionMD = wsSessionsMap . get ( sessionId ) ;
if ( sessionMD = = null ) {
log . warn ( "[{}] Session meta data not found. " , sessionId ) ;
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
SESSION_META_DATA_NOT_FOUND ) ;
sendWsMsg ( sessionRef , update ) ;
return ;
}
if ( cmd . getEntityId ( ) = = null | | cmd . getEntityId ( ) . isEmpty ( ) | | cmd . getEntityType ( ) = = null | | cmd . getEntityType ( ) . isEmpty ( ) ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Device id is empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return ;
}
if ( cmd . getKeys ( ) = = null | | cmd . getKeys ( ) . isEmpty ( ) ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Keys are empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return ;
}
if ( ! validateCmd ( sessionRef , cmd , ( ) - > {
if ( cmd . getEntityId ( ) = = null | | cmd . getEntityId ( ) . isEmpty ( ) | | cmd . getEntityType ( ) = = null | | cmd . getEntityType ( ) . isEmpty ( ) ) {
throw new IllegalArgumentException ( "Device id is empty!" ) ;
}
if ( cmd . getKeys ( ) = = null | | cmd . getKeys ( ) . isEmpty ( ) ) {
throw new IllegalArgumentException ( "Keys are empty!" ) ;
}
} ) ) return ;
EntityId entityId = EntityIdFactory . getByTypeAndId ( cmd . getEntityType ( ) , cmd . getEntityId ( ) ) ;
List < String > keys = new ArrayList < > ( getKeys ( cmd ) . orElse ( Collections . emptySet ( ) ) ) ;
List < ReadTsKvQuery > queries = keys . stream ( ) . map ( key - > new BaseReadTsKvQuery ( key , cmd . getStartTs ( ) , cmd . getEndTs ( ) , cmd . getInterval ( ) , getLimit ( cmd . getLimit ( ) ) , getAggregation ( cmd . getAgg ( ) ) ) )
@ -572,7 +506,7 @@ public class DefaultWebSocketService implements WebSocketService {
FutureCallback < List < TsKvEntry > > callback = new FutureCallback < List < TsKvEntry > > ( ) {
@Override
public void onSuccess ( List < TsKvEntry > data ) {
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
sendUpdate ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
}
@Override
@ -585,7 +519,7 @@ public class DefaultWebSocketService implements WebSocketService {
update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
FAILED_TO_FETCH_DATA ) ;
}
sendWsMsg ( sessionRef , update ) ;
sendUpdate ( sessionRef , update ) ;
}
} ;
accessValidator . validate ( sessionRef . getSecurityCtx ( ) , Operation . READ_TELEMETRY , entityId ,
@ -620,7 +554,7 @@ public class DefaultWebSocketService implements WebSocketService {
. updateProcessor ( ( subscription , update ) - > {
subLock . lock ( ) ;
try {
sendWsMsg ( subscription . getSessionId ( ) , update ) ;
sendUpdate ( subscription . getSessionId ( ) , update ) ;
} finally {
subLock . unlock ( ) ;
}
@ -631,7 +565,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock . lock ( ) ;
try {
oldSubService . addSubscription ( sub ) ;
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , attributesData ) ) ;
sendUpdate ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , attributesData ) ) ;
} finally {
subLock . unlock ( ) ;
}
@ -640,9 +574,7 @@ public class DefaultWebSocketService implements WebSocketService {
@Override
public void onFailure ( Throwable e ) {
log . error ( FAILED_TO_FETCH_ATTRIBUTES , e ) ;
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
FAILED_TO_FETCH_ATTRIBUTES ) ;
sendWsMsg ( sessionRef , update ) ;
sendError ( sessionRef , cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR , FAILED_TO_FETCH_ATTRIBUTES ) ;
}
} ;
@ -660,20 +592,16 @@ public class DefaultWebSocketService implements WebSocketService {
}
String sessionId = sessionRef . getSessionId ( ) ;
log . debug ( "[{}] Processing: {}" , sessionId , cmd ) ;
if ( validateSessionMetadata ( sessionRef , cmd , sessionId ) ) {
if ( cmd . isUnsubscribe ( ) ) {
unsubscribe ( sessionRef , cmd , sessionId ) ;
} else if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
EntityId entityId = EntityIdFactory . getByTypeAndId ( cmd . getEntityType ( ) , cmd . getEntityId ( ) ) ;
Optional < Set < String > > keysOptional = getKeys ( cmd ) ;
if ( keysOptional . isPresent ( ) ) {
handleWsTimeSeriesSubscriptionByKeys ( sessionRef , cmd , sessionId , entityId ) ;
} else {
handleWsTimeSeriesSubscription ( sessionRef , cmd , sessionId , entityId ) ;
}
if ( cmd . isUnsubscribe ( ) ) {
unsubscribe ( sessionRef , cmd , sessionId ) ;
} else if ( validateSubscriptionCmd ( sessionRef , cmd ) ) {
EntityId entityId = EntityIdFactory . getByTypeAndId ( cmd . getEntityType ( ) , cmd . getEntityId ( ) ) ;
Optional < Set < String > > keysOptional = getKeys ( cmd ) ;
if ( keysOptional . isPresent ( ) ) {
handleWsTimeSeriesSubscriptionByKeys ( sessionRef , cmd , sessionId , entityId ) ;
} else {
handleWsTimeSeriesSubscription ( sessionRef , cmd , sessionId , entityId ) ;
}
}
}
@ -721,7 +649,7 @@ public class DefaultWebSocketService implements WebSocketService {
. updateProcessor ( ( subscription , update ) - > {
subLock . lock ( ) ;
try {
sendWsMsg ( subscription . getSessionId ( ) , update ) ;
sendUpdate ( subscription . getSessionId ( ) , update ) ;
} finally {
subLock . unlock ( ) ;
}
@ -734,7 +662,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock . lock ( ) ;
try {
oldSubService . addSubscription ( sub ) ;
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
sendUpdate ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
} finally {
subLock . unlock ( ) ;
}
@ -750,7 +678,7 @@ public class DefaultWebSocketService implements WebSocketService {
update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
FAILED_TO_FETCH_DATA ) ;
}
sendWsMsg ( sessionRef , update ) ;
sendUpdate ( sessionRef , update ) ;
}
} ;
accessValidator . validate ( sessionRef . getSecurityCtx ( ) , Operation . READ_TELEMETRY , entityId ,
@ -776,7 +704,7 @@ public class DefaultWebSocketService implements WebSocketService {
. updateProcessor ( ( subscription , update ) - > {
subLock . lock ( ) ;
try {
sendWsMsg ( subscription . getSessionId ( ) , update ) ;
sendUpdate ( subscription . getSessionId ( ) , update ) ;
} finally {
subLock . unlock ( ) ;
}
@ -787,9 +715,9 @@ public class DefaultWebSocketService implements WebSocketService {
. build ( ) ;
subLock . lock ( ) ;
try {
try {
oldSubService . addSubscription ( sub ) ;
sendWsMsg ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
sendUpdate ( sessionRef , new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , data ) ) ;
} finally {
subLock . unlock ( ) ;
}
@ -802,9 +730,7 @@ public class DefaultWebSocketService implements WebSocketService {
} else {
log . info ( FAILED_TO_FETCH_DATA , e ) ;
}
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR ,
FAILED_TO_FETCH_DATA ) ;
sendWsMsg ( sessionRef , update ) ;
sendError ( sessionRef , cmd . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR , FAILED_TO_FETCH_DATA ) ;
}
} ;
}
@ -818,95 +744,77 @@ public class DefaultWebSocketService implements WebSocketService {
}
private boolean validateSubscriptionCmd ( WebSocketSessionRef sessionRef , EntityDataCmd cmd ) {
if ( cmd . getCmdId ( ) < 0 ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Cmd id is negative value!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
} else if ( cmd . getQuery ( ) = = null & & ! cmd . hasAnyCmd ( ) ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Query is empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
}
return true ;
return validateCmd ( sessionRef , cmd , ( ) - > {
if ( cmd . getQuery ( ) = = null & & ! cmd . hasAnyCmd ( ) ) {
throw new IllegalArgumentException ( "Query is empty!" ) ;
}
} ) ;
}
private boolean validateSubscriptionCmd ( WebSocketSessionRef sessionRef , EntityCountCmd cmd ) {
if ( cmd . getCmdId ( ) < 0 ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Cmd id is negative value!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
} else if ( cmd . getQuery ( ) = = null ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST , "Query is empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
}
return true ;
return validateCmd ( sessionRef , cmd , ( ) - > {
if ( cmd . getQuery ( ) = = null ) {
throw new IllegalArgumentException ( "Query is empty!" ) ;
}
} ) ;
}
private boolean validateSubscriptionCmd ( WebSocketSessionRef sessionRef , AlarmDataCmd cmd ) {
if ( cmd . getCmdId ( ) < 0 ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Cmd id is negative value!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
} else if ( cmd . getQuery ( ) = = null ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Query is empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
}
return true ;
return validateCmd ( sessionRef , cmd , ( ) - > {
if ( cmd . getQuery ( ) = = null ) {
throw new IllegalArgumentException ( "Query is empty!" ) ;
}
} ) ;
}
private boolean validateSubscriptionCmd ( WebSocketSessionRef sessionRef , SubscriptionCmd cmd ) {
if ( cmd . getEntityId ( ) = = null | | cmd . getEntityId ( ) . isEmpty ( ) ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Device id is empty!" ) ;
sendWsMsg ( sessionRef , update ) ;
return false ;
}
return true ;
}
private boolean validateSessionMetadata ( WebSocketSessionRef sessionRef , SubscriptionCmd cmd , String sessionId ) {
return validateSessionMetadata ( sessionRef , cmd . getCmdId ( ) , sessionId ) ;
return validateCmd ( sessionRef , cmd , ( ) - > {
if ( cmd . getEntityId ( ) = = null | | cmd . getEntityId ( ) . isEmpty ( ) ) {
throw new IllegalArgumentException ( "Device id is empty!" ) ;
}
} ) ;
}
private boolean validateSessionMetadata ( WebSocketSessionRef sessionRef , int cmdId , String sessionId ) {
WsSessionMetaData sessionMD = wsSessionsMap . get ( sessionId ) ;
if ( sessionMD = = null ) {
log . warn ( "[{}] Session meta data not found. " , sessionId ) ;
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmdId , SubscriptionErrorCode . INTERNAL_ERROR ,
SESSION_META_DATA_NOT_FOUND ) ;
sendWsMsg ( sessionRef , update ) ;
sendError ( sessionRef , cmdId , SubscriptionErrorCode . INTERNAL_ERROR , SESSION_META_DATA_NOT_FOUND ) ;
return false ;
} else {
return true ;
}
}
private boolean validateSubscriptionCmd ( WebSocketSessionRef sessionRef , AlarmCountCmd cmd ) {
private boolean validateCmd ( WebSocketSessionRef sessionRef , WsCmd cmd ) {
return validateCmd ( sessionRef , cmd , null ) ;
}
private < C extends WsCmd > boolean validateCmd ( WebSocketSessionRef sessionRef , C cmd , Runnable validator ) {
if ( cmd . getCmdId ( ) < 0 ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST ,
"Cmd id is negative value!" ) ;
sendWsMsg ( sessionRef , update ) ;
sendError ( sessionRef , cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST , "Cmd id is negative value!" ) ;
return false ;
}
try {
if ( validator ! = null ) {
validator . run ( ) ;
}
} catch ( Exception e ) {
sendError ( sessionRef , cmd . getCmdId ( ) , SubscriptionErrorCode . BAD_REQUEST , e . getMessage ( ) ) ;
return false ;
}
return true ;
}
private void sendWsMsg ( WebSocketSessionRef sessionRef , EntityDataUpdate update ) {
sendWsMsg ( sessionRef , update . getCmdId ( ) , update ) ;
private void sendUpdate ( WebSocketSessionRef sessionRef , EntityDataUpdate update ) {
sendUpdate ( sessionRef , update . getCmdId ( ) , update ) ;
}
private void sendWsMsg ( WebSocketSessionRef sessionRef , TelemetrySubscriptionUpdate update ) {
sendWsMsg ( sessionRef , update . getSubscriptionId ( ) , update ) ;
private void sendUpdate ( WebSocketSessionRef sessionRef , TelemetrySubscriptionUpdate update ) {
sendUpdate ( sessionRef , update . getSubscriptionId ( ) , update ) ;
}
private void sendWsMsg ( WebSocketSessionRef sessionRef , int cmdId , Object update ) {
private void sendUpdate ( WebSocketSessionRef sessionRef , int cmdId , Object update ) {
try {
String msg = JacksonUtil . OBJECT_MAPPER . writeValueAsString ( update ) ;
executor . submit ( ( ) - > {
@ -1055,47 +963,19 @@ public class DefaultWebSocketService implements WebSocketService {
. map ( TenantProfile : : getDefaultProfileConfiguration ) . orElse ( null ) ;
}
public static < W , C > WsCmdHandler < W , C > newCmdHandler ( java . util . function . Function < W , C > cmdExtractor ,
BiConsumer < WebSocketSessionRef , C > handler ) {
return new WsCmdHandler < > ( cmdExtractor , handler ) ;
}
public static < W , C > WsCmdListHandler < W , C > newCmdsHandler ( java . util . function . Function < W , List < C > > cmdsExtractor ,
BiConsumer < WebSocketSessionRef , C > handler ) {
return new WsCmdListHandler < > ( cmdsExtractor , handler ) ;
public static < C extends WsCmd > WsCmdHandler < C > newCmdHandler ( BiConsumer < WebSocketSessionRef , C > handler ) {
return new WsCmdHandler < > ( handler ) ;
}
@RequiredArgsConstructor
public static class WsCmdHandler < W , C > {
private final java . util . function . Function < W , C > cmdExtractor ;
private final BiConsumer < WebSocketSessionRef , C > handler ;
public C extractCmd ( W cmdsWrapper ) {
return cmdExtractor . apply ( cmdsWrapper ) ;
}
@Getter
@SuppressWarnings ( "unchecked" )
public static class WsCmdHandler < C extends WsCmd > {
protected final BiConsumer < WebSocketSessionRef , C > handler ;
@SuppressWarnings ( "unchecked" )
public void handle ( WebSocketSessionRef sessionRef , Object cmd ) {
public void handle ( WebSocketSessionRef sessionRef , WsCmd cmd ) {
handler . accept ( sessionRef , ( C ) cmd ) ;
}
}
@RequiredArgsConstructor
public static class WsCmdListHandler < W , C > {
private final java . util . function . Function < W , List < C > > cmdsExtractor ;
private final BiConsumer < WebSocketSessionRef , C > handler ;
public List < C > extractCmds ( W cmdsWrapper ) {
return cmdsExtractor . apply ( cmdsWrapper ) ;
}
@SuppressWarnings ( "unchecked" )
public void handle ( WebSocketSessionRef sessionRef , List < ? > cmds ) {
cmds . forEach ( cmd - > {
handler . accept ( sessionRef , ( C ) cmd ) ;
} ) ;
}
}
}