From 2778f79e5e8a1d9262450a33edcfbc77d64e88c2 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 21 Oct 2025 12:45:29 +0300 Subject: [PATCH] minor refactoring --- ...CalculatedFieldEntityMessageProcessor.java | 80 +++++++------------ ...alculatedFieldManagerMessageProcessor.java | 4 +- ...tractCalculatedFieldProcessingService.java | 14 ---- .../cf/CalculatedFieldProcessingService.java | 2 - ...faultCalculatedFieldProcessingService.java | 5 -- .../service/cf/ctx/state/ArgumentEntry.java | 6 +- .../cf/ctx/state/ArgumentEntryType.java | 2 +- .../AggSingleEntityArgumentEntry.java | 1 - ...itiesAggregationCalculatedFieldState.java} | 64 ++++++++------- ...java => RelatedEntitiesArgumentEntry.java} | 20 ++--- .../utils/CalculatedFieldArgumentUtils.java | 4 +- .../server/utils/CalculatedFieldUtils.java | 18 ++--- ... => RelatedEntitiesArgumentEntryTest.java} | 27 ++----- 13 files changed, 96 insertions(+), 151 deletions(-) rename application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/{RelaredEntitiesAggregationCalculatedFieldState.java => RelatedEntitiesAggregationCalculatedFieldState.java} (84%) rename application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/{AggArgumentEntry.java => RelatedEntitiesArgumentEntry.java} (72%) rename application/src/test/java/org/thingsboard/server/service/cf/ctx/state/{AggArgumentEntryTest.java => RelatedEntitiesArgumentEntryTest.java} (80%) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 5a1ada08c5..434f555325 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -52,9 +52,8 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.RelaredEntitiesAggregationCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; @@ -227,17 +226,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); var state = states.get(ctx.getCfId()); try { - boolean justRestored = false; + Map updatedArgs = new HashMap<>(); if (state == null) { state = createState(ctx); - justRestored = true; + } else { + if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { + Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, msg.getRelatedEntityId(), ctx.getArguments()); + updatedArgs = relatedEntitiesAggState.updateEntityData(toAggSingleEntityArguments(msg.getRelatedEntityId(), fetchedArgs)); + } + + state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); } if (state.isSizeOk()) { - Map updatedArgs = new HashMap<>(); - if (!justRestored) { - updatedArgs = updateAggregationState(msg.getRelatedEntityId(), state, ctx); - } - processStateIfReady(state, updatedArgs, ctx, new ArrayList<>(), null, null, callback); + processStateIfReady(state, updatedArgs, ctx, Collections.singletonList(ctx.getCfId()), null, null, callback); } else { throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); } @@ -250,19 +251,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - private Map updateAggregationState(EntityId relatedEntityId, CalculatedFieldState state, CalculatedFieldCtx ctx) { - Map fetchedArgs = fetchAggArguments(ctx, relatedEntityId); - Map updatedArgs = state.update(fetchedArgs, ctx); - - if (state instanceof RelaredEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { - relatedEntitiesAggState.setLastMetricsEvalTs(-1); - } - - state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); - - return updatedArgs; - } - private void handleRelationDelete(CalculatedFieldRelatedEntityMsg msg) throws CalculatedFieldException { CalculatedFieldCtx ctx = msg.getCalculatedField(); CalculatedFieldId cfId = ctx.getCfId(); @@ -271,34 +259,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM msg.getCallback().onSuccess(); return; } - if (state instanceof RelaredEntitiesAggregationCalculatedFieldState aggState) { - cleanupAggregationState(msg.getRelatedEntityId(), aggState); - processStateIfReady(state, Collections.emptyMap(), state.getCtx(), Collections.emptyList(), null, null, msg.getCallback()); + if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) { + aggState.cleanupEntityData(msg.getRelatedEntityId()); + + state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); + + if (state.isSizeOk()) { + processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); + } else { + throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); + } } else { + // todo: log msg.getCallback().onSuccess(); } } - private void cleanupAggregationState(EntityId relatedEntityId, RelaredEntitiesAggregationCalculatedFieldState state) { - state.getArguments().values().forEach(argEntry -> { - AggArgumentEntry aggEntry = (AggArgumentEntry) argEntry; - aggEntry.getAggInputs().remove(relatedEntityId); - }); - state.getInputs().remove(relatedEntityId); - state.setLastMetricsEvalTs(-1); - } - - @SneakyThrows - private Map fetchAggArguments(CalculatedFieldCtx ctx, EntityId entityId) { - ListenableFuture> argumentsFuture = cfService.fetchAggEntityArguments(ctx, entityId); - // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. - // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. - // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, - // but this will significantly complicate the code. - return argumentsFuture.get(1, TimeUnit.MINUTES); - } - - public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { log.trace("[{}] Processing CF telemetry msg: {}", msg.getEntityId(), msg); var proto = msg.getProto(); @@ -692,17 +668,21 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); if (CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(ctx.getCfType())) { - fetchedArgs = fetchedArgs.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - argEntry -> new AggSingleEntityArgumentEntry(entityId, argEntry.getValue()) - )); + fetchedArgs = toAggSingleEntityArguments(entityId, fetchedArgs); } fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); return fetchedArgs; } + private Map toAggSingleEntityArguments(EntityId relatedEntityId, Map fetchedArgs) { + return fetchedArgs.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + argEntry -> new AggSingleEntityArgumentEntry(relatedEntityId, argEntry.getValue()) + )); + } + private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { List cfIds = new LinkedList<>(); for (var cfId : proto.getPreviousCalculatedFieldsList()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 5b4528370b..7f1c7b1925 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -469,8 +469,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { EntityId entityId = msg.getEntityId(); log.debug("Received telemetry msg from entity [{}]", entityId); - // 3 = 1 for CF processing + 1 for links processing + 1 for owner entity processing - MultipleTbCallback callback = new MultipleTbCallback(3, msg.getCallback()); + // 4 = 1 for CF processing + 1 for links processing + 1 for owner entity processing + 1 for aggregation processing + MultipleTbCallback callback = new MultipleTbCallback(4, msg.getCallback()); // process all cfs related to entity, or it's profile; var entityIdFields = getCalculatedFieldsByEntityId(entityId); var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index 3ddc8e1067..0c0f2f23e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -201,20 +201,6 @@ public abstract class AbstractCalculatedFieldProcessingService { )); } - protected ListenableFuture> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { - RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig = (RelatedEntitiesAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); - - Map> argsFutures = aggConfig.getArguments().entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> fetchSingleAggArgumentEntry(ctx.getTenantId(), entityId, entry.getValue(), ts) - )); - - return Futures.whenAllComplete(argsFutures.values()) - .call(() -> resolveArgumentFutures(argsFutures), - MoreExecutors.directExecutor()); - } - private ListenableFuture> resolveGeofencingEntityIds(TenantId tenantId, EntityId entityId, Map.Entry entry) { Argument value = entry.getValue(); if (value.getRefEntityId() != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 52b3341151..a9139572b8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -33,8 +33,6 @@ public interface CalculatedFieldProcessingService { ListenableFuture> fetchArguments(CalculatedFieldCtx ctx, EntityId entityId); - ListenableFuture> fetchAggEntityArguments(CalculatedFieldCtx ctx, EntityId entityId); - Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId); Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 9074c1036f..d7957dce9b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -91,11 +91,6 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF return super.fetchArguments(ctx, entityId, System.currentTimeMillis()); } - @Override - public ListenableFuture> fetchAggEntityArguments(CalculatedFieldCtx ctx, EntityId entityId) { - return super.fetchEntityAggArguments(ctx, entityId, System.currentTimeMillis()); - } - @Override public Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) { return switch (ctx.getCfType()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index c8f7dd0c3d..5bb16292be 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -22,7 +22,7 @@ import org.thingsboard.script.api.tbel.TbelCfArg; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; @@ -40,7 +40,7 @@ import java.util.Map; @JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"), @JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"), @JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION"), - @JsonSubTypes.Type(value = AggArgumentEntry.class, name = "AGGREGATE_LATEST"), + @JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "AGGREGATE_LATEST"), @JsonSubTypes.Type(value = AggSingleEntityArgumentEntry.class, name = "AGGREGATE_LATEST_SINGLE") }) public interface ArgumentEntry { @@ -77,7 +77,7 @@ public interface ArgumentEntry { } static ArgumentEntry createAggArgument(Map entityIdkvEntryMap) { - return new AggArgumentEntry(entityIdkvEntryMap, false); + return new RelatedEntitiesArgumentEntry(entityIdkvEntryMap, false); } static ArgumentEntry createAggSingleArgument(EntityId entityId, KvEntry kvEntry) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java index 9882b8181b..5c672cf04e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.service.cf.ctx.state; public enum ArgumentEntryType { - SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, AGGREGATE_LATEST, AGGREGATE_LATEST_SINGLE + SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES, AGGREGATE_LATEST_SINGLE } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java index 6430fe3f1c..b935256860 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java @@ -33,7 +33,6 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; public class AggSingleEntityArgumentEntry extends SingleValueArgumentEntry { private EntityId entityId; - private boolean deleted; public AggSingleEntityArgumentEntry(EntityId entityId, ArgumentEntry entry) { super(entry); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelaredEntitiesAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java similarity index 84% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelaredEntitiesAggregationCalculatedFieldState.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java index b53042b1fe..c0baca836c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelaredEntitiesAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -46,7 +45,7 @@ import java.util.Map.Entry; @Slf4j @Getter -public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState { +public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState { @Setter private long lastArgsRefreshTs = -1; @@ -55,9 +54,7 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat private long deduplicationInterval = -1; private Map metrics; - private final Map> inputs = new HashMap<>(); - - public RelaredEntitiesAggregationCalculatedFieldState(EntityId entityId) { + public RelatedEntitiesAggregationCalculatedFieldState(EntityId entityId) { super(entityId); } @@ -75,7 +72,6 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat lastArgsRefreshTs = -1; lastMetricsEvalTs = -1; metrics = null; - inputs.clear(); } @Override @@ -91,17 +87,8 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat @Override public Map update(Map argumentValues, CalculatedFieldCtx ctx) { - Map updatedArguments = super.update(argumentValues, ctx); lastArgsRefreshTs = System.currentTimeMillis(); - for (Map.Entry argEntry : arguments.entrySet()) { - String key = argEntry.getKey(); - AggArgumentEntry aggArgumentEntry = (AggArgumentEntry) argEntry.getValue(); - Map aggInputs = aggArgumentEntry.getAggInputs(); - aggInputs.forEach((entityId, argumentEntry) -> { - inputs.computeIfAbsent(entityId, k -> new HashMap<>()).put(key, argumentEntry); - }); - } - return updatedArguments; + return super.update(argumentValues, ctx); } @Override @@ -115,7 +102,7 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() .type(output.getType()) .scope(output.getScope()) - .result(createResultJson(ctx.isUseLatestTs(), aggResult)) + .result(toSimpleResult(ctx.isUseLatestTs(), aggResult)) .build()); } else { return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() @@ -124,20 +111,47 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat } } + public Map updateEntityData(Map fetchedArgs) { + lastMetricsEvalTs = -1; + return update(fetchedArgs, ctx); + } + + public void cleanupEntityData(EntityId relatedEntityId) { + arguments.values().forEach(argEntry -> { + RelatedEntitiesArgumentEntry aggEntry = (RelatedEntitiesArgumentEntry) argEntry; + aggEntry.getAggInputs().remove(relatedEntityId); + }); + lastMetricsEvalTs = -1; + lastArgsRefreshTs = System.currentTimeMillis(); + } + private boolean shouldRecalculate() { boolean intervalPassed = lastMetricsEvalTs <= System.currentTimeMillis() - deduplicationInterval; boolean argsUpdatedDuringInterval = lastArgsRefreshTs > lastMetricsEvalTs; return intervalPassed && argsUpdatedDuringInterval; } + private Map> prepareInputs() { + Map> inputs = new HashMap<>(); + for (Map.Entry argEntry : arguments.entrySet()) { + String key = argEntry.getKey(); + RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = (RelatedEntitiesArgumentEntry) argEntry.getValue(); + relatedEntitiesArgumentEntry.getAggInputs().forEach((entityId, argumentEntry) -> { + inputs.computeIfAbsent(entityId, k -> new HashMap<>()).put(key, argumentEntry); + }); + } + return inputs; + } + private ObjectNode aggregateMetrics(Output output) throws Exception { ObjectNode aggResult = JacksonUtil.newObjectNode(); + Map> inputs = prepareInputs(); for (Entry entry : metrics.entrySet()) { String metricKey = entry.getKey(); AggMetric metric = entry.getValue(); AggEntry aggMetricEntry = AggFunctionFactory.createAggFunction(metric.getFunction()); - aggregateMetric(metric, aggMetricEntry); + aggregateMetric(metric, aggMetricEntry, inputs); aggMetricEntry.result().ifPresent(result -> { aggResult.set(metricKey, JacksonUtil.valueToTree(formatResult(result, output.getDecimalsByDefault()))); }); @@ -145,7 +159,7 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat return aggResult; } - private void aggregateMetric(AggMetric metric, AggEntry aggEntry) throws Exception { + private void aggregateMetric(AggMetric metric, AggEntry aggEntry, Map> inputs) throws Exception { for (Map entityInputs : inputs.values()) { if (applyAggregation(metric.getFilter(), entityInputs)) { Object arg = resolveAggregationInput(metric.getInput(), entityInputs); @@ -183,16 +197,4 @@ public class RelaredEntitiesAggregationCalculatedFieldState extends BaseCalculat } } - protected JsonNode createResultJson(boolean useLatestTs, JsonNode result) { - long latestTs = getLatestTimestamp(); - if (useLatestTs && latestTs != -1) { - ObjectNode resultNode = JacksonUtil.newObjectNode(); - resultNode.put("ts", latestTs); - resultNode.set("values", result); - return resultNode; - } else { - return result; - } - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java similarity index 72% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggArgumentEntry.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java index e002aece2c..5385808923 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java @@ -27,7 +27,7 @@ import java.util.Map; @Data @AllArgsConstructor -public class AggArgumentEntry implements ArgumentEntry { +public class RelatedEntitiesArgumentEntry implements ArgumentEntry { private final Map aggInputs; @@ -35,7 +35,7 @@ public class AggArgumentEntry implements ArgumentEntry { @Override public ArgumentEntryType getType() { - return ArgumentEntryType.AGGREGATE_LATEST; + return ArgumentEntryType.RELATED_ENTITIES; } @Override @@ -45,19 +45,15 @@ public class AggArgumentEntry implements ArgumentEntry { @Override public boolean updateEntry(ArgumentEntry entry) { - if (entry instanceof AggArgumentEntry aggArgumentEntry) { - aggInputs.putAll(aggArgumentEntry.aggInputs); + if (entry instanceof RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry) { + aggInputs.putAll(relatedEntitiesArgumentEntry.aggInputs); return true; } else if (entry instanceof AggSingleEntityArgumentEntry aggSingleEntityArgumentEntry) { - if (aggSingleEntityArgumentEntry.isDeleted()) { - aggInputs.remove(aggSingleEntityArgumentEntry.getEntityId()); + ArgumentEntry argumentEntry = aggInputs.get(aggSingleEntityArgumentEntry.getEntityId()); + if (argumentEntry != null) { + argumentEntry.updateEntry(aggSingleEntityArgumentEntry); } else { - ArgumentEntry argumentEntry = aggInputs.get(aggSingleEntityArgumentEntry.getEntityId()); - if (argumentEntry != null) { - argumentEntry.updateEntry(aggSingleEntityArgumentEntry); - } else { - aggInputs.put(aggSingleEntityArgumentEntry.getEntityId(), aggSingleEntityArgumentEntry); - } + aggInputs.put(aggSingleEntityArgumentEntry.getEntityId(), aggSingleEntityArgumentEntry); } return true; } else { diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java index e1763ffa2d..239ffedbc7 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java @@ -35,7 +35,7 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.RelaredEntitiesAggregationCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState; @@ -91,7 +91,7 @@ public class CalculatedFieldArgumentUtils { case GEOFENCING -> new GeofencingCalculatedFieldState(entityId); case ALARM -> new AlarmCalculatedFieldState(entityId); case PROPAGATION -> new PropagationCalculatedFieldState(entityId); - case RELATED_ENTITIES_AGGREGATION -> new RelaredEntitiesAggregationCalculatedFieldState(entityId); + case RELATED_ENTITIES_AGGREGATION -> new RelatedEntitiesAggregationCalculatedFieldState(entityId); }; } diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 67af13fdb3..389fad8ff3 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -47,9 +47,9 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.RelaredEntitiesAggregationCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmRuleState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; @@ -104,9 +104,9 @@ public class CalculatedFieldUtils { case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry)); case TS_ROLLING -> builder.addRollingValueArguments(toRollingArgumentProto(argName, (TsRollingArgumentEntry) argEntry)); case GEOFENCING -> builder.addGeofencingArguments(toGeofencingArgumentProto(argName, (GeofencingArgumentEntry) argEntry)); - case AGGREGATE_LATEST -> { - AggArgumentEntry aggArgumentEntry = (AggArgumentEntry) argEntry; - aggArgumentEntry.getAggInputs() + case RELATED_ENTITIES -> { + RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = (RelatedEntitiesArgumentEntry) argEntry; + relatedEntitiesArgumentEntry.getAggInputs() .forEach((entityId, entry) -> aggBuilder.addAggArguments(toAggSingleArgumentProto(argName, entityId, entry))); } } @@ -120,7 +120,7 @@ public class CalculatedFieldUtils { alarmStateProto.setClearRuleState(toAlarmRuleStateProto(alarmState.getClearRuleState())); } } - if (state instanceof RelaredEntitiesAggregationCalculatedFieldState aggState) { + if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) { aggBuilder.setLastArgsUpdateTs(aggState.getLastArgsRefreshTs()); builder.setLatestValuesAggregationState(aggBuilder.build()); } @@ -214,7 +214,7 @@ public class CalculatedFieldUtils { case GEOFENCING -> new GeofencingCalculatedFieldState(id.entityId()); case ALARM -> new AlarmCalculatedFieldState(id.entityId()); case PROPAGATION -> new PropagationCalculatedFieldState(id.entityId()); - case RELATED_ENTITIES_AGGREGATION -> new RelaredEntitiesAggregationCalculatedFieldState(id.entityId()); + case RELATED_ENTITIES_AGGREGATION -> new RelatedEntitiesAggregationCalculatedFieldState(id.entityId()); }; proto.getSingleValueArgumentsList().forEach(argProto -> @@ -241,7 +241,7 @@ public class CalculatedFieldUtils { } } case RELATED_ENTITIES_AGGREGATION -> { - RelaredEntitiesAggregationCalculatedFieldState aggState = (RelaredEntitiesAggregationCalculatedFieldState) state; + RelatedEntitiesAggregationCalculatedFieldState aggState = (RelatedEntitiesAggregationCalculatedFieldState) state; LatestValuesAggregationStateProto aggregationStateProto = proto.getLatestValuesAggregationState(); Map> arguments = new HashMap<>(); aggregationStateProto.getAggArgumentsList().forEach(argProto -> { @@ -249,7 +249,7 @@ public class CalculatedFieldUtils { arguments.computeIfAbsent(argProto.getValue().getArgName(), name -> new HashMap<>()).put(entry.getEntityId(), entry); }); arguments.forEach((argName, entityInputs) -> { - aggState.getArguments().put(argName, new AggArgumentEntry(entityInputs, false)); + aggState.getArguments().put(argName, new RelatedEntitiesArgumentEntry(entityInputs, false)); }); aggState.setLastArgsRefreshTs(aggregationStateProto.getLastArgsUpdateTs()); } diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java similarity index 80% rename from application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggArgumentEntryTest.java rename to application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java index c1c2d85c55..14d2801e0e 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import java.util.HashMap; @@ -31,9 +31,9 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class AggArgumentEntryTest { +public class RelatedEntitiesArgumentEntryTest { - private AggArgumentEntry entry; + private RelatedEntitiesArgumentEntry entry; private final DeviceId device1 = new DeviceId(UUID.fromString("1984e5f4-9ff0-4187-84ae-e4438bba4c8a")); private final DeviceId device2 = new DeviceId(UUID.fromString("937fc062-1a9d-438f-aa22-55a93fc908b7")); @@ -46,7 +46,7 @@ public class AggArgumentEntryTest { aggInputs.put(device1, new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 12L), 1L))); aggInputs.put(device2, new AggSingleEntityArgumentEntry(device2, new BasicTsKvEntry(ts - 150, new LongDataEntry("key", 16L), 6L))); - entry = new AggArgumentEntry(aggInputs, false); + entry = new RelatedEntitiesArgumentEntry(aggInputs, false); } @Test @@ -61,17 +61,17 @@ public class AggArgumentEntryTest { DeviceId device3 = new DeviceId(UUID.randomUUID()); DeviceId device4 = new DeviceId(UUID.randomUUID()); - AggArgumentEntry aggArgumentEntry = new AggArgumentEntry(Map.of( + RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = new RelatedEntitiesArgumentEntry(Map.of( device3, new AggSingleEntityArgumentEntry(device3, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 16L), 13L)), device4, new AggSingleEntityArgumentEntry(device4, new BasicTsKvEntry(ts - 60, new LongDataEntry("key", 23L), 7L)) ), false); - assertThat(entry.updateEntry(aggArgumentEntry)).isTrue(); + assertThat(entry.updateEntry(relatedEntitiesArgumentEntry)).isTrue(); Map aggInputs = entry.getAggInputs(); assertThat(aggInputs.size()).isEqualTo(4); - assertThat(aggInputs.get(device3)).isEqualTo(aggArgumentEntry.getAggInputs().get(device3)); - assertThat(aggInputs.get(device4)).isEqualTo(aggArgumentEntry.getAggInputs().get(device4)); + assertThat(aggInputs.get(device3)).isEqualTo(relatedEntitiesArgumentEntry.getAggInputs().get(device3)); + assertThat(aggInputs.get(device4)).isEqualTo(relatedEntitiesArgumentEntry.getAggInputs().get(device4)); } @Test @@ -98,15 +98,4 @@ public class AggArgumentEntryTest { assertThat(aggInputs.get(device2)).isEqualTo(singleEntityArgumentEntry); } - @Test - void testUpdateEntryWhenDeletedAggSingleEntityArgumentEntryPassed() { - AggSingleEntityArgumentEntry singleEntityArgumentEntry = new AggSingleEntityArgumentEntry(device2, true); - - assertThat(entry.updateEntry(singleEntityArgumentEntry)).isTrue(); - - Map aggInputs = entry.getAggInputs(); - assertThat(aggInputs.size()).isEqualTo(1); - assertThat(aggInputs.get(device2)).isNull(); - } - }