@ -16,29 +16,52 @@
package org.thingsboard.server.actors.calculatedField ;
import lombok.extern.slf4j.Slf4j ;
import org.jetbrains.annotations.NotNull ;
import org.thingsboard.server.actors.ActorSystemContext ;
import org.thingsboard.server.actors.TbActorCtx ;
import org.thingsboard.server.actors.TbActorRef ;
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId ;
import org.thingsboard.server.actors.service.DefaultActorService ;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor ;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.cf.CalculatedField ;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink ;
import org.thingsboard.server.common.data.id.AssetId ;
import org.thingsboard.server.common.data.id.AssetProfileId ;
import org.thingsboard.server.common.data.id.CalculatedFieldId ;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.DeviceProfileId ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.EntityIdFactory ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.page.PageDataIterable ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg ;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto ;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto ;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto ;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg ;
import org.thingsboard.server.queue.discovery.HashPartitionService ;
import org.thingsboard.server.queue.discovery.PartitionService ;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService ;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId ;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx ;
import org.thingsboard.server.service.profile.TbAssetProfileCache ;
import org.thingsboard.server.service.profile.TbDeviceProfileCache ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.CopyOnWriteArrayList ;
@ -50,18 +73,22 @@ import java.util.concurrent.CopyOnWriteArrayList;
@Slf4j
public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor {
private final Map < CalculatedFieldId , CalculatedField > calculatedFields = new HashMap < > ( ) ;
private final Map < CalculatedFieldId , CalculatedFieldCtx > calculatedFields = new HashMap < > ( ) ;
private final Map < EntityId , List < CalculatedFieldCtx > > entityIdCalculatedFields = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < EntityId , List < CalculatedFieldLink > > entityIdCalculatedFieldLinks = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < EntityId , Set < EntityId > > profileEntities = new ConcurrentHashMap < > ( ) ;
private final CalculatedFieldExecutionService cfService ;
private final TbAssetProfileCache assetProfileCache ;
private final TbDeviceProfileCache deviceProfileCache ;
protected TbActorCtx ctx ;
final TenantId tenantId ;
private final static int initFetchPackSize = 1024 ;
CalculatedFieldManagerMessageProcessor ( ActorSystemContext systemContext , TenantId tenantId ) {
super ( systemContext ) ;
this . cfService = systemContext . getCalculatedFieldExecutionService ( ) ;
this . assetProfileCache = systemContext . getAssetProfileCache ( ) ;
this . deviceProfileCache = systemContext . getDeviceProfileCache ( ) ;
this . tenantId = tenantId ;
@ -73,11 +100,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onFieldInitMsg ( CalculatedFieldInitMsg msg ) {
var cf = msg . getCf ( ) ;
calculatedFields . put ( cf . getId ( ) , cf ) ;
var cfCtx = new CalculatedFieldCtx ( cf , systemContext . getTbelInvokeService ( ) ) ;
calculatedFields . put ( cf . getId ( ) , cfCtx ) ;
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields . computeIfAbsent ( cf . getEntityId ( ) , id - > new CopyOnWriteArrayList < > ( ) )
. add ( new CalculatedFieldCtx ( cf , systemContext . getTbelInvokeService ( ) ) ) ;
entityIdCalculatedFields . computeIfAbsent ( cf . getEntityId ( ) , id - > new CopyOnWriteArrayList < > ( ) ) . add ( cfCtx ) ;
msg . getCallback ( ) . onSuccess ( ) ;
}
@ -99,14 +126,70 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onTelemetryMsg ( CalculatedFieldTelemetryMsg msg ) {
EntityId entityId = msg . getEntityId ( ) ;
var proto = msg . getProto ( ) ;
// 2 = 1 for CF processing + 1 for links processing
MultipleTbCallback callback = new MultipleTbCallback ( 2 , msg . getCallback ( ) ) ;
// process all cfs related to entity, or it's profile;
var entityIdFields = getCalculatedFieldsByEntityId ( entityId ) ;
var profileIdFields = getCalculatedFieldsByEntityId ( getProfileId ( tenantId , entityId ) ) ;
//TODO: Transfer only 'part' of the original callback.
getOrCreateActor ( entityId ) . tell ( new EntityCalculatedFieldTelemetryMsg ( msg , entityIdFields , profileIdFields , msg . getCallback ( ) ) ) ;
if ( ! entityIdFields . isEmpty ( ) | | ! profileIdFields . isEmpty ( ) ) {
getOrCreateActor ( entityId ) . tell ( new EntityCalculatedFieldTelemetryMsg ( msg , entityIdFields , profileIdFields , callback ) ) ;
} else {
callback . onSuccess ( ) ;
}
// process all links (if any);
var links = getCalculatedFieldLinksByEntityId ( entityId ) ;
List < CalculatedFieldEntityCtxId > linkedCalculatedFields = filterCalculatedFieldLinks ( msg ) ;
var linksSize = linkedCalculatedFields . size ( ) ;
if ( linksSize > 0 ) {
cfService . pushMsgToLinks ( msg , linkedCalculatedFields , callback ) ;
} else {
callback . onSuccess ( ) ;
}
}
public void onLinkedTelemetryMsg ( CalculatedFieldLinkedTelemetryMsg msg ) {
EntityId sourceEntityId = msg . getEntityId ( ) ;
var proto = msg . getProto ( ) ;
var linksList = proto . getLinksList ( ) ;
for ( var linkProto : linksList ) {
var link = toCalculatedFieldEntityCtxId ( linkProto ) ;
var targetEntityId = link . entityId ( ) ;
var targetEntityType = targetEntityId . getEntityType ( ) ;
var cf = calculatedFields . get ( link . cfId ( ) ) ;
if ( EntityType . DEVICE_PROFILE . equals ( targetEntityType ) | | EntityType . ASSET_PROFILE . equals ( targetEntityType ) ) {
// iterate over all entities that belong to profile and push the message for corresponding CF
var entityIds = getEntitiesByProfile ( targetEntityId ) ;
if ( ! entityIds . isEmpty ( ) ) {
MultipleTbCallback callback = new MultipleTbCallback ( entityIds . size ( ) , msg . getCallback ( ) ) ;
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg ( tenantId , sourceEntityId , proto . getMsg ( ) , cf , callback ) ;
entityIds . forEach ( entityId - > getOrCreateActor ( entityId ) . tell ( newMsg ) ) ;
} else {
msg . getCallback ( ) . onSuccess ( ) ;
}
} else {
// push the message to specific entity;
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg ( tenantId , sourceEntityId , proto . getMsg ( ) , cf , msg . getCallback ( ) ) ;
getOrCreateActor ( targetEntityId ) . tell ( newMsg ) ;
}
}
}
private CalculatedFieldEntityCtxId toCalculatedFieldEntityCtxId ( CalculatedFieldEntityCtxIdProto ctxIdProto ) {
EntityId entityId = EntityIdFactory . getByTypeAndUuid ( ctxIdProto . getEntityType ( ) , new UUID ( ctxIdProto . getEntityIdMSB ( ) , ctxIdProto . getEntityIdLSB ( ) ) ) ;
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( ctxIdProto . getCalculatedFieldIdMSB ( ) , ctxIdProto . getCalculatedFieldIdLSB ( ) ) ) ;
return new CalculatedFieldEntityCtxId ( tenantId , calculatedFieldId , entityId ) ;
}
private List < CalculatedFieldEntityCtxId > filterCalculatedFieldLinks ( CalculatedFieldTelemetryMsg msg ) {
EntityId entityId = msg . getEntityId ( ) ;
var proto = msg . getProto ( ) ;
List < CalculatedFieldEntityCtxId > result = new ArrayList < > ( ) ;
for ( var link : getCalculatedFieldLinksByEntityId ( entityId ) ) {
CalculatedFieldCtx ctx = calculatedFields . get ( link . getCalculatedFieldId ( ) ) ;
if ( ctx . linkMatches ( entityId , proto ) ) {
result . add ( ctx . toCalculatedFieldEntityCtxId ( ) ) ;
}
}
return result ;
}
private List < CalculatedFieldCtx > getCalculatedFieldsByEntityId ( EntityId entityId ) {
@ -131,6 +214,30 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
return result ;
}
private Set < EntityId > getEntitiesByProfile ( EntityId entityProfileId ) {
Set < EntityId > entities = profileEntities . get ( entityProfileId ) ;
if ( entities = = null ) {
entities = switch ( entityProfileId . getEntityType ( ) ) {
case ASSET_PROFILE - > profileEntities . computeIfAbsent ( entityProfileId , profileId - > {
Set < EntityId > assetIds = new HashSet < > ( ) ;
( new PageDataIterable < > ( pageLink - >
systemContext . getAssetService ( ) . findAssetIdsByTenantIdAndAssetProfileId ( tenantId , ( AssetProfileId ) profileId , pageLink ) , initFetchPackSize ) ) . forEach ( assetIds : : add ) ;
return assetIds ;
} ) ;
case DEVICE_PROFILE - > profileEntities . computeIfAbsent ( entityProfileId , profileId - > {
Set < EntityId > deviceIds = new HashSet < > ( ) ;
( new PageDataIterable < > ( pageLink - >
systemContext . getDeviceService ( ) . findDeviceIdsByTenantIdAndDeviceProfileId ( tenantId , ( DeviceProfileId ) entityProfileId , pageLink ) , initFetchPackSize ) ) . forEach ( deviceIds : : add ) ;
return deviceIds ;
} ) ;
default - > throw new IllegalArgumentException ( "Entity type should be ASSET_PROFILE or DEVICE_PROFILE." ) ;
} ;
}
log . trace ( "[{}] Found entities by profile in cache: {}" , entityProfileId , entities ) ;
return entities ;
}
private EntityId getProfileId ( TenantId tenantId , EntityId entityId ) {
return switch ( entityId . getEntityType ( ) ) {
case ASSET - > assetProfileCache . get ( tenantId , ( AssetId ) entityId ) . getId ( ) ;
@ -139,7 +246,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} ;
}
protected TbActorRef getOrCreateActor ( EntityId entityId ) {
private TbActorRef getOrCreateActor ( EntityId entityId ) {
return ctx . getOrCreateChildActor ( new TbCalculatedFieldEntityActorId ( entityId ) ,
( ) - > DefaultActorService . CF_ENTITY_DISPATCHER_NAME ,
( ) - > new CalculatedFieldEntityActorCreator ( systemContext , tenantId , entityId ) ,