@ -17,10 +17,8 @@ package org.thingsboard.server.service.subscription;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.context.event.EventListener ;
import org.springframework.stereotype.Service ;
import org.thingsboard.common.util.DonAsynchron ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.ThingsBoardThreadFactory ;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg ;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.DataConstants ;
@ -30,24 +28,15 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.id.UserId ;
import org.thingsboard.server.common.data.kv.Aggregation ;
import org.thingsboard.server.common.data.kv.AttributeKvEntry ;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQue ry ;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEnt ry ;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry ;
import org.thingsboard.server.common.data.kv.KvEntry ;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.StringDataEntry ;
import org.thingsboard.server.common.data.kv.TsKvEntry ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.dao.attributes.AttributesService ;
import org.thingsboard.server.dao.timeseries.TimeseriesService ;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto ;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmSubscriptionUpdateProto ;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto ;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue ;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateValueListProto ;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg ;
import org.thingsboard.server.queue.TbQueueProducer ;
import org.thingsboard.server.queue.common.TbProtoQueueMsg ;
@ -56,31 +45,25 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener ;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider ;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent ;
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent ;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider ;
import org.thingsboard.server.queue.util.TbCoreComponent ;
import org.thingsboard.server.service.state.DefaultDeviceStateService ;
import org.thingsboard.server.service.state.DeviceStateService ;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate ;
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate ;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate ;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate ;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.util.ArrayList ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Set ;
import java.util.TreeMap ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.function.Function ;
import java.util.function.Predicate ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
@Slf4j
@TbCoreComponent
@ -88,8 +71,6 @@ import java.util.function.Predicate;
@RequiredArgsConstructor
public class DefaultSubscriptionManagerService extends TbApplicationEventListener < PartitionChangeEvent > implements SubscriptionManagerService {
private final AttributesService attrService ;
private final TimeseriesService tsService ;
private final NotificationsTopicService notificationsTopicService ;
private final PartitionService partitionService ;
private final TbServiceInfoProvider serviceInfoProvider ;
@ -97,126 +78,89 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
private final TbLocalSubscriptionService localSubscriptionService ;
private final DeviceStateService deviceStateService ;
private final TbClusterService clusterService ;
private final SubscriptionSchedulerComponent scheduler ;
private final Map < EntityId , Set < TbSubscription > > subscriptionsByEntityId = new ConcurrentHashMap < > ( ) ;
private final Map < String , Map < Integer , TbSubscription > > subscriptionsByWsSessionId = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < TopicPartitionInfo , Set < TbSubscription > > partitionedSubscriptions = new ConcurrentHashMap < > ( ) ;
private final Set < TopicPartitionInfo > currentPartitions = ConcurrentHashMap . newKeySet ( ) ;
private final Lock subsLock = new ReentrantLock ( ) ;
private final ConcurrentMap < EntityId , TbEntityRemoteSubsInfo > entitySubscriptions = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < EntityId , TbEntityUpdatesInfo > entityUpdates = new ConcurrentHashMap < > ( ) ;
private ExecutorService tsCallBackExecutor ;
private String serviceId ;
private TbQueueProducer < TbProtoQueueMsg < ToCoreNotificationMsg > > toCoreNotificationsProducer ;
private long initTs ;
@PostConstruct
public void initExecutor ( ) {
tsCallBackExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "ts-sub-callback" ) ) ;
serviceId = serviceInfoProvider . getServiceId ( ) ;
initTs = System . currentTimeMillis ( ) ;
toCoreNotificationsProducer = producerProvider . getTbCoreNotificationsMsgProducer ( ) ;
}
@PreDestroy
public void shutdownExecutor ( ) {
if ( tsCallBackExecutor ! = null ) {
tsCallBackExecutor . shutdownNow ( ) ;
}
scheduler . scheduleWithFixedDelay ( this : : cleanupEntityUpdates , 1 , 1 , TimeUnit . HOURS ) ;
}
@Override
public void addSubscription ( TbSubscription subscription , TbCallback callback ) {
log . trace ( "[{}][{}][{}] Registering subscription for entity [{}]" ,
subscription . getServiceId ( ) , subscription . getSessionId ( ) , subscription . getSubscriptionId ( ) , subscription . getEntityId ( ) ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , subscription . getTenantId ( ) , subscription . getEntityId ( ) ) ;
if ( currentPartitions . contains ( tpi ) ) {
partitionedSubscriptions . computeIfAbsent ( tpi , k - > ConcurrentHashMap . newKeySet ( ) ) . add ( subscription ) ;
public void onSubEvent ( String serviceId , TbEntitySubEvent event , TbCallback callback ) {
var tenantId = event . getTenantId ( ) ;
var entityId = event . getEntityId ( ) ;
log . trace ( "[{}][{}][{}] Processing subscription event {}" , tenantId , entityId , serviceId , event ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , entityId ) ;
if ( tpi . isMyPartition ( ) ) {
subsLock . lock ( ) ;
try {
var entitySubs = entitySubscriptions . computeIfAbsent ( entityId , id - > new TbEntityRemoteSubsInfo ( tenantId , entityId ) ) ;
boolean empty = entitySubs . updateAndCheckIsEmpty ( serviceId , event ) ;
if ( empty ) {
entitySubscriptions . remove ( entityId ) ;
}
} finally {
subsLock . unlock ( ) ;
}
callback . onSuccess ( ) ;
if ( event . hasTsOrAttrSub ( ) ) {
sendSubEventCallback ( serviceId , entityId , event . getSeqNumber ( ) ) ;
}
} else {
log . warn ( "[{}][{}] Entity belongs to external partition. Probably rebalancing is in progress. Topic: {}"
, subscription . getTenantId ( ) , subscription . getEntityId ( ) , tpi . getFullTopicName ( ) ) ;
log . warn ( "[{}][{}][{}] Eve nt belongs to external partition. Probably re- balancing is in progress. Topic: {}"
, tenantId , entityId , serviceId , tpi . getFullTopicName ( ) ) ;
callback . onFailure ( new RuntimeException ( "Entity belongs to external partition " + tpi . getFullTopicName ( ) + "!" ) ) ;
}
boolean newSubscription = subscriptionsByEntityId
. computeIfAbsent ( subscription . getEntityId ( ) , k - > ConcurrentHashMap . newKeySet ( ) ) . add ( subscription ) ;
subscriptionsByWsSessionId . computeIfAbsent ( subscription . getSessionId ( ) , k - > new ConcurrentHashMap < > ( ) ) . put ( subscription . getSubscriptionId ( ) , subscription ) ;
if ( newSubscription ) {
switch ( subscription . getType ( ) ) {
case TIMESERIES :
handleNewTelemetrySubscription ( ( TbTimeseriesSubscription ) subscription ) ;
break ;
case ATTRIBUTES :
handleNewAttributeSubscription ( ( TbAttributeSubscription ) subscription ) ;
break ;
case ALARMS :
handleNewAlarmsSubscription ( ( TbAlarmsSubscription ) subscription ) ;
break ;
}
}
}
@Override
public void cancelSubscription ( String sessionId , int subscriptionId , TbCallback callback ) {
log . debug ( "[{}][{}] Going to remove subscription." , sessionId , subscriptionId ) ;
Map < Integer , TbSubscription > sessionSubscriptions = subscriptionsByWsSessionId . get ( sessionId ) ;
if ( sessionSubscriptions ! = null ) {
TbSubscription subscription = sessionSubscriptions . remove ( subscriptionId ) ;
if ( subscription ! = null ) {
removeSubscriptionFromEntityMap ( subscription ) ;
removeSubscriptionFromPartitionMap ( subscription ) ;
if ( sessionSubscriptions . isEmpty ( ) ) {
subscriptionsByWsSessionId . remove ( sessionId ) ;
}
} else {
log . debug ( "[{}][{}] Subscription not found!" , sessionId , subscriptionId ) ;
@EventListener ( OtherServiceShutdownEvent . class )
public void onApplicationEvent ( OtherServiceShutdownEvent event ) {
if ( event . getServiceTypes ( ) ! = null & & event . getServiceTypes ( ) . contains ( ServiceType . TB_CORE ) ) {
subsLock . lock ( ) ;
try {
int sizeBeforeCleanup = entitySubscriptions . size ( ) ;
entitySubscriptions . entrySet ( ) . removeIf ( kv - > kv . getValue ( ) . removeAndCheckIsEmpty ( event . getServiceId ( ) ) ) ;
log . info ( "[{}][{}] Removed {} entity subscription records due to server shutdown." , serviceId , event . getServiceId ( ) , entitySubscriptions . size ( ) - sizeBeforeCleanup ) ;
} finally {
subsLock . unlock ( ) ;
}
}
}
private void sendSubEventCallback ( String targetId , EntityId entityId , int seqNumber ) {
var update = getEntityUpdatesInfo ( entityId ) ;
if ( serviceId . equals ( targetId ) ) {
localSubscriptionService . onSubEventCallback ( entityId , seqNumber , update , TbCallback . EMPTY ) ;
} else {
log . debug ( "[{}] No session subscriptions found!" , sessionId ) ;
sendCoreNotification ( targetId , entityId , TbSubscriptionUtils . toProto ( entityId . getId ( ) , seqNumber , update ) ) ;
}
callback . onSuccess ( ) ;
}
@Override
protected void onTbApplicationEvent ( PartitionChangeEvent partitionChangeEvent ) {
if ( ServiceType . TB_CORE . equals ( partitionChangeEvent . getServiceType ( ) ) ) {
Set < TopicPartitionInfo > removedPartitions = new HashSet < > ( currentPartitions ) ;
removedPartitions . removeAll ( partitionChangeEvent . getPartitions ( ) ) ;
currentPartitions . clear ( ) ;
currentPartitions . addAll ( partitionChangeEvent . getPartitions ( ) ) ;
// We no longer manage current partition of devices;
removedPartitions . forEach ( partition - > {
Set < TbSubscription > subs = partitionedSubscriptions . remove ( partition ) ;
if ( subs ! = null ) {
subs . forEach ( sub - > {
if ( ! serviceId . equals ( sub . getServiceId ( ) ) ) {
removeSubscriptionFromEntityMap ( sub ) ;
}
} ) ;
}
} ) ;
entitySubscriptions . values ( ) . removeIf ( sub - >
! partitionService . resolve ( ServiceType . TB_CORE , sub . getTenantId ( ) , sub . getEntityId ( ) ) . isMyPartition ( ) ) ;
}
}
@Override
public void onTimeSeriesUpdate ( TenantId tenantId , EntityId entityId , List < TsKvEntry > ts , TbCallback callback ) {
onLocalTelemetrySubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . TIMESERIES . equals ( s . getType ( ) ) ) {
return ( TbTimeseriesSubscription ) s ;
} else {
return null ;
}
} , s - > true , s - > {
List < TsKvEntry > subscriptionUpdate = null ;
for ( TsKvEntry kv : ts ) {
if ( ( s . isAllKeys ( ) | | s . getKeyStates ( ) . containsKey ( ( kv . getKey ( ) ) ) ) ) {
if ( subscriptionUpdate = = null ) {
subscriptionUpdate = new ArrayList < > ( ) ;
}
subscriptionUpdate . add ( kv ) ;
}
}
return subscriptionUpdate ;
} , true ) ;
onTimeSeriesUpdate ( entityId , ts ) ;
if ( entityId . getEntityType ( ) = = EntityType . DEVICE ) {
updateDeviceInactivityTimeout ( tenantId , entityId , ts ) ;
}
@ -224,168 +168,68 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
}
@Override
public void onAttributesUpdate ( TenantId tenantId , EntityId entityId , String scope , List < AttributeKvEntry > attributes , TbCallback callback ) {
onAttributesUpdate ( tenantId , entityId , scope , attributes , true , callback ) ;
}
@Override
public void onAttributesUpdate ( TenantId tenantId , EntityId entityId , String scope , List < AttributeKvEntry > attributes , boolean notifyDevice , TbCallback callback ) {
onLocalTelemetrySubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . ATTRIBUTES . equals ( s . getType ( ) ) ) {
return ( TbAttributeSubscription ) s ;
} else {
return null ;
}
} ,
s - > ( TbAttributeSubscriptionScope . ANY_SCOPE . equals ( s . getScope ( ) ) | | scope . equals ( s . getScope ( ) . name ( ) ) ) ,
s - > {
List < TsKvEntry > subscriptionUpdate = null ;
for ( AttributeKvEntry kv : attributes ) {
if ( s . isAllKeys ( ) | | s . getKeyStates ( ) . containsKey ( kv . getKey ( ) ) ) {
if ( subscriptionUpdate = = null ) {
subscriptionUpdate = new ArrayList < > ( ) ;
}
subscriptionUpdate . add ( new BasicTsKvEntry ( kv . getLastUpdateTs ( ) , kv ) ) ;
}
}
return subscriptionUpdate ;
} , true ) ;
public void onTimeSeriesDelete ( TenantId tenantId , EntityId entityId , List < String > keys , TbCallback callback ) {
onTimeSeriesUpdate ( entityId ,
keys . stream ( ) . map ( key - > new BasicTsKvEntry ( 0 , new StringDataEntry ( key , "" ) ) ) . collect ( Collectors . toList ( ) ) ) ;
if ( entityId . getEntityType ( ) = = EntityType . DEVICE ) {
if ( TbAttributeSubscriptionScope . SERVER_SCOPE . name ( ) . equalsIgnoreCase ( scope ) ) {
updateDeviceInactivityTimeout ( tenantId , entityId , attributes ) ;
} else if ( TbAttributeSubscriptionScope . SHARED_SCOPE . name ( ) . equalsIgnoreCase ( scope ) & & notifyDevice ) {
clusterService . pushMsgToCore ( DeviceAttributesEventNotificationMsg . onUpdate ( tenantId ,
new DeviceId ( entityId . getId ( ) ) , DataConstants . SHARED_SCOPE , new ArrayList < > ( attributes ) )
, null ) ;
}
deleteDeviceInactivityTimeout ( tenantId , entityId , keys ) ;
}
callback . onSuccess ( ) ;
}
private void updateDeviceInactivityTimeout ( TenantId tenantId , EntityId entityId , List < ? extends KvEntry > kvEntries ) {
for ( KvEntry kvEntry : kvEntries ) {
if ( kvEntry . getKey ( ) . equals ( DefaultDeviceStateService . INACTIVITY_TIMEOUT ) ) {
deviceStateService . onDeviceInactivityTimeoutUpdate ( tenantId , new DeviceId ( entityId . getId ( ) ) , getLongValue ( kvEntry ) ) ;
}
public void onTimeSeriesUpdate ( EntityId entityId , List < TsKvEntry > update ) {
getEntityUpdatesInfo ( entityId ) . timeSeriesUpdateTs = System . currentTimeMillis ( ) ;
TbEntityRemoteSubsInfo subInfo = entitySubscriptions . get ( entityId ) ;
if ( subInfo ! = null ) {
log . trace ( "[{}] Handling time-series update: {}" , entityId , update ) ;
subInfo . getSubs ( ) . forEach ( ( serviceId , sub ) - > {
if ( sub . tsAllKeys ) {
onTimeSeriesUpdate ( serviceId , entityId , update ) ;
} else if ( sub . tsKeys ! = null ) {
List < TsKvEntry > tmp = getSubList ( update , sub . tsKeys ) ;
if ( tmp ! = null ) {
onTimeSeriesUpdate ( serviceId , entityId , tmp ) ;
}
}
} ) ;
} else {
log . trace ( "[{}] No time-series subscriptions for entity." , entityId ) ;
}
}
private void deleteDeviceInactivityTimeout ( TenantId tenantId , EntityId entityId , List < String > keys ) {
for ( String key : keys ) {
if ( key . equals ( DefaultDeviceStateService . INACTIVITY_TIMEOUT ) ) {
deviceStateService . onDeviceInactivityTimeoutUpdate ( tenantId , new DeviceId ( entityId . getId ( ) ) , 0 ) ;
}
private void onTimeSeriesUpdate ( String targe tId, EntityId entityId , List < TsKvEntry > update ) {
if ( serviceId . equals ( targetId ) ) {
localSubscriptionService . onTimeSeriesUpdate ( entityId , update , TbCallback . EMPTY ) ;
} else {
sendCoreNotification ( targetId , entityId , TbSubscriptionUtils . toProto ( true , entityId , update ) ) ;
}
}
@Override
public void onAlarmUpdate ( TenantId tenantId , EntityId entityId , AlarmInfo alarm , TbCallback callback ) {
onLocalAlarmSubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . ALARMS . equals ( s . getType ( ) ) ) {
return ( TbAlarmsSubscription ) s ;
} else {
return null ;
}
} ,
s - > alarm . getCreatedTime ( ) > = s . getTs ( ) | | alarm . getAssignTs ( ) > = s . getTs ( ) ,
alarm , false
) ;
callback . onSuccess ( ) ;
}
@Override
public void onAlarmDeleted ( TenantId tenantId , EntityId entityId , AlarmInfo alarm , TbCallback callback ) {
onLocalAlarmSubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . ALARMS . equals ( s . getType ( ) ) ) {
return ( TbAlarmsSubscription ) s ;
} else {
return null ;
}
} ,
s - > alarm . getCreatedTime ( ) > = s . getTs ( ) ,
alarm , true
) ;
callback . onSuccess ( ) ;
}
@Override
public void onNotificationUpdate ( TenantId tenantId , UserId recipientId , NotificationUpdate notificationUpdate , TbCallback callback ) {
Set < TbSubscription > subscriptions = subscriptionsByEntityId . get ( recipientId ) ;
if ( subscriptions ! = null ) {
NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate ( notificationUpdate ) ;
log . trace ( "Handling notificationUpdate for user {}: {}" , recipientId , notificationUpdate ) ;
subscriptions . stream ( )
. filter ( subscription - > subscription . getType ( ) = = TbSubscriptionType . NOTIFICATIONS
| | subscription . getType ( ) = = TbSubscriptionType . NOTIFICATIONS_COUNT )
. forEach ( subscription - > onNotificationsSubUpdate ( subscriptionUpdate , subscription ) ) ;
}
callback . onSuccess ( ) ;
public void onAttributesUpdate ( TenantId tenantId , EntityId entityId , String scope , List < AttributeKvEntry > attributes , TbCallback callback ) {
onAttributesUpdate ( tenantId , entityId , scope , attributes , true , callback ) ;
}
@Override
public void onNotificationRequestUpdate ( TenantId tenantId , NotificationRequestUpdate notificationRequestUpdate , TbCallback callback ) {
NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate ( notificationRequestUpdate ) ;
subscriptionsByEntityId . forEach ( ( entityId , subscriptions ) - > {
if ( entityId . getEntityType ( ) ! = EntityType . USER ) {
return ;
public void onAttributesUpdate ( TenantId tenantId , EntityId entityId , String scope , List < AttributeKvEntry > attributes , boolean notifyDevice , TbCallback callback ) {
getEntityUpdatesInfo ( entityId ) . attributesUpdateTs = System . currentTimeMillis ( ) ;
processAttributesUpdate ( entityId , attributes ) ;
if ( entityId . getEntityType ( ) = = EntityType . DEVICE ) {
if ( TbAttributeSubscriptionScope . SERVER_SCOPE . name ( ) . equalsIgnoreCase ( scope ) ) {
updateDeviceInactivityTimeout ( tenantId , entityId , attributes ) ;
} else if ( TbAttributeSubscriptionScope . SHARED_SCOPE . name ( ) . equalsIgnoreCase ( scope ) & & notifyDevice ) {
clusterService . pushMsgToCore ( DeviceAttributesEventNotificationMsg . onUpdate ( tenantId ,
new DeviceId ( entityId . getId ( ) ) , DataConstants . SHARED_SCOPE , new ArrayList < > ( attributes ) )
, null ) ;
}
log . trace ( "Handling notificationRequestUpdate for user {}: {}" , entityId , notificationRequestUpdate ) ;
subscriptions . forEach ( subscription - > {
if ( subscription . getType ( ) ! = TbSubscriptionType . NOTIFICATIONS & &
subscription . getType ( ) ! = TbSubscriptionType . NOTIFICATIONS_COUNT ) {
return ;
}
if ( ! subscription . getTenantId ( ) . equals ( tenantId ) ) {
return ;
}
onNotificationsSubUpdate ( subscriptionUpdate , subscription ) ;
} ) ;
} ) ;
callback . onSuccess ( ) ;
}
private void onNotificationsSubUpdate ( NotificationsSubscriptionUpdate subscriptionUpdate , TbSubscription subscription ) {
if ( serviceId . equals ( subscription . getServiceId ( ) ) ) {
log . trace ( "[{}][{}][{}] Subscription session is managed by current service, forwarding to localSubscriptionService (update: {})" ,
subscription . getServiceId ( ) , subscription . getEntityId ( ) , subscription . getSessionId ( ) , subscriptionUpdate ) ;
localSubscriptionService . onSubscriptionUpdate ( subscription . getSessionId ( ) ,
subscription . getSubscriptionId ( ) , subscriptionUpdate , TbCallback . EMPTY ) ;
} else {
log . trace ( "[{}][{}][{}] Subscription session is not managed by current service (update: {})" ,
subscription . getServiceId ( ) , subscription . getEntityId ( ) , subscription . getSessionId ( ) , subscriptionUpdate ) ;
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , subscription . getServiceId ( ) ) ;
ToCoreNotificationMsg updateProto = TbSubscriptionUtils . notificationsSubUpdateToProto ( subscription , subscriptionUpdate ) ;
TbProtoQueueMsg < ToCoreNotificationMsg > queueMsg = new TbProtoQueueMsg < > ( subscription . getEntityId ( ) . getId ( ) , updateProto ) ;
toCoreNotificationsProducer . send ( tpi , queueMsg , null ) ;
}
callback . onSuccess ( ) ;
}
@Override
public void onAttributesDelete ( TenantId tenantId , EntityId entityId , String scope , List < String > keys , boolean notifyDevice , TbCallback callback ) {
onLocalTelemetrySubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . ATTRIBUTES . equals ( s . getType ( ) ) ) {
return ( TbAttributeSubscription ) s ;
} else {
return null ;
}
} ,
s - > ( TbAttributeSubscriptionScope . ANY_SCOPE . equals ( s . getScope ( ) ) | | scope . equals ( s . getScope ( ) . name ( ) ) ) ,
s - > {
List < TsKvEntry > subscriptionUpdate = null ;
for ( String key : keys ) {
if ( s . isAllKeys ( ) | | s . getKeyStates ( ) . containsKey ( key ) ) {
if ( subscriptionUpdate = = null ) {
subscriptionUpdate = new ArrayList < > ( ) ;
}
subscriptionUpdate . add ( new BasicTsKvEntry ( 0 , new StringDataEntry ( key , "" ) ) ) ;
}
}
return subscriptionUpdate ;
} , false ) ;
processAttributesUpdate ( entityId ,
keys . stream ( ) . map ( key - > new BaseAttributeKvEntry ( 0 , new StringDataEntry ( key , "" ) ) ) . collect ( Collectors . toList ( ) ) ) ;
if ( entityId . getEntityType ( ) = = EntityType . DEVICE ) {
if ( TbAttributeSubscriptionScope . SERVER_SCOPE . name ( ) . equalsIgnoreCase ( scope )
| | TbAttributeSubscriptionScope . ANY_SCOPE . name ( ) . equalsIgnoreCase ( scope ) ) {
@ -398,222 +242,117 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
callback . onSuccess ( ) ;
}
@Override
public void onTimeSeriesDelete ( TenantId tenantId , EntityId entityId , List < String > keys , TbCallback callback ) {
onLocalTelemetrySubUpdate ( entityId ,
s - > {
if ( TbSubscriptionType . TIMESERIES . equals ( s . getType ( ) ) ) {
return ( TbTimeseriesSubscription ) s ;
} else {
return null ;
}
} , s - > true , s - > {
List < TsKvEntry > subscriptionUpdate = null ;
for ( String key : keys ) {
if ( s . isAllKeys ( ) | | s . getKeyStates ( ) . containsKey ( key ) ) {
if ( subscriptionUpdate = = null ) {
subscriptionUpdate = new ArrayList < > ( ) ;
}
subscriptionUpdate . add ( new BasicTsKvEntry ( 0 , new StringDataEntry ( key , "" ) ) ) ;
}
}
return subscriptionUpdate ;
} , false ) ;
if ( entityId . getEntityType ( ) = = EntityType . DEVICE ) {
deleteDeviceInactivityTimeout ( tenantId , entityId , keys ) ;
}
callback . onSuccess ( ) ;
}
private < T extends TbSubscription > void onLocalTelemetrySubUpdate ( EntityId entityId ,
Function < TbSubscription , T > castFunction ,
Predicate < T > filterFunction ,
Function < T , List < TsKvEntry > > processFunction ,
boolean ignoreEmptyUpdates ) {
Set < TbSubscription > entitySubscriptions = subscriptionsByEntityId . get ( entityId ) ;
if ( entitySubscriptions ! = null ) {
entitySubscriptions . stream ( ) . map ( castFunction ) . filter ( Objects : : nonNull ) . filter ( filterFunction ) . forEach ( s - > {
List < TsKvEntry > subscriptionUpdate = processFunction . apply ( s ) ;
if ( subscriptionUpdate ! = null & & ! subscriptionUpdate . isEmpty ( ) ) {
if ( serviceId . equals ( s . getServiceId ( ) ) ) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate ( s . getSubscriptionId ( ) , subscriptionUpdate ) ;
localSubscriptionService . onSubscriptionUpdate ( s . getSessionId ( ) , update , TbCallback . EMPTY ) ;
} else {
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , s . getServiceId ( ) ) ;
toCoreNotificationsProducer . send ( tpi , toProto ( s , subscriptionUpdate , ignoreEmptyUpdates ) , null ) ;
public void processAttributesUpdate ( EntityId entityId , List < AttributeKvEntry > update ) {
TbEntityRemoteSubsInfo subInfo = entitySubscriptions . get ( entityId ) ;
if ( subInfo ! = null ) {
log . trace ( "[{}] Handling attributes update: {}" , entityId , update ) ;
subInfo . getSubs ( ) . forEach ( ( serviceId , sub ) - > {
if ( sub . attrAllKeys ) {
processAttributesUpdate ( serviceId , entityId , update ) ;
} else if ( sub . attrKeys ! = null ) {
List < AttributeKvEntry > tmp = getSubList ( update , sub . attrKeys ) ;
if ( tmp ! = null ) {
processAttributesUpdate ( serviceId , entityId , tmp ) ;
}
}
} ) ;
} else {
log . debug ( "[{}] No device subscriptions to process! ", entityId ) ;
log . trace ( "[{}] No attributes subscriptions for entity." , entityId ) ;
}
}
private void onLocalAlarmSubUpdate ( EntityId entityId ,
Function < TbSubscription , TbAlarmsSubscription > castFunction ,
Predicate < TbAlarmsSubscription > filterFunction ,
AlarmInfo alarm , boolean deleted ) {
Set < TbSubscription > entitySubscriptions = subscriptionsByEntityId . get ( entityId ) ;
if ( alarm = = null ) {
log . warn ( "[{}] empty alarm update!" , entityId ) ;
return ;
}
if ( entitySubscriptions ! = null ) {
entitySubscriptions . stream ( ) . map ( castFunction ) . filter ( Objects : : nonNull ) . filter ( filterFunction ) . forEach ( s - > {
if ( serviceId . equals ( s . getServiceId ( ) ) ) {
AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate ( s . getSubscriptionId ( ) , alarm , deleted ) ;
localSubscriptionService . onSubscriptionUpdate ( s . getSessionId ( ) , update , TbCallback . EMPTY ) ;
} else {
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , s . getServiceId ( ) ) ;
toCoreNotificationsProducer . send ( tpi , toProto ( s , alarm , deleted ) , null ) ;
}
} ) ;
private void processAttributesUpdate ( String targetId , EntityId entityId , List < AttributeKvEntry > update ) {
List < TsKvEntry > tsKvEntryList = update . stream ( ) . map ( attr - > new BasicTsKvEntry ( attr . getLastUpdateTs ( ) , attr ) ) . collect ( Collectors . toList ( ) ) ;
if ( serviceId . equals ( targetId ) ) {
localSubscriptionService . onAttributesUpdate ( entityId , tsKvEntryList , TbCallback . EMPTY ) ;
} else {
log . debug ( "[{}] No device subscriptions to process!" , entityId ) ;
sendCoreNotification ( targetId , entityId , TbSubscriptionUtils . toProto ( false , entityId , tsKvEntryList ) ) ;
}
}
private void removeSubscriptionFromEntityMap ( TbSubscription sub ) {
Set < TbSubscription > entitySubSet = subscriptionsByEntityId . get ( sub . getEntityId ( ) ) ;
if ( entitySubSet ! = null ) {
entitySubSet . remove ( sub ) ;
if ( entitySubSet . isEmpty ( ) ) {
subscriptionsByEntityId . remove ( sub . getEntityId ( ) ) ;
private void updateDeviceInactivityTimeout ( TenantId tenantId , EntityId entityId , List < ? extends KvEntry > kvEntries ) {
for ( KvEntry kvEntry : kvEntries ) {
if ( kvEntry . getKey ( ) . equals ( DefaultDeviceStateService . INACTIVITY_TIMEOUT ) ) {
deviceStateService . onDeviceInactivityTimeoutUpdate ( tenantId , new DeviceId ( entityId . getId ( ) ) , getLongValue ( kvEntry ) ) ;
}
}
}
private void removeSubscriptionFromPartitionMap ( TbSubscription sub ) {
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , sub . getTenantId ( ) , sub . getEntityId ( ) ) ;
Set < TbSubscription > subs = partitionedSubscriptions . get ( tpi ) ;
if ( subs ! = null ) {
subs . remove ( sub ) ;
private void deleteDeviceInactivityTimeout ( TenantId tenantId , EntityId entityId , List < String > keys ) {
for ( String key : keys ) {
if ( key . equals ( DefaultDeviceStateService . INACTIVITY_TIMEOUT ) ) {
deviceStateService . onDeviceInactivityTimeoutUpdate ( tenantId , new DeviceId ( entityId . getId ( ) ) , 0 ) ;
}
}
}
private void handleNewAttributeSubscription ( TbAttributeSubscription subscription ) {
log . trace ( "[{}][{}][{}] Processing remote attribute subscription for entity [{}]" ,
serviceId , subscription . getSessionId ( ) , subscription . getSubscriptionId ( ) , subscription . getEntityId ( ) ) ;
final Map < String , Long > keyStates = subscription . getKeyStates ( ) ;
DonAsynchron . withCallback ( attrService . find ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , DataConstants . CLIENT_SCOPE , keyStates . keySet ( ) ) , values - > {
List < TsKvEntry > missedUpdates = new ArrayList < > ( ) ;
values . forEach ( latestEntry - > {
if ( latestEntry . getLastUpdateTs ( ) > keyStates . get ( latestEntry . getKey ( ) ) ) {
missedUpdates . add ( new BasicTsKvEntry ( latestEntry . getLastUpdateTs ( ) , latestEntry ) ) ;
}
} ) ;
if ( ! missedUpdates . isEmpty ( ) ) {
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , subscription . getServiceId ( ) ) ;
toCoreNotificationsProducer . send ( tpi , toProto ( subscription , missedUpdates ) , null ) ;
}
} ,
e - > log . error ( "Failed to fetch missed updates." , e ) , tsCallBackExecutor ) ;
@Override
public void onAlarmUpdate ( TenantId tenantId , EntityId entityId , AlarmInfo alarm , TbCallback callback ) {
onAlarmSubUpdate ( tenantId , entityId , alarm , false , callback ) ;
}
private void handleNewAlarmsSubscription ( TbAlarmsSubscription subscription ) {
log . trace ( "[{}][{}][{}] Processing remote alarm subscription for entity [{}]" ,
serviceId , subscription . getSessionId ( ) , subscription . getSubscriptionId ( ) , subscription . getEntityId ( ) ) ;
//TODO: @dlandiak search all new alarms for this entity.
@Override
public void onAlarmDeleted ( TenantId tenantId , EntityId entityId , AlarmInfo alarm , TbCallback callback ) {
onAlarmSubUpdate ( tenantId , entityId , alarm , true , callback ) ;
}
private void handleNewTelemetrySubscription ( TbTimeseriesSubscription subscription ) {
log . trace ( "[{}][{}][{}] Processing remote telemetry subscription for entity [{}]" ,
serviceId , subscription . getSessionId ( ) , subscription . getSubscriptionId ( ) , subscription . getEntityId ( ) ) ;
long curTs = System . currentTimeMillis ( ) ;
if ( subscription . isLatestValues ( ) ) {
DonAsynchron . withCallback ( tsService . findLatest ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , subscription . getKeyStates ( ) . keySet ( ) ) ,
missedUpdates - > {
if ( missedUpdates ! = null & & ! missedUpdates . isEmpty ( ) ) {
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , subscription . getServiceId ( ) ) ;
toCoreNotificationsProducer . send ( tpi , toProto ( subscription , missedUpdates ) , null ) ;
}
} ,
e - > log . error ( "Failed to fetch missed updates." , e ) ,
tsCallBackExecutor ) ;
} else {
List < ReadTsKvQuery > queries = new ArrayList < > ( ) ;
subscription . getKeyStates ( ) . forEach ( ( key , value ) - > {
if ( curTs > value ) {
long startTs = subscription . getStartTime ( ) > 0 ? Math . max ( subscription . getStartTime ( ) , value + 1L ) : ( value + 1L ) ;
long endTs = subscription . getEndTime ( ) > 0 ? Math . min ( subscription . getEndTime ( ) , curTs ) : curTs ;
queries . add ( new BaseReadTsKvQuery ( key , startTs , endTs , 0 , 1000 , Aggregation . NONE ) ) ;
private void onAlarmSubUpdate ( TenantId tenantId , EntityId entityId , AlarmInfo alarm , boolean deleted , TbCallback callback ) {
TbEntityRemoteSubsInfo subInfo = entitySubscriptions . get ( entityId ) ;
if ( subInfo ! = null ) {
log . trace ( "[{}][{}] Handling alarm update {}: {}" , tenantId , entityId , alarm , deleted ) ;
for ( Map . Entry < String , TbSubscriptionsInfo > entry : subInfo . getSubs ( ) . entrySet ( ) ) {
if ( entry . getValue ( ) . notifications ) {
onAlarmSubUpdate ( entry . getKey ( ) , entityId , alarm , deleted ) ;
}
} ) ;
if ( ! queries . isEmpty ( ) ) {
DonAsynchron . withCallback ( tsService . findAll ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , queries ) ,
missedUpdates - > {
if ( missedUpdates ! = null & & ! missedUpdates . isEmpty ( ) ) {
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , subscription . getServiceId ( ) ) ;
toCoreNotificationsProducer . send ( tpi , toProto ( subscription , missedUpdates ) , null ) ;
}
} ,
e - > log . error ( "Failed to fetch missed updates." , e ) ,
tsCallBackExecutor ) ;
}
}
callback . onSuccess ( ) ;
}
private TbProtoQueueMsg < ToCoreNotificationMsg > toProto ( TbSubscription subscription , List < TsKvEntry > updates ) {
return toProto ( subscription , updates , true ) ;
private void onAlarmSubUpdate ( String targetServiceId , EntityId entityId , AlarmInfo alarm , boolean deleted ) {
if ( alarm = = null ) {
log . warn ( "[{}] empty alarm update!" , entityId ) ;
return ;
}
if ( serviceId . equals ( targetServiceId ) ) {
log . trace ( "[{}] Forwarding to local service: {} deleted: {}" , entityId , alarm , deleted ) ;
localSubscriptionService . onAlarmUpdate ( entityId , alarm , deleted , TbCallback . EMPTY ) ;
} else {
sendCoreNotification ( targetServiceId , entityId ,
TbSubscriptionUtils . toAlarmSubUpdateToProto ( entityId , alarm , deleted ) ) ;
}
}
private TbProtoQueueMsg < ToCoreNotificationMsg > toProto ( TbSubscription subscription , List < TsKvEntry > updates , boolean ignoreEmptyUpdates ) {
TbSubscriptionUpdateProto . Builder builder = TbSubscriptionUpdateProto . newBuilder ( ) ;
builder . setSessionId ( subscription . getSessionId ( ) ) ;
builder . setSubscriptionId ( subscription . getSubscriptionId ( ) ) ;
Map < String , List < Object > > data = new TreeMap < > ( ) ;
for ( TsKvEntry tsEntry : updates ) {
List < Object > values = data . computeIfAbsent ( tsEntry . getKey ( ) , k - > new ArrayList < > ( ) ) ;
Object [ ] value = new Object [ 2 ] ;
value [ 0 ] = tsEntry . getTs ( ) ;
value [ 1 ] = tsEntry . getValueAsString ( ) ;
values . add ( value ) ;
}
private void sendCoreNotification ( String targetServiceId , EntityId entityId , ToCoreNotificationMsg msg ) {
log . trace ( "[{}] Forwarding to remote service [{}]: {}" , entityId , targetServiceId , msg ) ;
TopicPartitionInfo tpi = notificationsTopicService . getNotificationsTopic ( ServiceType . TB_CORE , targetServiceId ) ;
TbProtoQueueMsg < ToCoreNotificationMsg > queueMsg = new TbProtoQueueMsg < > ( entityId . getId ( ) , msg ) ;
toCoreNotificationsProducer . send ( tpi , queueMsg , null ) ;
}
data . forEach ( ( key , value ) - > {
TbSubscriptionUpdateValueListProto . Builder dataBuilder = TbSubscriptionUpdateValueListProto . newBuilder ( ) ;
dataBuilder . setKey ( key ) ;
boolean hasData = false ;
for ( Object v : value ) {
Object [ ] array = ( Object [ ] ) v ;
TbSubscriptionUpdateTsValue . Builder tsValueBuilder = TbSubscriptionUpdateTsValue . newBuilder ( ) ;
tsValueBuilder . setTs ( ( long ) array [ 0 ] ) ;
String strVal = ( String ) array [ 1 ] ;
if ( strVal ! = null ) {
hasData = true ;
tsValueBuilder . setValue ( strVal ) ;
@Override
public void onNotificationUpdate ( TenantId tenantId , UserId entityId , NotificationUpdate notificationUpdate , TbCallback callback ) {
TbEntityRemoteSubsInfo subInfo = entitySubscriptions . get ( entityId ) ;
if ( subInfo ! = null ) {
NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate ( notificationUpdate ) ;
log . trace ( "[{}][{}] Handling notificationUpdate for user {}" , tenantId , entityId , notificationUpdate ) ;
for ( Map . Entry < String , TbSubscriptionsInfo > entry : subInfo . getSubs ( ) . entrySet ( ) ) {
if ( entry . getValue ( ) . notifications ) {
onNotificationsSubUpdate ( entry . getKey ( ) , entityId , subscriptionUpdate ) ;
}
dataBuilder . addTsValue ( tsValueBuilder . build ( ) ) ;
}
if ( ! ignoreEmptyUpdates | | hasData ) {
builder . addData ( dataBuilder . build ( ) ) ;
}
} ) ;
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg . newBuilder ( ) . setToLocalSubscriptionServiceMsg (
LocalSubscriptionServiceMsgProto . newBuilder ( ) . setSubUpdate ( builder . build ( ) ) . build ( ) )
. build ( ) ;
return new TbProtoQueueMsg < > ( subscription . getEntityId ( ) . getId ( ) , toCoreMsg ) ;
}
callback . onSuccess ( ) ;
}
private TbProtoQueueMsg < ToCoreNotificationMsg > toProto ( TbSubscription subscription , AlarmInfo alarm , boolean deleted ) {
TbAlarmSubscriptionUpdateProto . Builder builder = TbAlarmSubscriptionUpdateProto . newBuilder ( ) ;
builder . setSessionId ( subscription . getSessionId ( ) ) ;
builder . setSubscriptionId ( subscription . getSubscriptionId ( ) ) ;
builder . setAlarm ( JacksonUtil . toString ( alarm ) ) ;
builder . setDeleted ( deleted ) ;
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg . newBuilder ( ) . setToLocalSubscriptionServiceMsg (
LocalSubscriptionServiceMsgProto . newBuilder ( )
. setAlarmSubUpdate ( builder . build ( ) ) . build ( ) )
. build ( ) ;
return new TbProtoQueueMsg < > ( subscription . getEntityId ( ) . getId ( ) , toCoreMsg ) ;
private void onNotificationsSubUpdate ( String targetServiceId , EntityId entityId , NotificationsSubscriptionUpdate subscriptionUpdate ) {
if ( serviceId . equals ( targetServiceId ) ) {
log . trace ( "[{}] Forwarding to local service: {}" , entityId , subscriptionUpdate ) ;
localSubscriptionService . onNotificationUpdate ( entityId , subscriptionUpdate , TbCallback . EMPTY ) ;
} else {
sendCoreNotification ( targetServiceId , entityId ,
TbSubscriptionUtils . notificationsSubUpdateToProto ( entityId , subscriptionUpdate ) ) ;
}
}
private static long getLongValue ( KvEntry kve ) {
@ -639,4 +378,31 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
}
}
private static < T extends KvEntry > List < T > getSubList ( List < T > ts , Set < String > keys ) {
List < T > update = null ;
for ( T entry : ts ) {
if ( keys . contains ( entry . getKey ( ) ) ) {
if ( update = = null ) {
update = new ArrayList < > ( ts . size ( ) ) ;
}
update . add ( entry ) ;
}
}
return update ;
}
private TbEntityUpdatesInfo getEntityUpdatesInfo ( EntityId entityId ) {
return entityUpdates . computeIfAbsent ( entityId , id - > new TbEntityUpdatesInfo ( initTs ) ) ;
}
private void cleanupEntityUpdates ( ) {
initTs = System . currentTimeMillis ( ) - TimeUnit . HOURS . toMillis ( 1 ) ;
int sizeBeforeCleanup = entityUpdates . size ( ) ;
entityUpdates . entrySet ( ) . removeIf ( kv - > {
var v = kv . getValue ( ) ;
return initTs > v . attributesUpdateTs & & initTs > v . timeSeriesUpdateTs ;
} ) ;
log . info ( "Removed {} old entity update records." , entityUpdates . size ( ) - sizeBeforeCleanup ) ;
}
}