@ -23,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.AllArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.stereotype.Service ;
import org.springframework.util.ConcurrentReferenceHashMap ;
import org.thingsboard.server.common.data.Customer ;
import org.thingsboard.server.common.data.DataConstants ;
import org.thingsboard.server.common.data.EntityType ;
@ -40,8 +41,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.TsKvEntry ;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent ;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg ;
import org.thingsboard.server.dao.timeseries.TimeseriesService ;
import org.thingsboard.server.queue.util.TbCoreComponent ;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService ;
import org.thingsboard.server.service.security.model.SecurityUser ;
@ -50,19 +52,22 @@ import java.util.ArrayList;
import java.util.Collection ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ExecutionException ;
import java.util.stream.Collectors ;
import static org.apache.commons.lang3.StringUtils.isBlank ;
@Service
@TbCoreComponent
@AllArgsConstructor
@Slf4j
public class DefaultTbEntityViewService extends AbstractTbEntityService implements TbEntityViewService {
private final TimeseriesService tsService ;
final Map < TenantId , Map < EntityId , List < EntityView > > > localCache = new ConcurrentHashMap < > ( ) ;
@Override
public EntityView save ( EntityView entityView , EntityView existingEntityView , SecurityUser user ) throws ThingsboardException {
ActionType actionType = entityView . getId ( ) = = null ? ActionType . ADDED : ActionType . UPDATED ;
@ -71,6 +76,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
this . updateEntityViewAttributes ( user , savedEntityView , existingEntityView ) ;
notificationEntityService . notifyCreateOrUpdateEntity ( savedEntityView . getTenantId ( ) , savedEntityView . getId ( ) , savedEntityView ,
null , actionType , user ) ;
localCache . computeIfAbsent ( savedEntityView . getTenantId ( ) , ( k ) - > new ConcurrentReferenceHashMap < > ( ) ) . clear ( ) ;
tbClusterService . broadcastEntityStateChangeEvent ( savedEntityView . getTenantId ( ) , savedEntityView . getId ( ) ,
entityView . getId ( ) = = null ? ComponentLifecycleEvent . CREATED : ComponentLifecycleEvent . UPDATED ) ;
return savedEntityView ;
} catch ( Exception e ) {
notificationEntityService . notifyEntity ( user . getTenantId ( ) , emptyId ( EntityType . ENTITY_VIEW ) , entityView , null , actionType , user , e ) ;
@ -122,6 +130,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
entityViewService . deleteEntityView ( tenantId , entityViewId ) ;
notificationEntityService . notifyDeleteEntity ( tenantId , entityViewId , entityView , entityView . getCustomerId ( ) , ActionType . DELETED ,
relatedEdgeIds , user , entityViewId . toString ( ) ) ;
localCache . computeIfAbsent ( tenantId , ( k ) - > new ConcurrentReferenceHashMap < > ( ) ) . clear ( ) ;
tbClusterService . broadcastEntityStateChangeEvent ( tenantId , entityViewId , ComponentLifecycleEvent . DELETED ) ;
} catch ( Exception e ) {
notificationEntityService . notifyEntity ( tenantId , emptyId ( EntityType . ENTITY_VIEW ) , null , null ,
ActionType . DELETED , user , e , entityViewId . toString ( ) ) ;
@ -214,6 +225,51 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
}
}
@Override
public ListenableFuture < List < EntityView > > findEntityViewsByTenantIdAndEntityIdAsync ( TenantId tenantId , EntityId entityId ) {
Map < EntityId , List < EntityView > > localCacheByTenant = localCache . computeIfAbsent ( tenantId , ( k ) - > new ConcurrentReferenceHashMap < > ( ) ) ;
List < EntityView > fromLocalCache = localCacheByTenant . get ( entityId ) ;
if ( fromLocalCache ! = null ) {
return Futures . immediateFuture ( fromLocalCache ) ;
}
ListenableFuture < List < EntityView > > future = entityViewService . findEntityViewsByTenantIdAndEntityIdAsync ( tenantId , entityId ) ;
return Futures . transform ( future , ( entityViewList ) - > {
localCacheByTenant . put ( entityId , entityViewList ) ;
return entityViewList ;
} , MoreExecutors . directExecutor ( ) ) ;
}
@Override
public void onComponentLifecycleMsg ( ComponentLifecycleMsg componentLifecycleMsg ) {
Map < EntityId , List < EntityView > > localCacheByTenant = localCache . computeIfAbsent ( componentLifecycleMsg . getTenantId ( ) , ( k ) - > new ConcurrentReferenceHashMap < > ( ) ) ;
EntityViewId entityViewId = new EntityViewId ( componentLifecycleMsg . getEntityId ( ) . getId ( ) ) ;
deleteOldCacheValue ( localCacheByTenant , entityViewId ) ;
if ( componentLifecycleMsg . getEvent ( ) ! = ComponentLifecycleEvent . DELETED ) {
EntityView entityView = entityViewService . findEntityViewById ( componentLifecycleMsg . getTenantId ( ) , entityViewId ) ;
if ( entityView ! = null ) {
localCacheByTenant . remove ( entityView . getEntityId ( ) ) ;
}
}
}
private void deleteOldCacheValue ( Map < EntityId , List < EntityView > > localCacheByTenant , EntityViewId entityViewId ) {
for ( var entry : localCacheByTenant . entrySet ( ) ) {
EntityView toDelete = null ;
for ( EntityView view : entry . getValue ( ) ) {
if ( entityViewId . equals ( view . getId ( ) ) ) {
toDelete = view ;
break ;
}
}
if ( toDelete ! = null ) {
entry . getValue ( ) . remove ( toDelete ) ;
break ;
}
}
}
private ListenableFuture < List < Void > > copyAttributesFromEntityToEntityView ( EntityView entityView , String scope , Collection < String > keys , SecurityUser user ) throws ThingsboardException {
EntityViewId entityId = entityView . getId ( ) ;
if ( keys ! = null & & ! keys . isEmpty ( ) ) {
@ -229,8 +285,8 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
long lastUpdateTs = attributeKvEntry . getLastUpdateTs ( ) ;
return startTime = = 0 & & endTime = = 0 | |
( endTime = = 0 & & startTime < lastUpdateTs ) | |
( startTime = = 0 & & endTime > lastUpdateTs )
? true : startTime < lastUpdateTs & & endTime > lastUpdateTs ;
( startTime = = 0 & & endTime > lastUpdateTs ) | |
( startTime < lastUpdateTs & & endTime > lastUpdateTs ) ;
} ) . collect ( Collectors . toList ( ) ) ;
tsSubService . saveAndNotify ( entityView . getTenantId ( ) , entityId , scope , attributes , new FutureCallback < Void > ( ) {
@Override
@ -345,7 +401,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
@Override
public void onFailure ( Throwable t ) {
try {
logTimeseriesDeleted ( entityView . getTenantId ( ) , user , entityId , keys , t ) ;
logTimeseriesDeleted ( entityView . getTenantId ( ) , user , entityId , keys , t ) ;
} catch ( ThingsboardException e ) {
log . error ( "Failed to log timeseries delete" , e ) ;
}