@ -15,7 +15,9 @@
* /
package org.thingsboard.server.actors.calculatedField ;
import lombok.Getter ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Value ;
import org.thingsboard.server.actors.ActorSystemContext ;
import org.thingsboard.server.actors.TbActorCtx ;
import org.thingsboard.server.actors.TbActorRef ;
@ -24,6 +26,7 @@ import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor ;
import org.thingsboard.server.common.data.DataConstants ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.ProfileEntityIdInfo ;
import org.thingsboard.server.common.data.cf.CalculatedField ;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink ;
import org.thingsboard.server.common.data.id.AssetId ;
@ -31,17 +34,22 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.page.PageDataIterable ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldActorInitMsg ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg ;
import org.thingsboard.server.common.msg.cf.ProfileEntityMsg ;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.dao.asset.AssetService ;
import org.thingsboard.server.dao.cf.CalculatedFieldService ;
import org.thingsboard.server.dao.device.DeviceService ;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService ;
import org.thingsboard.server.service.cf.CalculatedFieldStateService ;
import org.thingsboard.server.service.cf.cache.CalculatedField EntityProfileCache ;
import org.thingsboard.server.service.cf.cache.Tenant EntityProfileCache ;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId ;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx ;
import org.thingsboard.server.service.profile.TbAssetProfileCache ;
@ -68,22 +76,30 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final CalculatedFieldProcessingService cfExecService ;
private final CalculatedFieldStateService cfStateService ;
private final CalculatedFieldEntityProfileCache cfEntityCache ;
private final CalculatedFieldService cfDaoService ;
private final DeviceService deviceService ;
private final AssetService assetService ;
private final TbAssetProfileCache assetProfileCache ;
private final TbDeviceProfileCache deviceProfileCache ;
private final TenantEntityProfileCache entityProfileCache ;
protected final TenantId tenantId ;
protected TbActorCtx ctx ;
@Value ( "${calculated_fields.init_fetch_pack_size:50000}" )
@Getter
private int initFetchPackSize ;
CalculatedFieldManagerMessageProcessor ( ActorSystemContext systemContext , TenantId tenantId ) {
super ( systemContext ) ;
this . cfEntityCache = systemContext . getCalculatedFieldEntityProfileCache ( ) ;
this . cfExecService = systemContext . getCalculatedFieldProcessingService ( ) ;
this . cfStateService = systemContext . getCalculatedFieldStateService ( ) ;
this . cfDaoService = systemContext . getCalculatedFieldService ( ) ;
this . deviceService = systemContext . getDeviceService ( ) ;
this . assetService = systemContext . getAssetService ( ) ;
this . assetProfileCache = systemContext . getAssetProfileCache ( ) ;
this . deviceProfileCache = systemContext . getDeviceProfileCache ( ) ;
this . entityProfileCache = new TenantEntityProfileCache ( ) ;
this . tenantId = tenantId ;
}
@ -100,6 +116,19 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
ctx . stop ( ctx . getSelf ( ) ) ;
}
public void onActorInitMsg ( CalculatedFieldActorInitMsg msg ) {
log . debug ( "[{}] Processing CF actor init message." , msg . getTenantId ( ) . getId ( ) ) ;
initEntityProfileCache ( ) ;
initCalculatedFields ( ) ;
msg . getCallback ( ) . onSuccess ( ) ;
}
public void onProfileEntityMsg ( ProfileEntityMsg msg ) {
log . debug ( "[{}] Processing profile entity message." , msg . getTenantId ( ) . getId ( ) ) ;
entityProfileCache . add ( msg . getProfileEntityId ( ) , msg . getEntityId ( ) ) ;
msg . getCallback ( ) . onSuccess ( ) ;
}
public void onFieldInitMsg ( CalculatedFieldInitMsg msg ) throws CalculatedFieldException {
log . debug ( "[{}] Processing CF init message." , msg . getCf ( ) . getId ( ) ) ;
var cf = msg . getCf ( ) ;
@ -180,16 +209,35 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
break ;
}
case DEVICE_PROFILE :
case ASSET_PROFILE : {
switch ( event ) {
case DELETED :
onProfileDeleted ( msg . getData ( ) , msg . getCallback ( ) ) ;
break ;
default :
msg . getCallback ( ) . onSuccess ( ) ;
break ;
}
break ;
}
default : {
msg . getCallback ( ) . onSuccess ( ) ;
}
}
}
private void onProfileDeleted ( ComponentLifecycleMsg msg , TbCallback callback ) {
entityProfileCache . removeProfileId ( msg . getEntityId ( ) ) ;
callback . onSuccess ( ) ;
}
private void onEntityCreated ( ComponentLifecycleMsg msg , TbCallback callback ) {
EntityId entityId = msg . getEntityId ( ) ;
EntityId profileId = getProfileId ( tenantId , entityId ) ;
cfEntityCache . add ( tenantId , profileId , entityId ) ;
if ( profileId ! = null ) {
entityProfileCache . add ( profileId , entityId ) ;
}
if ( ! isMyPartition ( entityId , callback ) ) {
return ;
}
@ -207,7 +255,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void onEntityUpdated ( ComponentLifecycleMsg msg , TbCallback callback ) {
if ( msg . getOldProfileId ( ) ! = null & & ! msg . getOldProfileId ( ) . equals ( msg . getProfileId ( ) ) ) {
cfEntity Cache. update ( tenantId , msg . getOldProfileId ( ) , msg . getProfileId ( ) , msg . getEntityId ( ) ) ;
entityProfile Cache. update ( msg . getOldProfileId ( ) , msg . getProfileId ( ) , msg . getEntityId ( ) ) ;
if ( ! isMyPartition ( msg . getEntityId ( ) , callback ) ) {
return ;
}
@ -226,7 +274,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
private void onEntityDeleted ( ComponentLifecycleMsg msg , TbCallback callback ) {
cfEntityCache . evict ( tenantId , msg . getEntityId ( ) ) ;
entityProfileCache . removeEntityId ( msg . getEntityId ( ) ) ;
if ( isMyPartition ( msg . getEntityId ( ) , callback ) ) {
log . debug ( "Pushing entity lifecycle msg to specific actor [{}]" , msg . getEntityId ( ) ) ;
getOrCreateActor ( msg . getEntityId ( ) ) . tell ( new CalculatedFieldEntityDeleteMsg ( tenantId , msg . getEntityId ( ) , callback ) ) ;
@ -322,7 +370,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId entityId = cfCtx . getEntityId ( ) ;
EntityType entityType = cfCtx . getEntityId ( ) . getEntityType ( ) ;
if ( isProfileEntity ( entityType ) ) {
var entityIds = cfEntity Cache. getEntityIdsByProfileId ( tenantId , entityId ) ;
var entityIds = entityProfile Cache. getEntityIdsByProfileId ( entityId ) ;
if ( ! entityIds . isEmpty ( ) ) {
//TODO: no need to do this if we cache all created actors and know which one belong to us;
var multiCallback = new MultipleTbCallback ( entityIds . size ( ) , callback ) ;
@ -383,7 +431,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var cf = calculatedFields . get ( link . cfId ( ) ) ;
if ( EntityType . DEVICE_PROFILE . equals ( targetEntityType ) | | EntityType . ASSET_PROFILE . equals ( targetEntityType ) ) {
// iterate over all entities that belong to profile and push the message for corresponding CF
var entityIds = cfEntity Cache. getEntityIdsByProfileId ( tenantId , targetEntityId ) ;
var entityIds = entityProfile Cache. getEntityIdsByProfileId ( targetEntityId ) ;
if ( ! entityIds . isEmpty ( ) ) {
MultipleTbCallback multipleCallback = new MultipleTbCallback ( entityIds . size ( ) , callback ) ;
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg ( tenantId , sourceEntityId , proto . getMsg ( ) , cf , multipleCallback ) ;
@ -445,7 +493,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId entityId = cfCtx . getEntityId ( ) ;
EntityType entityType = cfCtx . getEntityId ( ) . getEntityType ( ) ;
if ( isProfileEntity ( entityType ) ) {
var entityIds = cfEntity Cache. getEntityIdsByProfileId ( tenantId , entityId ) ;
var entityIds = entityProfile Cache. getEntityIdsByProfileId ( entityId ) ;
if ( ! entityIds . isEmpty ( ) ) {
var multiCallback = new MultipleTbCallback ( entityIds . size ( ) , callback ) ;
entityIds . forEach ( id - > {
@ -513,21 +561,46 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
public void onPartitionChange ( CalculatedFieldPartitionChangeMsg msg ) {
initCalculatedFields ( ) ;
ctx . broadcastToChildren ( msg , true ) ;
}
public void initCalculatedFields ( ) {
cfDaoService . findCalculatedFieldsByTenantId ( tenantId ) . forEach ( cf - > {
PageDataIterable < CalculatedField > cfs = new PageDataIterable < > ( pageLink - > cfDaoService . findCalculatedFieldsByTenantId ( tenantId , pageLink ) , initFetchPackSize ) ;
cfs . forEach ( cf - > {
try {
onFieldInitMsg ( new CalculatedFieldInitMsg ( cf . getTenantId ( ) , cf ) ) ;
} catch ( CalculatedFieldException e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
cfDaoService . findAllCalculatedFieldLinksByTenantId ( tenantId ) . forEach ( cfLink - > {
onLinkInitMsg ( new CalculatedFieldLinkInitMsg ( cfLink . getTenantId ( ) , cfLink ) ) ;
calculatedFields . values ( ) . forEach ( cf - > {
entityIdCalculatedFields . computeIfAbsent ( cf . getEntityId ( ) , id - > new CopyOnWriteArrayList < > ( ) ) . add ( cf ) ;
} ) ;
PageDataIterable < CalculatedFieldLink > cfls = new PageDataIterable < > ( pageLink - > cfDaoService . findAllCalculatedFieldLinksByTenantId ( tenantId , pageLink ) , initFetchPackSize ) ;
cfls . forEach ( link - > {
onLinkInitMsg ( new CalculatedFieldLinkInitMsg ( link . getTenantId ( ) , link ) ) ;
} ) ;
}
private void initEntityProfileCache ( ) {
PageDataIterable < ProfileEntityIdInfo > deviceIdInfos = new PageDataIterable < > ( pageLink - > deviceService . findProfileEntityIdInfosByTenantId ( tenantId , pageLink ) , initFetchPackSize ) ;
for ( ProfileEntityIdInfo idInfo : deviceIdInfos ) {
log . trace ( "Processing device record: {}" , idInfo ) ;
try {
entityProfileCache . add ( idInfo . getProfileId ( ) , idInfo . getEntityId ( ) ) ;
} catch ( Exception e ) {
log . error ( "Failed to process device record: {}" , idInfo , e ) ;
}
}
PageDataIterable < ProfileEntityIdInfo > assetIdInfos = new PageDataIterable < > ( pageLink - > assetService . findProfileEntityIdInfosByTenantId ( tenantId , pageLink ) , initFetchPackSize ) ;
for ( ProfileEntityIdInfo idInfo : assetIdInfos ) {
log . trace ( "Processing asset record: {}" , idInfo ) ;
try {
entityProfileCache . add ( idInfo . getProfileId ( ) , idInfo . getEntityId ( ) ) ;
} catch ( Exception e ) {
log . error ( "Failed to process asset record: {}" , idInfo , e ) ;
}
}
}
}