@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter ;
import lombok.extern.slf4j.Slf4j ;
import org.checkerframework.checker.nullness.qual.Nullable ;
import org.jetbrains.annotations.NotNull ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.context.annotation.Lazy ;
@ -30,8 +29,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service ;
import org.springframework.web.socket.CloseStatus ;
import org.thingsboard.common.util.ThingsBoardThreadFactory ;
import org.thingsboard.server.common.data.id.CustomerId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.kv.Aggregation ;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult ;
@ -52,6 +50,9 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService ;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService ;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef ;
import org.thingsboard.server.service.telemetry.cmd.v2.AggHistoryCmd ;
import org.thingsboard.server.service.telemetry.cmd.v2.AggKey ;
import org.thingsboard.server.service.telemetry.cmd.v2.AggTimeSeriesCmd ;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd ;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate ;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd ;
@ -69,7 +70,6 @@ import javax.annotation.PreDestroy;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.HashMap ;
import java.util.LinkedHashMap ;
import java.util.LinkedHashSet ;
import java.util.List ;
import java.util.Map ;
@ -133,6 +133,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private int maxEntitiesPerAlarmSubscription ;
@Value ( "${server.ws.dynamic_page_link.max_alarm_queries_per_refresh_interval:10}" )
private int maxAlarmQueriesPerRefreshInterval ;
@Value ( "${ui.dashboard.max_datapoints_limit:50000}" )
private int maxDatapointLimit ;
private ExecutorService wsCallBackExecutor ;
private boolean tsInSqlDB ;
@ -167,7 +169,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
TbEntityDataSubCtx ctx = getSubCtx ( session . getSessionId ( ) , cmd . getCmdId ( ) ) ;
if ( ctx ! = null ) {
log . debug ( "[{}][{}] Updating existing subscriptions using: {}" , session . getSessionId ( ) , cmd . getCmdId ( ) , cmd ) ;
if ( cmd . getLatestCmd ( ) ! = null | | cmd . getTsCmd ( ) ! = null | | cmd . getHistor yCmd( ) ! = null ) {
if ( cmd . hasAn yCmd( ) ) {
ctx . clearEntitySubscriptions ( ) ;
}
} else {
@ -206,6 +208,18 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
finalCtx . setRefreshTask ( task ) ;
}
}
if ( cmd . getAggHistoryCmd ( ) ! = null ) {
handleAggHistoryCmd ( session , ctx , cmd . getAggHistoryCmd ( ) ) ;
} else if ( cmd . getAggTsCmd ( ) ! = null ) {
handleAggTsCmd ( session , ctx , cmd . getAggTsCmd ( ) ) ;
} else if ( cmd . hasRegularCmds ( ) ) {
handleRegularCommands ( session , ctx , cmd ) ;
} else {
checkAndSendInitialData ( ctx ) ;
}
}
private void handleRegularCommands ( TelemetryWebSocketSessionRef session , TbEntityDataSubCtx ctx , EntityDataCmd cmd ) {
ListenableFuture < TbEntityDataSubCtx > historyFuture ;
if ( cmd . getHistoryCmd ( ) ! = null ) {
log . trace ( "[{}][{}] Going to process history command: {}" , session . getSessionId ( ) , cmd . getCmdId ( ) , cmd . getHistoryCmd ( ) ) ;
@ -229,10 +243,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
if ( cmd . getTsCmd ( ) ! = null ) {
handleTimeSeriesCmd ( theCtx , cmd . getTsCmd ( ) ) ;
}
} else if ( ! theCtx . isInitialDataSent ( ) ) {
EntityDataUpdate update = new EntityDataUpdate ( theCtx . getCmdId ( ) , theCtx . getData ( ) , null , theCtx . getMaxEntitiesPerDataSubscription ( ) ) ;
theCtx . sendWsMsg ( update ) ;
theCtx . setInitialDataSent ( true ) ;
} else {
checkAndSendInitialData ( theCtx ) ;
}
} catch ( RuntimeException e ) {
handleWsCmdRuntimeException ( theCtx . getSessionId ( ) , e , cmd ) ;
@ -246,6 +258,94 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
} , wsCallBackExecutor ) ;
}
private void checkAndSendInitialData ( @Nullable TbEntityDataSubCtx theCtx ) {
if ( ! theCtx . isInitialDataSent ( ) ) {
EntityDataUpdate update = new EntityDataUpdate ( theCtx . getCmdId ( ) , theCtx . getData ( ) , null , theCtx . getMaxEntitiesPerDataSubscription ( ) ) ;
theCtx . sendWsMsg ( update ) ;
theCtx . setInitialDataSent ( true ) ;
}
}
private void handleAggHistoryCmd ( TelemetryWebSocketSessionRef session , TbEntityDataSubCtx ctx , AggHistoryCmd cmd ) {
var keys = cmd . getKeys ( ) ;
long interval = cmd . getEndTs ( ) - cmd . getStartTs ( ) ;
List < ReadTsKvQuery > queries = keys . stream ( ) . map ( key - > new BaseReadTsKvQuery (
key . getKey ( ) , cmd . getStartTs ( ) , cmd . getEndTs ( ) , interval , 1 , key . getAgg ( )
) ) . distinct ( ) . collect ( Collectors . toList ( ) ) ;
handleAggCmd ( session , ctx , cmd . getKeys ( ) , queries , cmd . getStartTs ( ) , cmd . getEndTs ( ) , false , false ) ;
}
private void handleAggTsCmd ( TelemetryWebSocketSessionRef session , TbEntityDataSubCtx ctx , AggTimeSeriesCmd cmd ) {
long endTs = cmd . getStartTs ( ) + cmd . getTimeWindow ( ) ;
List < ReadTsKvQuery > queries = cmd . getKeys ( ) . stream ( ) . map ( key - > {
if ( cmd . isFloating ( ) ) {
return new BaseReadTsKvQuery ( key . getKey ( ) , cmd . getStartTs ( ) , endTs , cmd . getTimeWindow ( ) , getLimit ( maxDatapointLimit ) , Aggregation . NONE ) ;
} else {
return new BaseReadTsKvQuery ( key . getKey ( ) , cmd . getStartTs ( ) , endTs , cmd . getTimeWindow ( ) , 1 , key . getAgg ( ) ) ;
}
} ) . distinct ( ) . collect ( Collectors . toList ( ) ) ;
handleAggCmd ( session , ctx , cmd . getKeys ( ) , queries , cmd . getStartTs ( ) , endTs , cmd . isFloating ( ) , true ) ;
}
private void handleAggCmd ( TelemetryWebSocketSessionRef session , TbEntityDataSubCtx ctx , List < AggKey > keys , List < ReadTsKvQuery > queries ,
long startTs , long endTs , boolean floating , boolean subscribe ) {
Map < EntityData , ListenableFuture < List < ReadTsKvQueryResult > > > fetchResultMap = new HashMap < > ( ) ;
List < EntityData > entityDataList = ctx . getData ( ) . getData ( ) ;
entityDataList . forEach ( entityData - > fetchResultMap . put ( entityData ,
tsService . findAllByQueries ( ctx . getTenantId ( ) , entityData . getEntityId ( ) , queries ) ) ) ;
Futures . transform ( Futures . allAsList ( fetchResultMap . values ( ) ) , f - > {
// Map that holds last ts for each key for each entity.
Map < EntityData , Map < String , Long > > lastTsEntityMap = new HashMap < > ( ) ;
fetchResultMap . forEach ( ( entityData , future ) - > {
try {
Map < String , Long > lastTsMap = new HashMap < > ( ) ;
lastTsEntityMap . put ( entityData , lastTsMap ) ;
List < ReadTsKvQueryResult > queryResults = future . get ( ) ;
if ( queryResults ! = null ) {
for ( ReadTsKvQueryResult queryResult : queryResults ) {
if ( floating ) {
entityData . getAggFloating ( ) . put ( queryResult . getKey ( ) , queryResult . toTsValues ( ) ) ;
} else {
entityData . getAggLatest ( ) . computeIfAbsent ( queryResult . getAgg ( ) , agg - > new HashMap < > ( ) ) . put ( queryResult . getKey ( ) , queryResult . toTsValue ( ) ) ;
}
lastTsMap . put ( queryResult . getKey ( ) , queryResult . getLastEntryTs ( ) ) ;
}
}
// Populate with empty values if no data found.
keys . forEach ( key - > {
if ( floating ) {
entityData . getAggFloating ( ) . putIfAbsent ( key . getKey ( ) , new TsValue [ ] { TsValue . EMPTY } ) ;
} else {
entityData . getAggLatest ( ) . computeIfAbsent ( key . getAgg ( ) , agg - > new HashMap < > ( ) ) . putIfAbsent ( key . getKey ( ) , TsValue . EMPTY ) ;
}
} ) ;
} catch ( InterruptedException | ExecutionException e ) {
log . warn ( "[{}][{}][{}] Failed to fetch historical data" , ctx . getSessionId ( ) , ctx . getCmdId ( ) , entityData . getEntityId ( ) , e ) ;
ctx . sendWsMsg ( new EntityDataUpdate ( ctx . getCmdId ( ) , SubscriptionErrorCode . INTERNAL_ERROR . getCode ( ) , "Failed to fetch historical data!" ) ) ;
}
} ) ;
ctx . getWsLock ( ) . lock ( ) ;
try {
EntityDataUpdate update ;
if ( ! ctx . isInitialDataSent ( ) ) {
update = new EntityDataUpdate ( ctx . getCmdId ( ) , ctx . getData ( ) , null , ctx . getMaxEntitiesPerDataSubscription ( ) ) ;
ctx . setInitialDataSent ( true ) ;
} else {
update = new EntityDataUpdate ( ctx . getCmdId ( ) , null , entityDataList , ctx . getMaxEntitiesPerDataSubscription ( ) ) ;
}
if ( subscribe ) {
ctx . createTimeSeriesSubscriptions ( lastTsEntityMap , startTs , endTs , true ) ;
}
ctx . sendWsMsg ( update ) ;
entityDataList . forEach ( ed - > ed . getTimeseries ( ) . clear ( ) ) ;
} finally {
ctx . getWsLock ( ) . unlock ( ) ;
}
return ctx ;
} , wsCallBackExecutor ) ;
}
private void handleWsCmdRuntimeException ( String sessionId , RuntimeException e , EntityDataCmd cmd ) {
log . debug ( "[{}] Failed to process ws cmd: {}" , sessionId , cmd , e ) ;
wsService . close ( sessionId , CloseStatus . SERVICE_RESTARTED ) ;
@ -420,12 +520,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private ListenableFuture < TbEntityDataSubCtx > handleGetTsCmd ( TbEntityDataSubCtx ctx , GetTsCmd cmd , boolean subscribe ) {
List < String > keys = cmd . getKeys ( ) ;
List < ReadTsKvQuery > finalTsKvQueryList ;
List < ReadTsKvQuery > tsKvQueryList = cmd . getKeys ( ) . stream ( ) . map ( key - > new BaseReadTsKvQuery (
List < ReadTsKvQuery > tsKvQueryList = keys . stream ( ) . map ( key - > new BaseReadTsKvQuery (
key , cmd . getStartTs ( ) , cmd . getEndTs ( ) , cmd . getInterval ( ) , getLimit ( cmd . getLimit ( ) ) , cmd . getAgg ( )
) ) . collect ( Collectors . toList ( ) ) ;
if ( cmd . isFetchLatestPreviousPoint ( ) ) {
finalTsKvQueryList = new ArrayList < > ( tsKvQueryList ) ;
finalTsKvQueryList . addAll ( cmd . getKeys ( ) . stream ( ) . map ( key - > new BaseReadTsKvQuery (
finalTsKvQueryList . addAll ( keys . stream ( ) . map ( key - > new BaseReadTsKvQuery (
key , cmd . getStartTs ( ) - TimeUnit . DAYS . toMillis ( 365 ) , cmd . getStartTs ( ) , cmd . getInterval ( ) , 1 , cmd . getAgg ( )
) ) . collect ( Collectors . toList ( ) ) ) ;
} else {
@ -451,7 +551,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
}
// Populate with empty values if no data found.
cmd . getKeys ( ) . forEach ( key - > {
keys . forEach ( key - > {
if ( ! entityData . getTimeseries ( ) . containsKey ( key ) ) {
entityData . getTimeseries ( ) . put ( key , new TsValue [ 0 ] ) ;
}
@ -475,7 +575,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
update = new EntityDataUpdate ( ctx . getCmdId ( ) , null , entityDataList , ctx . getMaxEntitiesPerDataSubscription ( ) ) ;
}
if ( subscribe ) {
ctx . createTimes eriesSubscriptions ( lastTsEntityMap , cmd . getStartTs ( ) , cmd . getEndTs ( ) ) ;
ctx . createTimeS eriesSubscriptions ( lastTsEntityMap , cmd . getStartTs ( ) , cmd . getEndTs ( ) ) ;
}
ctx . sendWsMsg ( update ) ;
entityDataList . forEach ( ed - > ed . getTimeseries ( ) . clear ( ) ) ;
@ -546,11 +646,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
ctx . getWsLock ( ) . lock ( ) ;
try {
ctx . createLatestValuesSubscriptions ( latestCmd . getKeys ( ) ) ;
if ( ! ctx . isInitialDataSent ( ) ) {
EntityDataUpdate update = new EntityDataUpdate ( ctx . getCmdId ( ) , ctx . getData ( ) , null , ctx . getMaxEntitiesPerDataSubscription ( ) ) ;
ctx . sendWsMsg ( update ) ;
ctx . setInitialDataSent ( true ) ;
}
checkAndSendInitialData ( ctx ) ;
} finally {
ctx . getWsLock ( ) . unlock ( ) ;
}