@ -66,7 +66,6 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageDataIterable ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.common.util.ProtoUtils ;
@ -120,6 +119,8 @@ import java.util.function.Supplier;
import java.util.stream.Collectors ;
import static org.thingsboard.server.common.data.DataConstants.SCOPE ;
import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto ;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY ;
@Service
@Slf4j
@ -242,14 +243,59 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
@Override
public void onTelemetryMsg ( CalculatedFieldTelemetryMsgProto msg , TbCallback callback ) {
try {
CalculatedFieldTelemetryUpdateRequest request = fromProto ( msg ) ;
EntityId entityId = request . getEntityId ( ) ;
if ( supportedReferencedEntities . contains ( entityId . getEntityType ( ) ) ) {
TenantId tenantId = request . getTenantId ( ) ;
TopicPartitionInfo tpi = partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , entityId ) ;
if ( tpi . isMyPartition ( ) ) {
processCalculatedFields ( request , entityId ) ;
processCalculatedFields ( request , getProfileId ( tenantId , entityId ) ) ;
callback . onSuccess ( ) ;
Map < TopicPartitionInfo , List < CalculatedFieldEntityCtxId > > tpiStatesToUpdate = new HashMap < > ( ) ;
processCalculatedFieldLinks ( request , tpiStatesToUpdate ) ;
if ( ! tpiStatesToUpdate . isEmpty ( ) ) {
tpiStatesToUpdate . forEach ( ( topicPartitionInfo , ctxIds ) - > {
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto ( msg , ctxIds ) ;
clusterService . pushMsgToCalculatedFields ( topicPartitionInfo , UUID . randomUUID ( ) , ToCalculatedFieldMsg . newBuilder ( ) . setLinkedTelemetryMsg ( linkedTelemetryMsgProto ) . build ( ) , null ) ;
} ) ;
}
} else {
clusterService . pushMsgToCalculatedFields ( tpi , UUID . randomUUID ( ) , ToCalculatedFieldMsg . newBuilder ( ) . setTelemetryMsg ( msg ) . build ( ) , null ) ;
}
}
} catch ( Exception e ) {
log . trace ( "Failed to update telemetry." , e ) ;
}
}
@Override
public void onLinkedTelemetryMsg ( CalculatedFieldLinkedTelemetryMsgProto linkedMsg , TbCallback callback ) {
try {
CalculatedFieldTelemetryUpdateRequest request = fromProto ( linkedMsg . getMsg ( ) ) ;
if ( linkedMsg . getLinksList ( ) . isEmpty ( ) ) {
onTelemetryMsg ( linkedMsg . getMsg ( ) , callback ) ;
return ;
}
linkedMsg . getLinksList ( ) . forEach ( ctxIdProto - > {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( ctxIdProto . getCalculatedFieldIdMSB ( ) , ctxIdProto . getCalculatedFieldIdLSB ( ) ) ) ;
CalculatedFieldCtx ctx = calculatedFieldCache . getCalculatedFieldCtx ( calculatedFieldId ) ;
callback . onSuccess ( ) ;
Map < String , KvEntry > updatedTelemetry = request . getMappedTelemetry ( ctx , request . getEntityId ( ) ) ;
if ( ! updatedTelemetry . isEmpty ( ) ) {
EntityId targetEntityId = EntityIdFactory . getByTypeAndUuid ( ctxIdProto . getEntityType ( ) , new UUID ( ctxIdProto . getEntityIdMSB ( ) , ctxIdProto . getEntityIdLSB ( ) ) ) ;
executeTelemetryUpdate ( ctx , targetEntityId , request . getPreviousCalculatedFieldIds ( ) , updatedTelemetry ) ;
}
} ) ;
} catch ( Exception e ) {
log . trace ( "Failed to process telemetry update msg: [{}]" , linkedMsg , e ) ;
}
}
@Override
@ -263,7 +309,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Consumer < EntityId > resolvePartition = entityId - > {
TopicPartitionInfo tpi ;
try {
tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , cf . getTenantId ( ) , entityId ) ;
tpi = partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , entityId ) ;
if ( addedPartitions . contains ( tpi ) & & states . keySet ( ) . stream ( ) . noneMatch ( ctxId - > ctxId . cfId ( ) . equals ( cf . getId ( ) ) ) ) {
tpiTargetEntityMap . computeIfAbsent ( tpi , k - > new ArrayList < > ( ) ) . add ( new CalculatedFieldEntityCtxId ( cf . getId ( ) , entityId ) ) ;
}
@ -378,6 +424,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
@Override
public void pushCalculatedFieldLifecycleMsgToQueue ( CalculatedField calculatedField , ComponentLifecycleMsgProto proto ) {
EntityId entityId = calculatedField . getEntityId ( ) ;
ToCalculatedFieldMsg msg = ToCalculatedFieldMsg . newBuilder ( ) . setComponentLifecycleMsg ( proto ) . build ( ) ;
switch ( entityId . getEntityType ( ) ) {
case ASSET , DEVICE - > {
TopicPartitionInfo tpi = partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , entityId ) ;
clusterService . pushMsgToCalculatedFields ( tpi , UUID . randomUUID ( ) , msg , null ) ;
}
case ASSET_PROFILE , DEVICE_PROFILE - > {
Set < TopicPartitionInfo > tpiSet = calculatedFieldCache . getEntitiesByProfile ( calculatedField . getTenantId ( ) , entityId ) . stream ( )
. map ( targetEntityId - > partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , targetEntityId ) )
. collect ( Collectors . toSet ( ) ) ;
tpiSet . forEach ( tpi - > clusterService . pushMsgToCalculatedFields ( tpi , UUID . randomUUID ( ) , msg , null ) ) ;
}
default - > throw new IllegalArgumentException ( "Entity type '" + calculatedField . getId ( ) . getEntityType ( )
+ "' does not support calculated fields." ) ;
}
}
private boolean onCalculatedFieldUpdate ( CalculatedField updatedCalculatedField , TbCallback callback ) {
CalculatedField oldCalculatedField = calculatedFieldCache . getCalculatedField ( updatedCalculatedField . getId ( ) ) ;
boolean shouldReinit = true ;
@ -418,38 +484,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return entityIdChanged | | typeChanged | | argumentsChanged ;
}
@Override
public void onTelemetryUpdate ( CalculatedFieldTelemetryMsgProto proto , TbCallback callback ) {
try {
CalculatedFieldTelemetryUpdateRequest request = fromProto ( proto ) ;
EntityId entityId = request . getEntityId ( ) ;
if ( supportedReferencedEntities . contains ( entityId . getEntityType ( ) ) ) {
TenantId tenantId = request . getTenantId ( ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , tenantId , entityId ) ;
if ( tpi . isMyPartition ( ) ) {
processCalculatedFields ( request , entityId ) ;
processCalculatedFields ( request , getProfileId ( tenantId , entityId ) ) ;
Map < TopicPartitionInfo , List < CalculatedFieldEntityCtxId > > tpiStatesToUpdate = new HashMap < > ( ) ;
processCalculatedFieldLinks ( request , tpiStatesToUpdate ) ;
if ( ! tpiStatesToUpdate . isEmpty ( ) ) {
tpiStatesToUpdate . forEach ( ( topicPartitionInfo , ctxIds ) - > {
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto ( proto , ctxIds ) ;
clusterService . pushMsgToCalculatedFields ( topicPartitionInfo , UUID . randomUUID ( ) , ToCalculatedFieldMsg . newBuilder ( ) . setLinkedTelemetryMsg ( linkedTelemetryMsgProto ) . build ( ) , null ) ;
} ) ;
}
} else {
clusterService . pushMsgToCalculatedFields ( tpi , UUID . randomUUID ( ) , ToCalculatedFieldMsg . newBuilder ( ) . setTelemetryMsg ( proto ) . build ( ) , null ) ;
}
}
} catch ( Exception e ) {
log . trace ( "Failed to update telemetry." , e ) ;
}
}
private void processCalculatedFields ( CalculatedFieldTelemetryUpdateRequest request , EntityId cfTargetEntityId ) {
if ( cfTargetEntityId ! = null ) {
calculatedFieldCache . getCalculatedFieldCtxsByEntityId ( cfTargetEntityId ) . forEach ( ctx - > {
@ -483,7 +517,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private void processCalculatedFieldLink ( CalculatedFieldTelemetryUpdateRequest request , EntityId targetEntity , CalculatedFieldCtx ctx , Map < TopicPartitionInfo , List < CalculatedFieldEntityCtxId > > tpiStates ) {
TopicPartitionInfo targetEntityTpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , request . getTenantId ( ) , targetEntity ) ;
TopicPartitionInfo targetEntityTpi = partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , targetEntity ) ;
if ( targetEntityTpi . isMyPartition ( ) ) {
Map < String , KvEntry > updatedTelemetry = request . getMappedTelemetry ( ctx , request . getEntityId ( ) ) ;
if ( ! updatedTelemetry . isEmpty ( ) ) {
@ -495,31 +529,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
@Override
public void onTelemetryUpdate ( CalculatedFieldLinkedTelemetryMsgProto proto , TbCallback callback ) {
try {
CalculatedFieldTelemetryUpdateRequest request = fromProto ( proto . getMsg ( ) ) ;
if ( proto . getLinksList ( ) . isEmpty ( ) ) {
onTelemetryUpdate ( proto , callback ) ;
return ;
}
proto . getLinksList ( ) . forEach ( ctxIdProto - > {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( ctxIdProto . getCalculatedFieldIdMSB ( ) , ctxIdProto . getCalculatedFieldIdLSB ( ) ) ) ;
CalculatedFieldCtx ctx = calculatedFieldCache . getCalculatedFieldCtx ( calculatedFieldId ) ;
Map < String , KvEntry > updatedTelemetry = request . getMappedTelemetry ( ctx , request . getEntityId ( ) ) ;
if ( ! updatedTelemetry . isEmpty ( ) ) {
EntityId targetEntityId = EntityIdFactory . getByTypeAndUuid ( ctxIdProto . getEntityType ( ) , new UUID ( ctxIdProto . getEntityIdMSB ( ) , ctxIdProto . getEntityIdLSB ( ) ) ) ;
executeTelemetryUpdate ( ctx , targetEntityId , request . getPreviousCalculatedFieldIds ( ) , updatedTelemetry ) ;
}
} ) ;
} catch ( Exception e ) {
log . trace ( "Failed to process telemetry update msg: [{}]" , proto , e ) ;
}
}
private void executeTelemetryUpdate ( CalculatedFieldCtx cfCtx , EntityId entityId , List < CalculatedFieldId > previousCalculatedFieldIds , Map < String , KvEntry > updatedTelemetry ) {
log . info ( "Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]" , cfCtx . getTenantId ( ) , entityId , cfCtx . getCfId ( ) ) ;
Map < String , ArgumentEntry > argumentValues = updatedTelemetry . entrySet ( ) . stream ( )
@ -534,7 +543,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
TenantId tenantId = TenantId . fromUUID ( new UUID ( proto . getTenantIdMSB ( ) , proto . getTenantIdLSB ( ) ) ) ;
EntityId entityId = EntityIdFactory . getByTypeAndUuid ( proto . getEntityType ( ) , new UUID ( proto . getEntityIdMSB ( ) , proto . getEntityIdLSB ( ) ) ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , tenantId , entityId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( CALCULATED_FIELD_QUEUE_KEY , entityId ) ;
if ( tpi . isMyPartition ( ) ) {
log . info ( "Received CalculatedFieldEntityUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
if ( proto . getDeleted ( ) ) {
@ -824,7 +833,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
List < Long > versions = result . getVersions ( ) ;
for ( int i = 0 ; i < entries . size ( ) ; i + + ) {
long tsVersion = versions . get ( i ) ;
TsKvProto tsProto = ProtoUtils . toTsKvProto ( entries . get ( i ) ) . toBuilder ( ) . setVersion ( tsVersion ) . build ( ) ;
TsKvProto tsProto = toTsKvProto ( entries . get ( i ) ) . toBuilder ( ) . setVersion ( tsVersion ) . build ( ) ;
telemetryMsg . addTsData ( tsProto ) ;
}
msg . setTelemetryMsg ( telemetryMsg . build ( ) ) ;
@ -858,8 +867,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
telemetryMsg . setEntityIdMSB ( entityId . getId ( ) . getMostSignificantBits ( ) ) ;
telemetryMsg . setEntityIdLSB ( entityId . getId ( ) . getLeastSignificantBits ( ) ) ;
for ( CalculatedFieldId cfId : calculatedFieldIds ) {
telemetryMsg . addPreviousCalculatedFields ( toProto ( cfId ) ) ;
if ( calculatedFieldIds ! = null ) {
for ( CalculatedFieldId cfId : calculatedFieldIds ) {
telemetryMsg . addPreviousCalculatedFields ( toProto ( cfId ) ) ;
}
}
return telemetryMsg ;