@ -35,7 +35,6 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors ;
import org.thingsboard.script.api.tbel.TbelInvokeService ;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.AttributeScope ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.cf.CalculatedField ;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink ;
@ -98,7 +97,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap ;
import java.util.function.Consumer ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
import static org.thingsboard.server.common.data.DataConstants.SCOPE ;
import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto ;
@ -115,7 +113,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private final CalculatedFieldCache calculatedFieldCache ;
private final AttributesService attributesService ;
private final TimeseriesService timeseriesService ;
// private final RocksDBService rocksDBService;
private final RocksDBService rocksDBService ;
private final TbClusterService clusterService ;
private final TbelInvokeService tbelInvokeService ;
@ -168,35 +166,38 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
protected Map < TopicPartitionInfo , List < ListenableFuture < ? > > > onAddedPartitions ( Set < TopicPartitionInfo > addedPartitions ) {
var result = new HashMap < TopicPartitionInfo , List < ListenableFuture < ? > > > ( ) ;
PageDataIterable < CalculatedField > cfs = new PageDataIterable < > ( calculatedFieldService : : findAllCalculatedFields , initFetchPackSize ) ;
Map < TopicPartitionInfo , List < CalculatedField > > tpiCalculatedField Map = new HashMap < > ( ) ;
Map < TopicPartitionInfo , List < CalculatedFieldEntityCtxId > > tpiTargetEntity Map = new HashMap < > ( ) ;
for ( CalculatedField cf : cfs ) {
TopicPartitionInfo tpi ;
try {
tpi = partitionService . resolve ( ServiceType . TB_CORE , cf . getTenantId ( ) , cf . getId ( ) ) ;
} catch ( Exception e ) {
log . warn ( "Failed to resolve partition for CalculatedField [{}], tenant id [{}]. Reason: {}" ,
cf . getId ( ) , cf . getTenantId ( ) , e . getMessage ( ) ) ;
continue ;
}
if ( addedPartitions . contains ( tpi ) & & states . keySet ( ) . stream ( ) . noneMatch ( ctxId - > ctxId . cfId ( ) . equals ( cf . getId ( ) . getId ( ) ) ) ) {
tpiCalculatedFieldMap . computeIfAbsent ( tpi , k - > new ArrayList < > ( ) ) . add ( cf ) ;
Consumer < EntityId > resolvePartition = entityId - > {
TopicPartitionInfo tpi ;
try {
tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , cf . getTenantId ( ) , entityId ) ;
if ( addedPartitions . contains ( tpi ) & & states . keySet ( ) . stream ( ) . noneMatch ( ctxId - > ctxId . cfId ( ) . equals ( cf . getId ( ) . getId ( ) ) ) ) {
tpiTargetEntityMap . computeIfAbsent ( tpi , k - > new ArrayList < > ( ) ) . add ( new CalculatedFieldEntityCtxId ( cf . getId ( ) . getId ( ) , entityId . getId ( ) ) ) ;
}
} catch ( Exception e ) {
log . warn ( "Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}" ,
entityId , cf . getTenantId ( ) , e . getMessage ( ) ) ;
}
} ;
EntityId cfEntityId = cf . getEntityId ( ) ;
if ( isProfileEntity ( cfEntityId ) ) {
calculatedFieldCache . getEntitiesByProfile ( cf . getTenantId ( ) , cfEntityId ) . forEach ( resolvePartition ) ;
} else {
resolvePartition . accept ( cfEntityId ) ;
}
}
for ( var entry : tpiCalculatedFieldMap . entrySet ( ) ) {
for ( List < CalculatedField > partition : Lists . partition ( entry . getValue ( ) , 1000 ) ) {
for ( var entry : tpiTargetEntity Map . entrySet ( ) ) {
for ( List < CalculatedFieldEntityCtxId > partition : Lists . partition ( entry . getValue ( ) , 1000 ) ) {
log . info ( "[{}] Submit task for CalculatedFields: {}" , entry . getKey ( ) , partition . size ( ) ) ;
var future = calculatedFieldExecutor . submit ( ( ) - > {
try {
for ( CalculatedField cf : partition ) {
EntityId cfEntityId = cf . getEntityId ( ) ;
if ( isProfileEntity ( cfEntityId ) ) {
calculatedFieldCache . getEntitiesByProfile ( cf . getTenantId ( ) , cfEntityId )
. forEach ( entityId - > restoreState ( cf , entityId ) ) ;
} else {
restoreState ( cf , cfEntityId ) ;
}
for ( CalculatedFieldEntityCtxId ctxId : partition ) {
restoreState ( ctxId . cfId ( ) , ctxId . entityId ( ) ) ;
}
} catch ( Throwable t ) {
log . error ( "Unexpected exception while restoring CalculatedField states" , t ) ;
@ -209,17 +210,16 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return result ;
}
private void restoreState ( CalculatedField cf , EntityId entityId ) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( cf . getId ( ) . getId ( ) , entityId . getId ( ) ) ;
// String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId));
private void restoreState ( UUID calculatedFieldId , UUID entityId ) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( calculatedFieldId , entityId ) ;
String storedState = rocksDBService . get ( JacksonUtil . writeValueAsString ( ctxId ) ) ;
String storedState = null ;
if ( storedState ! = null ) {
CalculatedFieldEntityCtx restoredCtx = JacksonUtil . fromString ( storedState , CalculatedFieldEntityCtx . class ) ;
states . put ( ctxId , restoredCtx ) ;
log . info ( "Restored state for CalculatedField [{}]" , cf . getId ( ) ) ;
log . info ( "Restored state for CalculatedField [{}]" , calculatedFieldId ) ;
} else {
log . warn ( "No state found for CalculatedField [{}], entity [{}]." , cf . getId ( ) , entityId ) ;
log . warn ( "No state found for CalculatedField [{}], entity [{}]." , calculatedFieldId , entityId ) ;
}
}
@ -238,12 +238,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
TenantId tenantId = TenantId . fromUUID ( new UUID ( proto . getTenantIdMSB ( ) , proto . getTenantIdLSB ( ) ) ) ;
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( proto . getCalculatedFieldIdMSB ( ) , proto . getCalculatedFieldIdLSB ( ) ) ) ;
log . info ( "Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]" , tenantId , calculatedFieldId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
if ( ! tpi . isMyPartition ( ) ) {
clusterService . pushMsgToCore ( tenantId , calculatedFieldId , TransportProtos . ToCoreMsg . newBuilder ( ) . setCalculatedFieldMsg ( proto ) . build ( ) , null ) ;
log . debug ( "[{}][{}] Calculated field belongs to external partition. Probably rebalancing is in progress. Topic: {}" , tenantId , calculatedFieldId , tpi . getFullTopicName ( ) ) ;
callback . onFailure ( new RuntimeException ( "Calculated field belongs to external partition " + tpi . getFullTopicName ( ) + "!" ) ) ;
}
if ( proto . getDeleted ( ) ) {
log . warn ( "Executing onCalculatedFieldDelete, calculatedFieldId=[{}]" , calculatedFieldId ) ;
onCalculatedFieldDelete ( tenantId , calculatedFieldId , callback ) ;
@ -307,18 +301,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private void onCalculatedFieldDelete ( TenantId tenantId , CalculatedFieldId calculatedFieldId , TbCallback callback ) {
try {
cleanupEntity ( calculatedFieldId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
Set < CalculatedFieldId > calculatedFieldIds = partitionedEntities . get ( tpi ) ;
if ( calculatedFieldIds ! = null ) {
calculatedFieldIds . remove ( calculatedFieldId ) ;
}
// calculatedFieldCache.evict(calculatedFieldId);
states . keySet ( ) . removeIf ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) ) ;
List < String > statesToRemove = states . keySet ( ) . stream ( )
. filter ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) )
. map ( JacksonUtil : : writeValueAsString )
. toList ( ) ;
// rocksDBService.deleteAll(statesToRemove);
rocksDBService . deleteAll ( statesToRemove ) ;
} catch ( Exception e ) {
log . trace ( "Failed to delete calculated field: [{}]" , calculatedFieldId , e ) ;
callback . onFailure ( e ) ;
@ -345,22 +333,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
try {
TenantId tenantId = calculatedFieldTelemetryUpdateRequest . getTenantId ( ) ;
EntityId entityId = calculatedFieldTelemetryUpdateRequest . getEntityId ( ) ;
AttributeScope scope = calculatedFieldTelemetryUpdateRequest . getScope ( ) ;
List < ? extends KvEntry > telemetry = calculatedFieldTelemetryUpdateRequest . getKvEntries ( ) ;
List < CalculatedFieldId > calculatedFieldIds = calculatedFieldTelemetryUpdateRequest . getCalculatedFieldIds ( ) ;
if ( supportedReferencedEntities . contains ( entityId . getEntityType ( ) ) ) {
EntityId profileId = getProfileId ( tenantId , entityId ) ;
List < CalculatedFieldLink > cfLinks = Stream . concat (
calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) . stream ( ) ,
profileId ! = null ? calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , profileId ) . stream ( ) : Stream . empty ( )
) . toList ( ) ;
cfLinks . forEach ( link - > {
getCalculatedFieldLinks ( tenantId , entityId , profileId ) . forEach ( link - > {
CalculatedFieldId calculatedFieldId = link . getCalculatedFieldId ( ) ;
Map < String , String > telemetryKeys = getTelemetryKeysFromLink ( link , scope ) ;
Map < String , KvEntry > updatedTelemetry = telemetry . stream ( )
Map < String , String > telemetryKeys = calculatedFieldTelemetryUpdateRequest . getTelemetryKeysFromLink ( link ) ;
Map < String , KvEntry > updatedTelemetry = calculatedFieldTelemetryUpdateRequest . getKvEntries ( ) . stream ( )
. filter ( entry - > telemetryKeys . containsValue ( entry . getKey ( ) ) )
. collect ( Collectors . toMap (
entry - > getMappedKey ( entry , telemetryKeys ) ,
@ -369,7 +349,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
) ) ;
if ( ! updatedTelemetry . isEmpty ( ) ) {
executeTelemetryUpdate ( tenantId , entityId , calculatedFieldId , calculatedFieldIds , updatedTelemetry ) ;
List < CalculatedFieldId > previousCalculatedFieldIds = calculatedFieldTelemetryUpdateRequest . getPreviousCalculatedFieldIds ( ) ;
executeTelemetryUpdate ( tenantId , entityId , calculatedFieldId , previousCalculatedFieldIds , updatedTelemetry ) ;
}
} ) ;
}
@ -378,14 +359,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private Map < String , String > getTelemetryKeysFromLink ( CalculatedFieldLink link , AttributeScope scope ) {
return scope = = null ? link . getConfiguration ( ) . getTimeSeries ( ) : switch ( scope ) {
case CLIENT_SCOPE - > link . getConfiguration ( ) . getClientAttributes ( ) ;
case SERVER_SCOPE - > link . getConfiguration ( ) . getServerAttributes ( ) ;
case SHARED_SCOPE - > link . getConfiguration ( ) . getSharedAttributes ( ) ;
} ;
}
private String getMappedKey ( KvEntry entry , Map < String , String > telemetry ) {
return telemetry . entrySet ( ) . stream ( )
. filter ( kvEntry - > kvEntry . getValue ( ) . equals ( entry . getKey ( ) ) )
@ -394,7 +367,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
. orElse ( entry . getKey ( ) ) ;
}
private void executeTelemetryUpdate ( TenantId tenantId , EntityId entityId , CalculatedFieldId calculatedFieldId , List < CalculatedFieldId > c alculatedFieldIds, Map < String , KvEntry > updatedTelemetry ) {
private void executeTelemetryUpdate ( TenantId tenantId , EntityId entityId , CalculatedFieldId calculatedFieldId , List < CalculatedFieldId > previousC alculatedFieldIds, Map < String , KvEntry > updatedTelemetry ) {
log . info ( "Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]" , tenantId , entityId , calculatedFieldId ) ;
CalculatedField calculatedField = calculatedFieldCache . getCalculatedField ( tenantId , calculatedFieldId ) ;
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache . getCalculatedFieldCtx ( tenantId , calculatedFieldId , tbelInvokeService ) ;
@ -406,12 +379,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
case ASSET_PROFILE , DEVICE_PROFILE - > {
boolean isCommonEntity = calculatedField . getConfiguration ( ) . getReferencedEntities ( ) . contains ( entityId ) ;
if ( isCommonEntity ) {
calculatedFieldCache . getEntitiesByProfile ( tenantId , cfEntityId ) . forEach ( id - > updateOrInitializeState ( calculatedFieldCtx , id , argumentValues , c alculatedFieldIds) ) ;
calculatedFieldCache . getEntitiesByProfile ( tenantId , cfEntityId ) . forEach ( id - > updateOrInitializeState ( calculatedFieldCtx , id , argumentValues , previousC alculatedFieldIds) ) ;
} else {
updateOrInitializeState ( calculatedFieldCtx , entityId , argumentValues , c alculatedFieldIds) ;
updateOrInitializeState ( calculatedFieldCtx , entityId , argumentValues , previousC alculatedFieldIds) ;
}
}
default - > updateOrInitializeState ( calculatedFieldCtx , cfEntityId , argumentValues , calculatedFieldIds ) ;
default - >
updateOrInitializeState ( calculatedFieldCtx , cfEntityId , argumentValues , previousCalculatedFieldIds ) ;
}
}
@ -427,14 +401,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return ;
}
List < CalculatedFieldId > c alculatedFieldIds = proto . getCalculatedFieldsList ( ) . stream ( )
List < CalculatedFieldId > previousC alculatedFieldIds = proto . getPrevious CalculatedFieldsList ( ) . stream ( )
. map ( cfIdProto - > new CalculatedFieldId ( new UUID ( cfIdProto . getCalculatedFieldIdMSB ( ) , cfIdProto . getCalculatedFieldIdLSB ( ) ) ) )
. collect ( Collectors . toCollection ( ArrayList : : new ) ) ;
Map < String , ArgumentEntry > argumentsMap = proto . getArgumentsMap ( ) . entrySet ( ) . stream ( )
. collect ( Collectors . toMap ( Map . Entry : : getKey , entry - > fromArgumentEntryProto ( entry . getValue ( ) ) ) ) ;
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache . getCalculatedFieldCtx ( tenantId , calculatedFieldId , tbelInvokeService ) ;
updateOrInitializeState ( calculatedFieldCtx , entityId , argumentsMap , c alculatedFieldIds) ;
updateOrInitializeState ( calculatedFieldCtx , entityId , argumentsMap , previousC alculatedFieldIds) ;
} catch ( Exception e ) {
log . trace ( "Failed to process calculated field update state msg: [{}]" , proto , e ) ;
}
@ -449,9 +423,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
EntityId newProfileId = EntityIdFactory . getByTypeAndUuid ( proto . getEntityProfileType ( ) , new UUID ( proto . getNewProfileIdMSB ( ) , proto . getNewProfileIdLSB ( ) ) ) ;
log . info ( "Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId);
// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId);
calculatedFieldService . findCalculatedFieldIdsByEntityId ( tenantId , oldProfileId )
. forEach ( cfId - > clearState ( tenantId , cfId , entityId ) ) ;
@ -470,18 +441,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
log . info ( "Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
if ( proto . getDeleted ( ) ) {
log . info ( "Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId);
List < CalculatedFieldId > calculatedFieldIds = Stream . concat (
calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) . stream ( ) . map ( CalculatedFieldLink : : getCalculatedFieldId ) ,
calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , profileId ) . stream ( ) . map ( CalculatedFieldLink : : getCalculatedFieldId )
) . toList ( ) ;
calculatedFieldIds . forEach ( cfId - > clearState ( tenantId , cfId , entityId ) ) ;
getCalculatedFieldLinks ( tenantId , entityId , profileId )
. forEach ( link - > clearState ( tenantId , link . getCalculatedFieldId ( ) , entityId ) ) ;
} else {
log . info ( "Executing profile entity added msg, tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId);
initializeStateForEntityByProfile ( tenantId , entityId , profileId , callback ) ;
}
} catch ( Exception e ) {
@ -490,12 +454,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private void clearState ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId ) {
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CO RE , tenantId , calculatedField Id) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGIN E , tenantId , entity Id) ;
if ( tpi . isMyPartition ( ) ) {
log . warn ( "Executing clearState, calculatedFieldId=[{}], entityId=[{}]" , calculatedFieldId , entityId ) ;
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( calculatedFieldId . getId ( ) , entityId . getId ( ) ) ;
states . remove ( ctxId ) ;
// rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
rocksDBService . delete ( JacksonUtil . writeValueAsString ( ctxId ) ) ;
} else {
sendClearCalculatedFieldStateMsg ( tenantId , calculatedFieldId , entityId ) ;
}
@ -541,12 +505,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} , calculatedFieldCallbackExecutor ) ;
}
private void updateOrInitializeState ( CalculatedFieldCtx calculatedFieldCtx , EntityId entityId , Map < String , ArgumentEntry > argumentValues , List < CalculatedFieldId > c alculatedFieldIds) {
private void updateOrInitializeState ( CalculatedFieldCtx calculatedFieldCtx , EntityId entityId , Map < String , ArgumentEntry > argumentValues , List < CalculatedFieldId > previousC alculatedFieldIds) {
TenantId tenantId = calculatedFieldCtx . getTenantId ( ) ;
CalculatedFieldId cfId = calculatedFieldCtx . getCfId ( ) ;
Map < String , ArgumentEntry > argumentsMap = new HashMap < > ( argumentValues ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CO RE , tenantId , cf Id) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGIN E , tenantId , entity Id) ;
if ( tpi . isMyPartition ( ) ) {
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId ( cfId . getId ( ) , entityId . getId ( ) ) ;
@ -559,12 +523,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Consumer < CalculatedFieldState > performUpdateState = ( state ) - > {
if ( state . updateState ( argumentsMap ) ) {
calculatedFieldEntityCtx . setState ( state ) ;
// rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
rocksDBService . put ( JacksonUtil . writeValueAsString ( entityCtxId ) , JacksonUtil . writeValueAsString ( calculatedFieldEntityCtx ) ) ;
Map < String , ArgumentEntry > arguments = state . getArguments ( ) ;
boolean allArgsPresent = arguments . keySet ( ) . containsAll ( calculatedFieldCtx . getArguments ( ) . keySet ( ) ) & &
! arguments . containsValue ( SingleValueArgumentEntry . EMPTY ) & & ! arguments . containsValue ( TsRollingArgumentEntry . EMPTY ) ;
if ( allArgsPresent ) {
performCalculation ( calculatedFieldCtx , state , entityId , c alculatedFieldIds) ;
performCalculation ( calculatedFieldCtx , state , entityId , previousC alculatedFieldIds) ;
}
log . info ( "Successfully updated state: calculatedFieldId=[{}], entityId=[{}]" , calculatedFieldCtx . getCfId ( ) , entityId ) ;
}
@ -599,17 +563,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return calculatedFieldEntityCtx ;
} ) ;
} else {
sendUpdateCalculatedFieldStateMsg ( tenantId , cfId , entityId , c alculatedFieldIds, argumentsMap ) ;
sendUpdateCalculatedFieldStateMsg ( tenantId , cfId , entityId , previousC alculatedFieldIds, argumentsMap ) ;
}
}
private void performCalculation ( CalculatedFieldCtx calculatedFieldCtx , CalculatedFieldState state , EntityId entityId , List < CalculatedFieldId > c alculatedFieldIds) {
private void performCalculation ( CalculatedFieldCtx calculatedFieldCtx , CalculatedFieldState state , EntityId entityId , List < CalculatedFieldId > previousC alculatedFieldIds) {
ListenableFuture < CalculatedFieldResult > resultFuture = state . performCalculation ( calculatedFieldCtx ) ;
Futures . addCallback ( resultFuture , new FutureCallback < > ( ) {
@Override
public void onSuccess ( CalculatedFieldResult result ) {
if ( result ! = null ) {
pushMsgToRuleEngine ( calculatedFieldCtx . getTenantId ( ) , calculatedFieldCtx . getCfId ( ) , entityId , result , c alculatedFieldIds) ;
pushMsgToRuleEngine ( calculatedFieldCtx . getTenantId ( ) , calculatedFieldCtx . getCfId ( ) , entityId , result , previousC alculatedFieldIds) ;
}
}
@ -620,20 +584,20 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} , MoreExecutors . directExecutor ( ) ) ;
}
private void pushMsgToRuleEngine ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId originatorId , CalculatedFieldResult calculatedFieldResult , List < CalculatedFieldId > c alculatedFieldIds) {
private void pushMsgToRuleEngine ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId originatorId , CalculatedFieldResult calculatedFieldResult , List < CalculatedFieldId > previousC alculatedFieldIds) {
try {
OutputType type = calculatedFieldResult . getType ( ) ;
TbMsgType msgType = OutputType . ATTRIBUTES . equals ( type ) ? TbMsgType . POST_ATTRIBUTES_REQUEST : TbMsgType . POST_TELEMETRY_REQUEST ;
TbMsgMetaData md = OutputType . ATTRIBUTES . equals ( type ) ? new TbMsgMetaData ( Map . of ( SCOPE , calculatedFieldResult . getScope ( ) . name ( ) ) ) : TbMsgMetaData . EMPTY ;
ObjectNode payload = createJsonPayload ( calculatedFieldResult ) ;
if ( calculatedFieldIds = = null ) {
calculatedFieldIds = new ArrayList < > ( ) ;
}
if ( calculatedFieldIds . contains ( calculatedFieldId ) ) {
if ( previousCalculatedFieldIds ! = null & & previousCalculatedFieldIds . contains ( calculatedFieldId ) ) {
throw new IllegalArgumentException ( "Calculated field [" + calculatedFieldId . getId ( ) + "] refers to itself, causing an infinite loop." ) ;
}
List < CalculatedFieldId > calculatedFieldIds = previousCalculatedFieldIds ! = null
? new ArrayList < > ( previousCalculatedFieldIds )
: new ArrayList < > ( ) ;
calculatedFieldIds . add ( calculatedFieldId ) ;
TbMsg msg = TbMsg . newMsg ( ) . type ( msgType ) . originator ( originatorId ) . calculatedFieldIds ( calculatedFieldIds ) . metaData ( md ) . data ( JacksonUtil . writeValueAsString ( payload ) ) . build ( ) ;
TbMsg msg = TbMsg . newMsg ( ) . type ( msgType ) . originator ( originatorId ) . previousC alculatedFieldIds( calculatedFieldIds ) . metaData ( md ) . data ( JacksonUtil . writeValueAsString ( payload ) ) . build ( ) ;
clusterService . pushMsgToRuleEngine ( tenantId , originatorId , msg , null ) ;
log . info ( "Pushed message to rule engine: originatorId=[{}]" , originatorId ) ;
} catch ( Exception e ) {
@ -641,6 +605,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private List < CalculatedFieldLink > getCalculatedFieldLinks ( TenantId tenantId , EntityId entityId , EntityId profileId ) {
List < CalculatedFieldLink > links = new ArrayList < > ( calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) ) ;
if ( profileId ! = null ) {
links . addAll ( calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , profileId ) ) ;
}
return links ;
}
private ListenableFuture < Void > fetchArguments ( TenantId tenantId , EntityId entityId , Map < String , Argument > necessaryArguments , Consumer < Map < String , ArgumentEntry > > onComplete ) {
Map < String , ArgumentEntry > argumentValues = new HashMap < > ( ) ;
List < ListenableFuture < ArgumentEntry > > futures = new ArrayList < > ( ) ;
@ -704,13 +676,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return Futures . transform ( tsRollingFuture , tsRolling - > tsRolling = = null ? TsRollingArgumentEntry . EMPTY : ArgumentEntry . createTsRollingArgument ( tsRolling ) , calculatedFieldCallbackExecutor ) ;
}
private void sendUpdateCalculatedFieldStateMsg ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId , List < CalculatedFieldId > c alculatedFieldIds, Map < String , ArgumentEntry > argumentValues ) {
private void sendUpdateCalculatedFieldStateMsg ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId , List < CalculatedFieldId > previousC alculatedFieldIds, Map < String , ArgumentEntry > argumentValues ) {
TransportProtos . CalculatedFieldStateMsgProto . Builder msgBuilder = createBaseCalculatedFieldStateMsg ( tenantId , calculatedFieldId , entityId ) ;
if ( argumentValues ! = null ) {
argumentValues . forEach ( ( key , argumentEntry ) - > msgBuilder . putArguments ( key , toArgumentEntryProto ( argumentEntry ) ) ) ;
}
if ( c alculatedFieldIds ! = null ) {
c alculatedFieldIds. forEach ( cfId - > msgBuilder . addCalculatedFields (
if ( previousC alculatedFieldIds ! = null ) {
previousC alculatedFieldIds. forEach ( cfId - > msgBuilder . addPrevious CalculatedFields (
TransportProtos . CalculatedFieldIdProto . newBuilder ( )
. setCalculatedFieldIdMSB ( cfId . getId ( ) . getMostSignificantBits ( ) )
. setCalculatedFieldIdLSB ( cfId . getId ( ) . getLeastSignificantBits ( ) )
@ -794,11 +766,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState ( CalculatedFieldEntityCtxId entityCtxId , CalculatedFieldType cfType ) {
// String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId));
// if (stateStr == null) {
return new CalculatedFieldEntityCtx ( entityCtxId , createStateByType ( cfType ) ) ;
// }
// return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class);
String stateStr = rocksDBService . get ( JacksonUtil . writeValueAsString ( entityCtxId ) ) ;
if ( stateStr = = null ) {
return new CalculatedFieldEntityCtx ( entityCtxId , createStateByType ( cfType ) ) ;
}
return JacksonUtil . fromString ( stateStr , CalculatedFieldEntityCtx . class ) ;
}
private ObjectNode createJsonPayload ( CalculatedFieldResult calculatedFieldResult ) {