@ -84,7 +84,6 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
@ -98,6 +97,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream ;
import static org.thingsboard.server.common.data.DataConstants.SCOPE ;
import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto ;
import static org.thingsboard.server.common.util.ProtoUtils.toObjectProto ;
@TbCoreComponent
@Service
@ -229,6 +230,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
TenantId tenantId = TenantId . fromUUID ( new UUID ( proto . getTenantIdMSB ( ) , proto . getTenantIdLSB ( ) ) ) ;
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( proto . getCalculatedFieldIdMSB ( ) , proto . getCalculatedFieldIdLSB ( ) ) ) ;
log . info ( "Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]" , tenantId , calculatedFieldId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
if ( ! tpi . isMyPartition ( ) ) {
clusterService . pushMsgToCore ( tenantId , calculatedFieldId , TransportProtos . ToCoreMsg . newBuilder ( ) . setCalculatedFieldMsg ( proto ) . build ( ) , null ) ;
log . debug ( "[{}][{}] Calculated field belongs to external partition. Probably rebalancing is in progress. Topic: {}" , tenantId , calculatedFieldId , tpi . getFullTopicName ( ) ) ;
callback . onFailure ( new RuntimeException ( "Calculated field belongs to external partition " + tpi . getFullTopicName ( ) + "!" ) ) ;
}
if ( proto . getDeleted ( ) ) {
log . warn ( "Executing onCalculatedFieldDelete, calculatedFieldId=[{}]" , calculatedFieldId ) ;
onCalculatedFieldDelete ( tenantId , calculatedFieldId , callback ) ;
@ -277,6 +284,54 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private boolean onCalculatedFieldUpdate ( CalculatedField updatedCalculatedField , TbCallback callback ) {
CalculatedField oldCalculatedField = calculatedFieldCache . getCalculatedField ( updatedCalculatedField . getTenantId ( ) , updatedCalculatedField . getId ( ) ) ;
boolean shouldReinit = true ;
if ( hasSignificantChanges ( oldCalculatedField , updatedCalculatedField ) ) {
onCalculatedFieldDelete ( updatedCalculatedField . getTenantId ( ) , updatedCalculatedField . getId ( ) , callback ) ;
} else {
callback . onSuccess ( ) ;
shouldReinit = false ;
}
return shouldReinit ;
}
private void onCalculatedFieldDelete ( TenantId tenantId , CalculatedFieldId calculatedFieldId , TbCallback callback ) {
try {
cleanupEntity ( calculatedFieldId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
Set < CalculatedFieldId > calculatedFieldIds = partitionedEntities . get ( tpi ) ;
if ( calculatedFieldIds ! = null ) {
calculatedFieldIds . remove ( calculatedFieldId ) ;
}
calculatedFieldCache . evict ( calculatedFieldId ) ;
states . keySet ( ) . removeIf ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) ) ;
List < String > statesToRemove = states . keySet ( ) . stream ( )
. filter ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) )
. map ( JacksonUtil : : writeValueAsString )
. toList ( ) ;
rocksDBService . deleteAll ( statesToRemove ) ;
} catch ( Exception e ) {
log . trace ( "Failed to delete calculated field: [{}]" , calculatedFieldId , e ) ;
callback . onFailure ( e ) ;
}
}
private boolean hasSignificantChanges ( CalculatedField oldCalculatedField , CalculatedField newCalculatedField ) {
if ( oldCalculatedField = = null ) {
return true ;
}
boolean entityIdChanged = ! oldCalculatedField . getEntityId ( ) . equals ( newCalculatedField . getEntityId ( ) ) ;
boolean typeChanged = ! oldCalculatedField . getType ( ) . equals ( newCalculatedField . getType ( ) ) ;
CalculatedFieldConfiguration oldConfig = oldCalculatedField . getConfiguration ( ) ;
CalculatedFieldConfiguration newConfig = newCalculatedField . getConfiguration ( ) ;
boolean argumentsChanged = ! oldConfig . getArguments ( ) . equals ( newConfig . getArguments ( ) ) ;
boolean outputTypeChanged = ! oldConfig . getOutput ( ) . getType ( ) . equals ( newConfig . getOutput ( ) . getType ( ) ) ;
boolean expressionChanged = ! oldConfig . getExpression ( ) . equals ( newConfig . getExpression ( ) ) ;
return entityIdChanged | | typeChanged | | argumentsChanged | | outputTypeChanged | | expressionChanged ;
}
@Override
public void onTelemetryUpdate ( TenantId tenantId , EntityId entityId , List < CalculatedFieldId > calculatedFieldIds , List < ? extends KvEntry > telemetry ) {
try {
@ -288,8 +343,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} else if ( EntityType . DEVICE . equals ( entityType ) ) {
profileId = deviceProfileCache . get ( tenantId , ( DeviceId ) entityId ) . getId ( ) ;
}
List < CalculatedFieldLink > cfLinks = calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) ;
Optional . ofNullable ( profileId ) . ifPresent ( id - > calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , id ) ) ;
List < CalculatedFieldLink > cfLinks = new ArrayList < > ( calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) ) ;
Optional . ofNullable ( profileId ) . ifPresent ( id - > {
cfLinks . addAll ( calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , id ) ) ;
} ) ;
cfLinks . forEach ( link - > {
CalculatedFieldId calculatedFieldId = link . getCalculatedFieldId ( ) ;
Map < String , String > attributes = link . getConfiguration ( ) . getAttributes ( ) ;
@ -312,6 +369,23 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private String getMappedKey ( KvEntry entry , Map < String , String > attributes , Map < String , String > timeSeries ) {
if ( entry instanceof AttributeKvEntry ) {
return attributes . entrySet ( ) . stream ( )
. filter ( attr - > attr . getValue ( ) . equals ( entry . getKey ( ) ) )
. map ( Map . Entry : : getKey )
. findFirst ( )
. orElse ( entry . getKey ( ) ) ;
} else if ( entry instanceof TsKvEntry ) {
return timeSeries . entrySet ( ) . stream ( )
. filter ( ts - > ts . getValue ( ) . equals ( entry . getKey ( ) ) )
. map ( Map . Entry : : getKey )
. findFirst ( )
. orElse ( entry . getKey ( ) ) ;
}
return entry . getKey ( ) ;
}
private void executeTelemetryUpdate ( TenantId tenantId , EntityId entityId , CalculatedFieldId calculatedFieldId , List < CalculatedFieldId > calculatedFieldIds , Map < String , KvEntry > updatedTelemetry ) {
log . info ( "Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]" , tenantId , entityId , calculatedFieldId ) ;
CalculatedField calculatedField = calculatedFieldCache . getCalculatedField ( tenantId , calculatedFieldId ) ;
@ -334,75 +408,23 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
log . info ( "Successfully updated telemetry for calculatedFieldId: [{}]" , calculatedFieldId ) ;
}
private String getMappedKey ( KvEntry entry , Map < String , String > attributes , Map < String , String > timeSeries ) {
if ( entry instanceof AttributeKvEntry ) {
return attributes . entrySet ( ) . stream ( )
. filter ( attr - > attr . getValue ( ) . equals ( entry . getKey ( ) ) )
. map ( Map . Entry : : getKey )
. findFirst ( )
. orElse ( entry . getKey ( ) ) ;
} else if ( entry instanceof TsKvEntry ) {
return timeSeries . entrySet ( ) . stream ( )
. filter ( ts - > ts . getValue ( ) . equals ( entry . getKey ( ) ) )
. map ( Map . Entry : : getKey )
. findFirst ( )
. orElse ( entry . getKey ( ) ) ;
}
return entry . getKey ( ) ;
}
private Object deserializeObjectProto ( TransportProtos . ObjectProto objectProto ) {
try {
String type = objectProto . getType ( ) ;
String value = objectProto . getValue ( ) ;
return switch ( type ) {
case "java.lang.String" - > value ;
case "java.lang.Integer" - > Integer . parseInt ( value ) ;
case "java.lang.Long" - > Long . parseLong ( value ) ;
case "java.lang.Double" - > Double . parseDouble ( value ) ;
case "java.lang.Boolean" - > Boolean . parseBoolean ( value ) ;
default - > throw new IllegalArgumentException ( "Unsupported object type: " + type ) ;
} ;
} catch ( Exception e ) {
log . error ( "Failed to deserialize ObjectProto: [{}]" , objectProto , e ) ;
return null ;
}
}
@Override
public void onCalculatedFieldStateMsg ( TransportProtos . CalculatedFieldStateMsgProto proto , TbCallback callback ) {
try {
TenantId tenantId = TenantId . fromUUID ( new UUID ( proto . getTenantIdMSB ( ) , proto . getTenantIdLSB ( ) ) ) ;
CalculatedFieldId calculatedFieldId = new CalculatedFieldId ( new UUID ( proto . getCalculatedFieldIdMSB ( ) , proto . getCalculatedFieldIdLSB ( ) ) ) ;
EntityId entityId = EntityIdFactory . getByTypeAndUuid ( proto . getEntityType ( ) , new UUID ( proto . getEntityIdMSB ( ) , proto . getEntityIdLSB ( ) ) ) ;
List < CalculatedFieldId > calculatedFieldIds = new ArrayList < > ( ) ;
for ( TransportProtos . CalculatedFieldIdProto cfIdProto : proto . getCalculatedFieldsList ( ) ) {
CalculatedFieldId cfId = new CalculatedFieldId ( new UUID (
cfIdProto . getCalculatedFieldIdMSB ( ) ,
cfIdProto . getCalculatedFieldIdLSB ( )
) ) ;
calculatedFieldIds . add ( cfId ) ;
if ( proto . getClear ( ) ) {
clearState ( tenantId , calculatedFieldId , entityId ) ;
return ;
}
Map < String , ArgumentEntry > argumentsMap = new HashMap < > ( ) ;
proto . getArgumentsMap ( ) . forEach ( ( key , entryProto ) - > {
ArgumentEntry argumentEntry ;
if ( entryProto . hasTsRecords ( ) ) {
TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry ( ) ;
entryProto . getTsRecords ( ) . getTsRecordsMap ( ) . forEach ( ( ts , objectProto ) - > {
Object value = deserializeObjectProto ( objectProto ) ;
tsRollingArgumentEntry . getTsRecords ( ) . put ( ts , value ) ;
} ) ;
argumentEntry = tsRollingArgumentEntry ;
} else if ( entryProto . hasSingleValue ( ) ) {
TransportProtos . SingleValueProto singleRecordProto = entryProto . getSingleValue ( ) ;
Object value = deserializeObjectProto ( singleRecordProto . getValue ( ) ) ;
argumentEntry = new SingleValueArgumentEntry ( singleRecordProto . getTs ( ) , value ) ;
} else {
throw new IllegalArgumentException ( "Unsupported ArgumentEntryProto type" ) ;
}
argumentsMap . put ( key , argumentEntry ) ;
} ) ;
List < CalculatedFieldId > calculatedFieldIds = proto . getCalculatedFieldsList ( ) . stream ( )
. map ( cfIdProto - > new CalculatedFieldId ( new UUID ( cfIdProto . getCalculatedFieldIdMSB ( ) , cfIdProto . getCalculatedFieldIdLSB ( ) ) ) )
. toList ( ) ;
Map < String , ArgumentEntry > argumentsMap = proto . getArgumentsMap ( ) . entrySet ( ) . stream ( )
. collect ( Collectors . toMap ( Map . Entry : : getKey , entry - > fromArgumentEntryProto ( entry . getValue ( ) ) ) ) ;
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache . getCalculatedFieldCtx ( tenantId , calculatedFieldId , tbelInvokeService ) ;
updateOrInitializeState ( calculatedFieldCtx , entityId , argumentsMap , calculatedFieldIds ) ;
@ -424,16 +446,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldCache . getEntitiesByProfile ( tenantId , newProfileId ) . add ( entityId ) ;
calculatedFieldService . findCalculatedFieldIdsByEntityId ( tenantId , oldProfileId )
. forEach ( cfId - > {
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , cfId ) ;
if ( tpi . isMyPartition ( ) ) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( cfId . getId ( ) , entityId . getId ( ) ) ;
states . remove ( ctxId ) ;
rocksDBService . delete ( JacksonUtil . writeValueAsString ( ctxId ) ) ;
} else {
sendUpdateCalculatedFieldStateMsg ( tenantId , cfId , entityId , Collections . emptyList ( ) , null ) ;
}
} ) ;
. forEach ( cfId - > clearState ( tenantId , cfId , entityId ) ) ;
initializeStateForEntityByProfile ( tenantId , entityId , newProfileId , callback ) ;
} catch ( Exception e ) {
@ -455,16 +468,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , entityId ) . stream ( ) . map ( CalculatedFieldLink : : getCalculatedFieldId ) ,
calculatedFieldCache . getCalculatedFieldLinksByEntityId ( tenantId , profileId ) . stream ( ) . map ( CalculatedFieldLink : : getCalculatedFieldId )
) . toList ( ) ;
calculatedFieldIds . forEach ( cfId - > {
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , cfId ) ;
if ( tpi . isMyPartition ( ) ) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( cfId . getId ( ) , entityId . getId ( ) ) ;
states . remove ( ctxId ) ;
rocksDBService . delete ( JacksonUtil . writeValueAsString ( ctxId ) ) ;
} else {
sendUpdateCalculatedFieldStateMsg ( tenantId , cfId , entityId , Collections . emptyList ( ) , null ) ;
}
} ) ;
calculatedFieldIds . forEach ( cfId - > clearState ( tenantId , cfId , entityId ) ) ;
} else {
log . info ( "Executing profile entity added msg, tenantId=[{}], entityId=[{}]" , tenantId , entityId ) ;
calculatedFieldCache . getEntitiesByProfile ( tenantId , profileId ) . add ( entityId ) ;
@ -475,94 +479,16 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private void sendUpdateCalculatedFieldStateMsg ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId , List < CalculatedFieldId > calculatedFieldIds , Map < String , ArgumentEntry > argumentValues ) {
TransportProtos . CalculatedFieldStateMsgProto . Builder msgBuilder = TransportProtos . CalculatedFieldStateMsgProto . newBuilder ( )
. setTenantIdMSB ( tenantId . getId ( ) . getMostSignificantBits ( ) )
. setTenantIdLSB ( tenantId . getId ( ) . getLeastSignificantBits ( ) )
. setCalculatedFieldIdMSB ( calculatedFieldId . getId ( ) . getMostSignificantBits ( ) )
. setCalculatedFieldIdLSB ( calculatedFieldId . getId ( ) . getLeastSignificantBits ( ) )
. setEntityType ( entityId . getEntityType ( ) . name ( ) )
. setEntityIdMSB ( entityId . getId ( ) . getMostSignificantBits ( ) )
. setEntityIdLSB ( entityId . getId ( ) . getLeastSignificantBits ( ) ) ;
if ( argumentValues ! = null ) {
argumentValues . forEach ( ( key , argumentEntry ) - > {
TransportProtos . ArgumentEntryProto . Builder argumentEntryProtoBuilder = TransportProtos . ArgumentEntryProto . newBuilder ( ) ;
if ( argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry ) {
TransportProtos . TsRollingProto . Builder tsRollingProtoBuilder = TransportProtos . TsRollingProto . newBuilder ( ) ;
tsRollingArgumentEntry . getTsRecords ( ) . forEach ( ( ts , value ) - > {
TransportProtos . ObjectProto . Builder objectProtoBuilder = TransportProtos . ObjectProto . newBuilder ( )
. setType ( value . getClass ( ) . getName ( ) )
. setValue ( value . toString ( ) ) ;
tsRollingProtoBuilder . putTsRecords ( ts , objectProtoBuilder . build ( ) ) ;
} ) ;
argumentEntryProtoBuilder . setTsRecords ( tsRollingProtoBuilder . build ( ) ) ;
} else if ( argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry ) {
TransportProtos . SingleValueProto . Builder singleRecordProtoBuilder = TransportProtos . SingleValueProto . newBuilder ( )
. setTs ( singleValueArgumentEntry . getTs ( ) )
. setValue ( TransportProtos . ObjectProto . newBuilder ( )
. setType ( singleValueArgumentEntry . getValue ( ) . getClass ( ) . getName ( ) )
. setValue ( singleValueArgumentEntry . getValue ( ) . toString ( ) )
. build ( ) ) ;
argumentEntryProtoBuilder . setSingleValue ( singleRecordProtoBuilder . build ( ) ) ;
}
msgBuilder . putArguments ( key , argumentEntryProtoBuilder . build ( ) ) ;
} ) ;
}
clusterService . pushMsgToCore ( tenantId , calculatedFieldId , TransportProtos . ToCoreMsg . newBuilder ( ) . setCalculatedFieldStateMsg ( msgBuilder ) . build ( ) , null ) ;
}
private boolean onCalculatedFieldUpdate ( CalculatedField updatedCalculatedField , TbCallback callback ) {
CalculatedField oldCalculatedField = calculatedFieldCache . getCalculatedField ( updatedCalculatedField . getTenantId ( ) , updatedCalculatedField . getId ( ) ) ;
boolean shouldReinit = true ;
if ( hasSignificantChanges ( oldCalculatedField , updatedCalculatedField ) ) {
onCalculatedFieldDelete ( updatedCalculatedField . getTenantId ( ) , updatedCalculatedField . getId ( ) , callback ) ;
private void clearState ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId ) {
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
if ( tpi . isMyPartition ( ) ) {
log . warn ( "Executing clearState, calculatedFieldId=[{}], entityId=[{}]" , calculatedFieldId , entityId ) ;
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId ( calculatedFieldId . getId ( ) , entityId . getId ( ) ) ;
states . remove ( ctxId ) ;
rocksDBService . delete ( JacksonUtil . writeValueAsString ( ctxId ) ) ;
} else {
callback . onSuccess ( ) ;
shouldReinit = false ;
sendClearCalculatedFieldStateMsg ( tenantId , calculatedFieldId , entityId ) ;
}
return shouldReinit ;
}
private void onCalculatedFieldDelete ( TenantId tenantId , CalculatedFieldId calculatedFieldId , TbCallback callback ) {
try {
cleanupEntity ( calculatedFieldId ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_CORE , tenantId , calculatedFieldId ) ;
Set < CalculatedFieldId > calculatedFieldIds = partitionedEntities . get ( tpi ) ;
if ( calculatedFieldIds ! = null ) {
calculatedFieldIds . remove ( calculatedFieldId ) ;
}
calculatedFieldCache . evict ( calculatedFieldId ) ;
states . keySet ( ) . removeIf ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) ) ;
List < String > statesToRemove = states . keySet ( ) . stream ( )
. filter ( ctxId - > ctxId . cfId ( ) . equals ( calculatedFieldId . getId ( ) ) )
. map ( JacksonUtil : : writeValueAsString )
. toList ( ) ;
rocksDBService . deleteAll ( statesToRemove ) ;
} catch ( Exception e ) {
log . trace ( "Failed to delete calculated field: [{}]" , calculatedFieldId , e ) ;
callback . onFailure ( e ) ;
}
}
private boolean hasSignificantChanges ( CalculatedField oldCalculatedField , CalculatedField newCalculatedField ) {
if ( oldCalculatedField = = null ) {
return true ;
}
boolean entityIdChanged = ! oldCalculatedField . getEntityId ( ) . equals ( newCalculatedField . getEntityId ( ) ) ;
boolean typeChanged = ! oldCalculatedField . getType ( ) . equals ( newCalculatedField . getType ( ) ) ;
CalculatedFieldConfiguration oldConfig = oldCalculatedField . getConfiguration ( ) ;
CalculatedFieldConfiguration newConfig = newCalculatedField . getConfiguration ( ) ;
boolean argumentsChanged = ! oldConfig . getArguments ( ) . equals ( newConfig . getArguments ( ) ) ;
boolean outputTypeChanged = ! oldConfig . getOutput ( ) . getType ( ) . equals ( newConfig . getOutput ( ) . getType ( ) ) ;
boolean expressionChanged = ! oldConfig . getExpression ( ) . equals ( newConfig . getExpression ( ) ) ;
return entityIdChanged | | typeChanged | | argumentsChanged | | outputTypeChanged | | expressionChanged ;
}
private void initializeStateForEntityByProfile ( TenantId tenantId , EntityId entityId , EntityId profileId , TbCallback callback ) {
@ -605,82 +531,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} , calculatedFieldCallbackExecutor ) ;
}
private ListenableFuture < Void > fetchArguments ( TenantId tenantId , EntityId entityId , Map < String , Argument > necessaryArguments , Consumer < Map < String , ArgumentEntry > > onComplete ) {
Map < String , ArgumentEntry > argumentValues = new HashMap < > ( ) ;
List < ListenableFuture < ArgumentEntry > > futures = new ArrayList < > ( ) ;
necessaryArguments . forEach ( ( key , argument ) - > {
futures . add ( Futures . transform ( fetchArgumentValue ( tenantId , entityId , argument ) ,
result - > {
argumentValues . put ( key , result ) ;
return result ;
} , calculatedFieldCallbackExecutor ) ) ;
} ) ;
return Futures . transform ( Futures . allAsList ( futures ) , results - > {
onComplete . accept ( argumentValues ) ;
return null ;
} , calculatedFieldCallbackExecutor ) ;
}
private ListenableFuture < ArgumentEntry > fetchArgumentValue ( TenantId tenantId , EntityId targetEntityId , Argument argument ) {
EntityId argumentEntityId = argument . getEntityId ( ) ;
EntityId entityId = isProfileEntity ( argumentEntityId )
? targetEntityId
: argumentEntityId ;
return fetchKvEntry ( tenantId , entityId , argument ) ;
}
private ListenableFuture < ArgumentEntry > fetchKvEntry ( TenantId tenantId , EntityId entityId , Argument argument ) {
return switch ( argument . getType ( ) ) {
case "TS_ROLLING" - > fetchTsRolling ( tenantId , entityId , argument ) ;
case "ATTRIBUTE" - > transformSingleValueArgument (
Futures . transform (
attributesService . find ( tenantId , entityId , argument . getScope ( ) , argument . getKey ( ) ) ,
result - > result . or ( ( ) - > Optional . of ( new BaseAttributeKvEntry ( System . currentTimeMillis ( ) , createDefaultKvEntry ( argument ) ) ) ) ,
calculatedFieldCallbackExecutor )
) ;
case "TS_LATEST" - > transformSingleValueArgument (
Futures . transform (
timeseriesService . findLatest ( tenantId , entityId , argument . getKey ( ) ) ,
result - > result . or ( ( ) - > Optional . of ( new BasicTsKvEntry ( System . currentTimeMillis ( ) , createDefaultKvEntry ( argument ) ) ) ) ,
calculatedFieldCallbackExecutor ) ) ;
default - > throw new IllegalArgumentException ( "Invalid argument type '" + argument . getType ( ) + "'." ) ;
} ;
}
private ListenableFuture < ArgumentEntry > fetchTsRolling ( TenantId tenantId , EntityId entityId , Argument argument ) {
long currentTime = System . currentTimeMillis ( ) ;
long timeWindow = argument . getTimeWindow ( ) = = 0 ? System . currentTimeMillis ( ) : argument . getTimeWindow ( ) ;
long startTs = currentTime - timeWindow ;
int limit = argument . getLimit ( ) = = 0 ? MAX_LAST_RECORDS_VALUE : argument . getLimit ( ) ;
ReadTsKvQuery query = new BaseReadTsKvQuery ( argument . getKey ( ) , startTs , currentTime , 0 , limit , Aggregation . NONE ) ;
ListenableFuture < List < TsKvEntry > > tsRollingFuture = timeseriesService . findAll ( tenantId , entityId , List . of ( query ) ) ;
return Futures . transform ( tsRollingFuture , tsRolling - > tsRolling = = null ? TsRollingArgumentEntry . EMPTY : ArgumentEntry . createTsRollingArgument ( tsRolling ) , calculatedFieldCallbackExecutor ) ;
}
private ListenableFuture < ArgumentEntry > transformSingleValueArgument ( ListenableFuture < Optional < ? extends KvEntry > > kvEntryFuture ) {
return Futures . transform ( kvEntryFuture , kvEntry - > {
if ( kvEntry . isPresent ( ) & & kvEntry . get ( ) . getValue ( ) ! = null ) {
return ArgumentEntry . createSingleValueArgument ( kvEntry . get ( ) ) ;
} else {
return SingleValueArgumentEntry . EMPTY ;
}
} , calculatedFieldCallbackExecutor ) ;
}
private KvEntry createDefaultKvEntry ( Argument argument ) {
String key = argument . getKey ( ) ;
String defaultValue = argument . getDefaultValue ( ) ;
if ( NumberUtils . isParsable ( defaultValue ) ) {
return new DoubleDataEntry ( key , Double . parseDouble ( defaultValue ) ) ;
}
if ( "true" . equalsIgnoreCase ( defaultValue ) | | "false" . equalsIgnoreCase ( defaultValue ) ) {
return new BooleanDataEntry ( key , Boolean . parseBoolean ( defaultValue ) ) ;
}
return new StringDataEntry ( key , defaultValue ) ;
}
private void updateOrInitializeState ( CalculatedFieldCtx calculatedFieldCtx , EntityId entityId , Map < String , ArgumentEntry > argumentValues , List < CalculatedFieldId > calculatedFieldIds ) {
TenantId tenantId = calculatedFieldCtx . getTenantId ( ) ;
CalculatedFieldId cfId = calculatedFieldCtx . getCfId ( ) ;
@ -742,14 +592,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} , MoreExecutors . directExecutor ( ) ) ;
}
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState ( CalculatedFieldEntityCtxId entityCtxId , CalculatedFieldType cfType ) {
String stateStr = rocksDBService . get ( JacksonUtil . writeValueAsString ( entityCtxId ) ) ;
if ( stateStr = = null ) {
return new CalculatedFieldEntityCtx ( entityCtxId , createStateByType ( cfType ) ) ;
}
return JacksonUtil . fromString ( stateStr , CalculatedFieldEntityCtx . class ) ;
}
private void pushMsgToRuleEngine ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId originatorId , CalculatedFieldResult calculatedFieldResult , List < CalculatedFieldId > calculatedFieldIds ) {
try {
String type = calculatedFieldResult . getType ( ) ;
@ -770,6 +612,166 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private ListenableFuture < Void > fetchArguments ( TenantId tenantId , EntityId entityId , Map < String , Argument > necessaryArguments , Consumer < Map < String , ArgumentEntry > > onComplete ) {
Map < String , ArgumentEntry > argumentValues = new HashMap < > ( ) ;
List < ListenableFuture < ArgumentEntry > > futures = new ArrayList < > ( ) ;
necessaryArguments . forEach ( ( key , argument ) - > {
futures . add ( Futures . transform ( fetchArgumentValue ( tenantId , entityId , argument ) ,
result - > {
argumentValues . put ( key , result ) ;
return result ;
} , calculatedFieldCallbackExecutor ) ) ;
} ) ;
return Futures . transform ( Futures . allAsList ( futures ) , results - > {
onComplete . accept ( argumentValues ) ;
return null ;
} , calculatedFieldCallbackExecutor ) ;
}
private ListenableFuture < ArgumentEntry > fetchArgumentValue ( TenantId tenantId , EntityId targetEntityId , Argument argument ) {
EntityId argumentEntityId = argument . getEntityId ( ) ;
EntityId entityId = isProfileEntity ( argumentEntityId )
? targetEntityId
: argumentEntityId ;
return fetchKvEntry ( tenantId , entityId , argument ) ;
}
private ListenableFuture < ArgumentEntry > fetchKvEntry ( TenantId tenantId , EntityId entityId , Argument argument ) {
return switch ( argument . getType ( ) ) {
case "TS_ROLLING" - > fetchTsRolling ( tenantId , entityId , argument ) ;
case "ATTRIBUTE" - > transformSingleValueArgument (
Futures . transform (
attributesService . find ( tenantId , entityId , argument . getScope ( ) , argument . getKey ( ) ) ,
result - > result . or ( ( ) - > Optional . of ( new BaseAttributeKvEntry ( System . currentTimeMillis ( ) , createDefaultKvEntry ( argument ) ) ) ) ,
calculatedFieldCallbackExecutor )
) ;
case "TS_LATEST" - > transformSingleValueArgument (
Futures . transform (
timeseriesService . findLatest ( tenantId , entityId , argument . getKey ( ) ) ,
result - > result . or ( ( ) - > Optional . of ( new BasicTsKvEntry ( System . currentTimeMillis ( ) , createDefaultKvEntry ( argument ) ) ) ) ,
calculatedFieldCallbackExecutor ) ) ;
default - > throw new IllegalArgumentException ( "Invalid argument type '" + argument . getType ( ) + "'." ) ;
} ;
}
private ListenableFuture < ArgumentEntry > transformSingleValueArgument ( ListenableFuture < Optional < ? extends KvEntry > > kvEntryFuture ) {
return Futures . transform ( kvEntryFuture , kvEntry - > {
if ( kvEntry . isPresent ( ) & & kvEntry . get ( ) . getValue ( ) ! = null ) {
return ArgumentEntry . createSingleValueArgument ( kvEntry . get ( ) ) ;
} else {
return SingleValueArgumentEntry . EMPTY ;
}
} , calculatedFieldCallbackExecutor ) ;
}
private ListenableFuture < ArgumentEntry > fetchTsRolling ( TenantId tenantId , EntityId entityId , Argument argument ) {
long currentTime = System . currentTimeMillis ( ) ;
long timeWindow = argument . getTimeWindow ( ) = = 0 ? System . currentTimeMillis ( ) : argument . getTimeWindow ( ) ;
long startTs = currentTime - timeWindow ;
int limit = argument . getLimit ( ) = = 0 ? MAX_LAST_RECORDS_VALUE : argument . getLimit ( ) ;
ReadTsKvQuery query = new BaseReadTsKvQuery ( argument . getKey ( ) , startTs , currentTime , 0 , limit , Aggregation . NONE ) ;
ListenableFuture < List < TsKvEntry > > tsRollingFuture = timeseriesService . findAll ( tenantId , entityId , List . of ( query ) ) ;
return Futures . transform ( tsRollingFuture , tsRolling - > tsRolling = = null ? TsRollingArgumentEntry . EMPTY : ArgumentEntry . createTsRollingArgument ( tsRolling ) , calculatedFieldCallbackExecutor ) ;
}
private void sendUpdateCalculatedFieldStateMsg ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId , List < CalculatedFieldId > calculatedFieldIds , Map < String , ArgumentEntry > argumentValues ) {
TransportProtos . CalculatedFieldStateMsgProto . Builder msgBuilder = createBaseCalculatedFieldStateMsg ( tenantId , calculatedFieldId , entityId ) ;
if ( argumentValues ! = null ) {
argumentValues . forEach ( ( key , argumentEntry ) - > msgBuilder . putArguments ( key , toArgumentEntryProto ( argumentEntry ) ) ) ;
}
if ( calculatedFieldIds ! = null ) {
calculatedFieldIds . forEach ( cfId - > msgBuilder . addCalculatedFields (
TransportProtos . CalculatedFieldIdProto . newBuilder ( )
. setCalculatedFieldIdMSB ( cfId . getId ( ) . getMostSignificantBits ( ) )
. setCalculatedFieldIdLSB ( cfId . getId ( ) . getLeastSignificantBits ( ) )
. build ( )
) ) ;
}
clusterService . pushMsgToCore ( tenantId , calculatedFieldId , TransportProtos . ToCoreMsg . newBuilder ( ) . setCalculatedFieldStateMsg ( msgBuilder ) . build ( ) , null ) ;
}
private void sendClearCalculatedFieldStateMsg ( TenantId tenantId , CalculatedFieldId calculatedFieldId , EntityId entityId ) {
TransportProtos . CalculatedFieldStateMsgProto msg = createBaseCalculatedFieldStateMsg ( tenantId , calculatedFieldId , entityId )
. setClear ( true )
. build ( ) ;
clusterService . pushMsgToCore ( tenantId , calculatedFieldId , TransportProtos . ToCoreMsg . newBuilder ( ) . setCalculatedFieldStateMsg ( msg ) . build ( ) , null ) ;
}
private TransportProtos . CalculatedFieldStateMsgProto . Builder createBaseCalculatedFieldStateMsg (
TenantId tenantId ,
CalculatedFieldId calculatedFieldId ,
EntityId entityId
) {
return TransportProtos . CalculatedFieldStateMsgProto . newBuilder ( )
. setTenantIdMSB ( tenantId . getId ( ) . getMostSignificantBits ( ) )
. setTenantIdLSB ( tenantId . getId ( ) . getLeastSignificantBits ( ) )
. setCalculatedFieldIdMSB ( calculatedFieldId . getId ( ) . getMostSignificantBits ( ) )
. setCalculatedFieldIdLSB ( calculatedFieldId . getId ( ) . getLeastSignificantBits ( ) )
. setEntityType ( entityId . getEntityType ( ) . name ( ) )
. setEntityIdMSB ( entityId . getId ( ) . getMostSignificantBits ( ) )
. setEntityIdLSB ( entityId . getId ( ) . getLeastSignificantBits ( ) ) ;
}
private TransportProtos . ArgumentEntryProto toArgumentEntryProto ( ArgumentEntry argumentEntry ) {
TransportProtos . ArgumentEntryProto . Builder argumentProtoBuilder = TransportProtos . ArgumentEntryProto . newBuilder ( ) ;
if ( argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry ) {
TransportProtos . TsRollingProto . Builder tsRollingProtoBuilder = TransportProtos . TsRollingProto . newBuilder ( ) ;
tsRollingArgumentEntry . getTsRecords ( ) . forEach ( ( ts , value ) - >
tsRollingProtoBuilder . putTsRecords ( ts , toObjectProto ( value ) )
) ;
argumentProtoBuilder . setTsRecords ( tsRollingProtoBuilder . build ( ) ) ;
} else if ( argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry ) {
argumentProtoBuilder . setSingleValue (
TransportProtos . SingleValueProto . newBuilder ( )
. setTs ( singleValueArgumentEntry . getTs ( ) )
. setValue ( toObjectProto ( singleValueArgumentEntry . getValue ( ) ) )
. build ( )
) ;
}
return argumentProtoBuilder . build ( ) ;
}
private ArgumentEntry fromArgumentEntryProto ( TransportProtos . ArgumentEntryProto entryProto ) {
if ( entryProto . hasTsRecords ( ) ) {
TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry ( ) ;
entryProto . getTsRecords ( ) . getTsRecordsMap ( ) . forEach ( ( ts , objectProto ) - >
tsRollingArgumentEntry . getTsRecords ( ) . put ( ts , fromObjectProto ( objectProto ) )
) ;
return tsRollingArgumentEntry ;
} else if ( entryProto . hasSingleValue ( ) ) {
TransportProtos . SingleValueProto singleValueProto = entryProto . getSingleValue ( ) ;
return new SingleValueArgumentEntry ( singleValueProto . getTs ( ) , fromObjectProto ( singleValueProto . getValue ( ) ) ) ;
} else {
throw new IllegalArgumentException ( "Unsupported ArgumentEntryProto type" ) ;
}
}
private KvEntry createDefaultKvEntry ( Argument argument ) {
String key = argument . getKey ( ) ;
String defaultValue = argument . getDefaultValue ( ) ;
if ( NumberUtils . isParsable ( defaultValue ) ) {
return new DoubleDataEntry ( key , Double . parseDouble ( defaultValue ) ) ;
}
if ( "true" . equalsIgnoreCase ( defaultValue ) | | "false" . equalsIgnoreCase ( defaultValue ) ) {
return new BooleanDataEntry ( key , Boolean . parseBoolean ( defaultValue ) ) ;
}
return new StringDataEntry ( key , defaultValue ) ;
}
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState ( CalculatedFieldEntityCtxId entityCtxId , CalculatedFieldType cfType ) {
String stateStr = rocksDBService . get ( JacksonUtil . writeValueAsString ( entityCtxId ) ) ;
if ( stateStr = = null ) {
return new CalculatedFieldEntityCtx ( entityCtxId , createStateByType ( cfType ) ) ;
}
return JacksonUtil . fromString ( stateStr , CalculatedFieldEntityCtx . class ) ;
}
private ObjectNode createJsonPayload ( CalculatedFieldResult calculatedFieldResult ) {
ObjectNode payload = JacksonUtil . newObjectNode ( ) ;
Map < String , Object > resultMap = calculatedFieldResult . getResultMap ( ) ;