|
|
|
@ -52,9 +52,8 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.aggregation.AggArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelaredEntitiesAggregationCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; |
|
|
|
@ -227,17 +226,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); |
|
|
|
var state = states.get(ctx.getCfId()); |
|
|
|
try { |
|
|
|
boolean justRestored = false; |
|
|
|
Map<String, ArgumentEntry> updatedArgs = new HashMap<>(); |
|
|
|
if (state == null) { |
|
|
|
state = createState(ctx); |
|
|
|
justRestored = true; |
|
|
|
} else { |
|
|
|
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { |
|
|
|
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, msg.getRelatedEntityId(), ctx.getArguments()); |
|
|
|
updatedArgs = relatedEntitiesAggState.updateEntityData(toAggSingleEntityArguments(msg.getRelatedEntityId(), fetchedArgs)); |
|
|
|
} |
|
|
|
|
|
|
|
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|
|
|
} |
|
|
|
if (state.isSizeOk()) { |
|
|
|
Map<String, ArgumentEntry> updatedArgs = new HashMap<>(); |
|
|
|
if (!justRestored) { |
|
|
|
updatedArgs = updateAggregationState(msg.getRelatedEntityId(), state, ctx); |
|
|
|
} |
|
|
|
processStateIfReady(state, updatedArgs, ctx, new ArrayList<>(), null, null, callback); |
|
|
|
processStateIfReady(state, updatedArgs, ctx, Collections.singletonList(ctx.getCfId()), null, null, callback); |
|
|
|
} else { |
|
|
|
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); |
|
|
|
} |
|
|
|
@ -250,19 +251,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Map<String, ArgumentEntry> updateAggregationState(EntityId relatedEntityId, CalculatedFieldState state, CalculatedFieldCtx ctx) { |
|
|
|
Map<String, ArgumentEntry> fetchedArgs = fetchAggArguments(ctx, relatedEntityId); |
|
|
|
Map<String, ArgumentEntry> updatedArgs = state.update(fetchedArgs, ctx); |
|
|
|
|
|
|
|
if (state instanceof RelaredEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { |
|
|
|
relatedEntitiesAggState.setLastMetricsEvalTs(-1); |
|
|
|
} |
|
|
|
|
|
|
|
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|
|
|
|
|
|
|
return updatedArgs; |
|
|
|
} |
|
|
|
|
|
|
|
private void handleRelationDelete(CalculatedFieldRelatedEntityMsg msg) throws CalculatedFieldException { |
|
|
|
CalculatedFieldCtx ctx = msg.getCalculatedField(); |
|
|
|
CalculatedFieldId cfId = ctx.getCfId(); |
|
|
|
@ -271,34 +259,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
msg.getCallback().onSuccess(); |
|
|
|
return; |
|
|
|
} |
|
|
|
if (state instanceof RelaredEntitiesAggregationCalculatedFieldState aggState) { |
|
|
|
cleanupAggregationState(msg.getRelatedEntityId(), aggState); |
|
|
|
processStateIfReady(state, Collections.emptyMap(), state.getCtx(), Collections.emptyList(), null, null, msg.getCallback()); |
|
|
|
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) { |
|
|
|
aggState.cleanupEntityData(msg.getRelatedEntityId()); |
|
|
|
|
|
|
|
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|
|
|
|
|
|
|
if (state.isSizeOk()) { |
|
|
|
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); |
|
|
|
} else { |
|
|
|
throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// todo: log
|
|
|
|
msg.getCallback().onSuccess(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void cleanupAggregationState(EntityId relatedEntityId, RelaredEntitiesAggregationCalculatedFieldState state) { |
|
|
|
state.getArguments().values().forEach(argEntry -> { |
|
|
|
AggArgumentEntry aggEntry = (AggArgumentEntry) argEntry; |
|
|
|
aggEntry.getAggInputs().remove(relatedEntityId); |
|
|
|
}); |
|
|
|
state.getInputs().remove(relatedEntityId); |
|
|
|
state.setLastMetricsEvalTs(-1); |
|
|
|
} |
|
|
|
|
|
|
|
@SneakyThrows |
|
|
|
private Map<String, ArgumentEntry> fetchAggArguments(CalculatedFieldCtx ctx, EntityId entityId) { |
|
|
|
ListenableFuture<Map<String, ArgumentEntry>> argumentsFuture = cfService.fetchAggEntityArguments(ctx, entityId); |
|
|
|
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
|
|
|
|
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
|
|
|
|
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
|
|
|
|
// but this will significantly complicate the code.
|
|
|
|
return argumentsFuture.get(1, TimeUnit.MINUTES); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { |
|
|
|
log.trace("[{}] Processing CF telemetry msg: {}", msg.getEntityId(), msg); |
|
|
|
var proto = msg.getProto(); |
|
|
|
@ -692,17 +668,21 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); |
|
|
|
|
|
|
|
if (CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(ctx.getCfType())) { |
|
|
|
fetchedArgs = fetchedArgs.entrySet().stream() |
|
|
|
.collect(Collectors.toMap( |
|
|
|
Map.Entry::getKey, |
|
|
|
argEntry -> new AggSingleEntityArgumentEntry(entityId, argEntry.getValue()) |
|
|
|
)); |
|
|
|
fetchedArgs = toAggSingleEntityArguments(entityId, fetchedArgs); |
|
|
|
} |
|
|
|
fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); |
|
|
|
|
|
|
|
return fetchedArgs; |
|
|
|
} |
|
|
|
|
|
|
|
private Map<String, ArgumentEntry> toAggSingleEntityArguments(EntityId relatedEntityId, Map<String, ArgumentEntry> fetchedArgs) { |
|
|
|
return fetchedArgs.entrySet().stream() |
|
|
|
.collect(Collectors.toMap( |
|
|
|
Map.Entry::getKey, |
|
|
|
argEntry -> new AggSingleEntityArgumentEntry(relatedEntityId, argEntry.getValue()) |
|
|
|
)); |
|
|
|
} |
|
|
|
|
|
|
|
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { |
|
|
|
List<CalculatedFieldId> cfIds = new LinkedList<>(); |
|
|
|
for (var cfId : proto.getPreviousCalculatedFieldsList()) { |
|
|
|
|