From 071c254f6752992b756f0e2dffbf045aec7cc17d Mon Sep 17 00:00:00 2001 From: dshvaika Date: Thu, 4 Dec 2025 13:09:38 +0200 Subject: [PATCH] Updated calculated field processing logic && minor refactoring --- ...tractCalculatedFieldProcessingService.java | 59 ++++++++----------- .../cf/ctx/state/CalculatedFieldCtx.java | 6 +- ...titiesAggregationCalculatedFieldState.java | 3 +- ...EntityAggregationCalculatedFieldState.java | 6 +- .../utils/CalculatedFieldArgumentUtils.java | 29 ++++----- 5 files changed, 44 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index 845d5a50e9..13ceb406bb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -54,9 +54,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -75,7 +73,6 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; -import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; @@ -84,7 +81,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; @@ -101,7 +97,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATE import static org.thingsboard.server.dao.util.KvUtils.filterChangedAttr; import static org.thingsboard.server.dao.util.KvUtils.toTsKvEntryList; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry; -import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultKvEntry; +import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultTsKvEntry; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggMetricArgument; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggregationArgument; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformSingleValueArgument; @@ -210,14 +206,14 @@ public abstract class AbstractCalculatedFieldProcessingService { default -> { var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry); argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds -> - fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), MoreExecutors.directExecutor())); + fetchGeofencingArgumentValue(ctx.getTenantId(), resolvedEntityIds, entry.getValue(), startTs), MoreExecutors.directExecutor())); } } } return argFutures; } - protected Map> fetchRelatedEntitiesAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { + private Map> fetchRelatedEntitiesAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { if (!(ctx.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration config)) { return Collections.emptyMap(); } @@ -230,7 +226,7 @@ public abstract class AbstractCalculatedFieldProcessingService { )); } - protected Map> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { + private Map> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { if (!(ctx.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration config)) { return Collections.emptyMap(); } @@ -291,31 +287,28 @@ public abstract class AbstractCalculatedFieldProcessingService { return ownerService.getOwner(tenantId, entityId); } - private ListenableFuture fetchGeofencingKvEntry(TenantId tenantId, List geofencingEntities, Argument argument) { + private ListenableFuture fetchGeofencingArgumentValue(TenantId tenantId, List geofencingEntities, Argument argument, long startTs) { if (argument.getRefEntityKey().getType() != ArgumentType.ATTRIBUTE) { throw new IllegalStateException("Unsupported argument key type: " + argument.getRefEntityKey().getType()); } - List>> kvFutures = geofencingEntities.stream() + var geofencingEntityIdToKvEntryMapFutures = Futures.allAsList(fetchGeofencingEntityIdToKvEntriesFutures(tenantId, geofencingEntities, argument, startTs)); + return Futures.transform(geofencingEntityIdToKvEntryMapFutures, entries -> ArgumentEntry.createGeofencingValueArgument(entries.stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), MoreExecutors.directExecutor()); + } + + private List>> fetchGeofencingEntityIdToKvEntriesFutures(TenantId tenantId, List geofencingEntities, Argument argument, long startTs) { + return geofencingEntities.stream() .map(entityId -> { - var attributesFuture = attributesService.find( - tenantId, - entityId, - argument.getRefEntityKey().getScope(), - argument.getRefEntityKey().getKey() - ); + AttributeScope scope = argument.getRefEntityKey().getScope(); + String key = argument.getRefEntityKey().getKey(); + var attributesFuture = attributesService.find(tenantId, entityId, scope, key); return Futures.transform(attributesFuture, resultOpt -> - Map.entry(entityId, resultOpt.orElseGet(() -> createDefaultAttributeEntry(argument, System.currentTimeMillis()))), - calculatedFieldCallbackExecutor - ); + Map.entry(entityId, resultOpt.orElseGet(() -> createDefaultAttributeEntry(argument, startTs))), + calculatedFieldCallbackExecutor); }).collect(Collectors.toList()); - - ListenableFuture>> allFutures = Futures.allAsList(kvFutures); - - return Futures.transform(allFutures, entries -> ArgumentEntry.createGeofencingValueArgument(entries.stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), MoreExecutors.directExecutor()); } - public ListenableFuture fetchRelatedEntitiesArgumentEntry(TenantId tenantId, List aggEntities, Argument argument, long startTs) { + private ListenableFuture fetchRelatedEntitiesArgumentEntry(TenantId tenantId, List aggEntities, Argument argument, long startTs) { List>> futures = aggEntities.stream() .map(entityId -> { ListenableFuture argumentEntryFut = fetchArgumentValue(tenantId, entityId, argument, startTs); @@ -368,21 +361,17 @@ public abstract class AbstractCalculatedFieldProcessingService { return Futures.transform(attributeOptFuture, attrOpt -> { log.debug("[{}][{}] Fetched attribute for key {}: {}", tenantId, entityId, argument.getRefEntityKey(), attrOpt); - AttributeKvEntry attributeKvEntry = attrOpt.orElseGet(() -> new BaseAttributeKvEntry(createDefaultKvEntry(argument), defaultLastUpdateTs, SingleValueArgumentEntry.DEFAULT_VERSION)); - return transformSingleValueArgument(Optional.of(attributeKvEntry)); + return transformSingleValueArgument(attrOpt.orElseGet(() -> createDefaultAttributeEntry(argument, defaultLastUpdateTs))); }, calculatedFieldCallbackExecutor); } - protected ListenableFuture fetchTsLatest(TenantId tenantId, EntityId entityId, Argument argument, long defaultTs) { + private ListenableFuture fetchTsLatest(TenantId tenantId, EntityId entityId, Argument argument, long defaultTs) { String timeseriesKey = argument.getRefEntityKey().getKey(); log.trace("[{}][{}] Fetching latest timeseries {}", tenantId, entityId, timeseriesKey); - return transformSingleValueArgument( - Futures.transform( - timeseriesService.findLatest(tenantId, entityId, timeseriesKey), - result -> { - log.debug("[{}][{}] Fetched latest timeseries {}: {}", tenantId, entityId, timeseriesKey, result); - return result.or(() -> Optional.of(new BasicTsKvEntry(defaultTs, createDefaultKvEntry(argument), SingleValueArgumentEntry.DEFAULT_VERSION))); - }, calculatedFieldCallbackExecutor)); + return Futures.transform(timeseriesService.findLatest(tenantId, entityId, timeseriesKey), result -> { + log.debug("[{}][{}] Fetched latest timeseries {}: {}", tenantId, entityId, timeseriesKey, result); + return transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, defaultTs))); + }, calculatedFieldCallbackExecutor); } private ListenableFuture fetchTimeSeriesInternal(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, Function, ArgumentEntry> transformArgument) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index d2002c707b..c2c65d3327 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -85,7 +85,7 @@ import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldSta @Slf4j public class CalculatedFieldCtx implements Closeable { - private static final long SCHEDULED_UPDATE_DISABLED_VALUE = -1L; + public static final long DISABLED_INTERVAL_VALUE = -1L; private CalculatedField calculatedField; @@ -201,7 +201,7 @@ public class CalculatedFieldCtx implements Closeable { } } if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { - this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : SCHEDULED_UPDATE_DISABLED_VALUE; + this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : DISABLED_INTERVAL_VALUE; } if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) { this.useLatestTs = aggConfig.isUseLatestTs(); @@ -720,7 +720,7 @@ public class CalculatedFieldCtx implements Closeable { } private boolean isScheduledUpdateDisabled() { - return scheduledUpdateIntervalMillis == SCHEDULED_UPDATE_DISABLED_VALUE; + return scheduledUpdateIntervalMillis == DISABLED_INTERVAL_VALUE; } public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java index cdc256148c..7036e4bd77 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java @@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx.DISABLED_INTERVAL_VALUE; @Slf4j @Getter @@ -62,7 +63,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat private long lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS; @Setter private long lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS; - private long deduplicationIntervalMs = DEFAULT_LAST_UPDATE_TS; + private long deduplicationIntervalMs = DISABLED_INTERVAL_VALUE; private Map metrics; private ScheduledFuture reevaluationFuture; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java index f3c3e8a1cc..ba0b151eb2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java @@ -218,7 +218,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt if (argEntryIntervalStatus.getLastArgsRefreshTs() > argEntryIntervalStatus.getLastMetricsEvalTs()) { argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis()); processArgument(intervalEntry, argName, false, results); - } else if (argEntryIntervalStatus.getLastMetricsEvalTs() == -1) { + } else if (argEntryIntervalStatus.getLastMetricsEvalTs() == DEFAULT_LAST_UPDATE_TS) { argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis()); processArgument(intervalEntry, argName, true, results); } @@ -232,9 +232,9 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt if (argEntryIntervalStatus.intervalPassed(checkInterval)) { if (argEntryIntervalStatus.argsUpdated()) { argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis()); - argEntryIntervalStatus.setLastArgsRefreshTs(-1); + argEntryIntervalStatus.setLastArgsRefreshTs(DEFAULT_LAST_UPDATE_TS); processArgument(intervalEntry, argName, false, results); - } else if (argEntryIntervalStatus.getLastMetricsEvalTs() == -1) { + } else if (argEntryIntervalStatus.getLastMetricsEvalTs() == DEFAULT_LAST_UPDATE_TS) { argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis()); processArgument(intervalEntry, argName, true, results); } diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java index 97d53ac252..74bbdd8a34 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.utils; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; +import lombok.NonNull; import org.apache.commons.lang3.math.NumberUtils; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; @@ -25,6 +23,7 @@ import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; @@ -48,20 +47,13 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalcul import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; + +import static org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry.DEFAULT_VERSION; public class CalculatedFieldArgumentUtils { - public static ListenableFuture transformSingleValueArgument(ListenableFuture> kvEntryFuture) { - return Futures.transform(kvEntryFuture, CalculatedFieldArgumentUtils::transformSingleValueArgument, MoreExecutors.directExecutor()); - } - - public static ArgumentEntry transformSingleValueArgument(Optional kvEntry) { - if (kvEntry.isPresent() && kvEntry.get().getValue() != null) { - return ArgumentEntry.createSingleValueArgument(kvEntry.get()); - } else { - return new SingleValueArgumentEntry(); - } + public static ArgumentEntry transformSingleValueArgument(@NonNull KvEntry kvEntry) { + return kvEntry.getValue() != null ? ArgumentEntry.createSingleValueArgument(kvEntry) : new SingleValueArgumentEntry(); } public static ArgumentEntry transformTsRollingArgument(List tsRolling, int limit, long argTimeWindow) { @@ -94,7 +86,7 @@ public class CalculatedFieldArgumentUtils { return new EntityAggregationArgumentEntry(aggIntervals); } - public static KvEntry createDefaultKvEntry(Argument argument) { + private static KvEntry createDefaultKvEntry(Argument argument) { String key = argument.getRefEntityKey().getKey(); String defaultValue = argument.getDefaultValue(); if (StringUtils.isBlank(defaultValue)) { @@ -109,9 +101,12 @@ public class CalculatedFieldArgumentUtils { return new StringDataEntry(key, defaultValue); } + public static TsKvEntry createDefaultTsKvEntry(Argument argument, long ts) { + return new BasicTsKvEntry(ts, createDefaultKvEntry(argument), DEFAULT_VERSION); + } + public static AttributeKvEntry createDefaultAttributeEntry(Argument argument, long ts) { - KvEntry kvEntry = createDefaultKvEntry(argument); - return new BaseAttributeKvEntry(kvEntry, ts, 0L); + return new BaseAttributeKvEntry(createDefaultKvEntry(argument), ts, DEFAULT_VERSION); } public static CalculatedFieldState createStateByType(CalculatedFieldCtx ctx, EntityId entityId) {