From 31dc4ec8d71255fcbe474ca7514409dd89f319f8 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 4 Nov 2025 17:02:49 +0200 Subject: [PATCH] changed scheduling implementation --- ...CalculatedFieldEntityMessageProcessor.java | 2 +- ...tractCalculatedFieldProcessingService.java | 10 +--- .../service/cf/ctx/state/ArgumentEntry.java | 4 +- .../cf/ctx/state/CalculatedFieldCtx.java | 28 ++++++++- .../single/AggIntervalEntryStatus.java | 4 +- .../EntityAggregationArgumentEntry.java | 5 +- ...EntityAggregationCalculatedFieldState.java | 58 ++++++------------- .../utils/CalculatedFieldArgumentUtils.java | 2 +- .../src/main/resources/thingsboard.yml | 4 +- .../EntityAggregationCalculatedFieldTest.java | 55 ++++++++++++++++-- 10 files changed, 110 insertions(+), 62 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 69ea705c60..1668d0d213 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -128,7 +128,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM relatedEntitiesAggState.scheduleReevaluation(); } if (state instanceof EntityAggregationCalculatedFieldState entityAggState) { - entityAggState.scheduleReevaluation(); + entityAggState.fillMissingIntervals(); } states.put(cfId, state); } else { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index bae987d53c..45d8e3b4e0 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -23,7 +23,6 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; @@ -88,9 +87,6 @@ public abstract class AbstractCalculatedFieldProcessingService { protected ListeningExecutorService calculatedFieldCallbackExecutor; - @Value("${actors.calculated_fields.max_datapoints_limit}") - private int aggArgumentMaxDatapointsLimit; - @PostConstruct public void init() { calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( @@ -311,10 +307,10 @@ public abstract class AbstractCalculatedFieldProcessingService { } private ListenableFuture fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) { - long startInterval = interval.getCurrentIntervalStartTs(); + long intervalStartTs = interval.getCurrentIntervalStartTs(); long intervalEndTs = interval.getCurrentIntervalEndTs(); - ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startInterval, queryEndTs, 0, aggArgumentMaxDatapointsLimit, Aggregation.NONE); - return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, startInterval, intervalEndTs)); + ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), intervalStartTs, queryEndTs, 0, 1, Aggregation.NONE); + return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs)); } private ListenableFuture fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index b331c11a47..55a61d1918 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; @@ -39,7 +40,8 @@ import java.util.Map; @JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"), @JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"), @JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION"), - @JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES") + @JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES"), + @JsonSubTypes.Type(value = EntityAggregationArgumentEntry.class, name = "ENTITY_AGGREGATION") }) public interface ArgumentEntry { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 92895617c1..36c9436f9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -45,6 +45,8 @@ import org.thingsboard.server.common.data.cf.configuration.ScheduledUpdateSuppor import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunctionInput; import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark; import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -95,6 +97,8 @@ public class CalculatedFieldCtx implements Closeable { private boolean useLatestTs; private boolean requiresScheduledReevaluation; + private long lastReevaluationTs; + private ActorSystemContext systemContext; private TbelInvokeService tbelInvokeService; private RelationService relationService; @@ -192,7 +196,6 @@ public class CalculatedFieldCtx implements Closeable { if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L; } - this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation(); if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) { this.useLatestTs = aggConfig.isUseLatestTs(); } @@ -207,6 +210,29 @@ public class CalculatedFieldCtx implements Closeable { this.maxSingleValueArgumentSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxSingleValueArgumentSizeInKBytes) * 1024; } + public boolean isRequiresScheduledReevaluation() { + if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) { + long now = System.currentTimeMillis(); + long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval()); + Watermark watermark = entityAggregationConfig.getWatermark(); + if (watermark != null) { + long checkIntervalMillis = TimeUnit.SECONDS.toMillis(watermark.getCheckInterval()); + if (cfCheckIntervalMillis == checkIntervalMillis) { + return true; + } + if (now + cfCheckIntervalMillis >= lastReevaluationTs + checkIntervalMillis) { + lastReevaluationTs = now; + return true; + } + } + long intervalEndTsMillis = entityAggregationConfig.getInterval().getCurrentIntervalEndTs(); + if (now + cfCheckIntervalMillis >= intervalEndTsMillis) { + return true; + } + } + return calculatedField.getConfiguration().requiresScheduledReevaluation(); + } + public void init() { switch (cfType) { case SCRIPT -> { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java index fd2a899503..2d350946e2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java @@ -38,8 +38,8 @@ public class AggIntervalEntryStatus { boolean intervalPassed = lastMetricsEvalTs <= System.currentTimeMillis() - checkInterval; boolean argsUpdatedDuringInterval = lastArgsRefreshTs > -1; if (intervalPassed && argsUpdatedDuringInterval) { - lastMetricsEvalTs = System.currentTimeMillis(); - lastArgsRefreshTs = -1; + setLastMetricsEvalTs(System.currentTimeMillis()); + setLastArgsRefreshTs(-1); return true; } return false; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java index 2723b35cc9..9ce47583e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java @@ -50,9 +50,10 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry { aggIntervals.putAll(entityAggEntry.getAggIntervals()); } else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) { long entryTs = singleValueArgEntry.getTs(); + long argUpdateTs = System.currentTimeMillis(); for (Map.Entry aggIntervalEntry : aggIntervals.entrySet()) { if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) { - aggIntervalEntry.getValue().setLastArgsRefreshTs(System.currentTimeMillis()); + aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs); return true; } } @@ -62,7 +63,7 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry { @Override public boolean isEmpty() { - return true; + return aggIntervals.isEmpty(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java index 0f64140cab..ff93d4a924 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java @@ -53,30 +53,16 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt private long checkInterval; private Map metrics; - private final Map> intervals = new HashMap<>(); - private CalculatedFieldProcessingService cfProcessingService; public EntityAggregationCalculatedFieldState(EntityId entityId) { super(entityId); } - public void scheduleReevaluation() { - prepareIntervals(); - fillMissingIntervals(interval.getCurrentIntervalEndTs(), intervalDuration); - long now = System.currentTimeMillis(); - intervals.forEach((intervalEntry, argumentIntervalStatuses) -> { - if (intervalEntry.belongsToInterval(now)) { - ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx); - } else { - if (intervalEntry.getEndTs() <= now) { - ctx.scheduleReevaluation(checkInterval, actorCtx); - } - } - }); - } - - private void fillMissingIntervals(long currentIntervalEndTs, long intervalDuration) { + public void fillMissingIntervals() { + long currentIntervalEndTs = interval.getCurrentIntervalEndTs(); + long intervalDuration = interval.getIntervalDurationMillis(); + Map> intervals = getIntervals(); AggIntervalEntry lastIntervalEntry = intervals.keySet().stream().max(Comparator.comparing(AggIntervalEntry::getEndTs)).orElse(null); if (lastIntervalEntry == null) { return; @@ -91,8 +77,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt arguments.forEach((argName, argumentEntry) -> { var entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry; AggIntervalEntryStatus intervalEntryStatus = new AggIntervalEntryStatus(System.currentTimeMillis()); - entityAggEntry.getAggIntervals().put(missingAggIntervalEntry, intervalEntryStatus); - intervals.computeIfAbsent(missingAggIntervalEntry, i -> new HashMap<>()).put(argName, intervalEntryStatus); + entityAggEntry.getAggIntervals().computeIfAbsent(missingAggIntervalEntry, missingInterval -> intervalEntryStatus); }); nextStartTs = nextEndTs; @@ -100,6 +85,17 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt } } + private Map> getIntervals() { + Map> intervals = new HashMap<>(); + arguments.forEach((argName, entry) -> { + var argEntry = (EntityAggregationArgumentEntry) entry; + argEntry.getAggIntervals().forEach((intervalEntry, status) -> + intervals.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, status) + ); + }); + return intervals; + } + @Override public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) { super.setCtx(ctx, actorCtx); @@ -121,11 +117,11 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt @Override public ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx) throws Exception { createIntervalIfNotExist(); - prepareIntervals(); long now = System.currentTimeMillis(); Map> results = new HashMap<>(); List expiredIntervals = new ArrayList<>(); + Map> intervals = getIntervals(); intervals.forEach((intervalEntry, argIntervalStatuses) -> { processInterval(now, intervalEntry, argIntervalStatuses, expiredIntervals, results); }); @@ -143,37 +139,22 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt .build()); } - private void prepareIntervals() { - arguments.forEach((argName, entry) -> { - var argEntry = (EntityAggregationArgumentEntry) entry; - argEntry.getAggIntervals().forEach((intervalEntry, status) -> - intervals.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, status) - ); - }); - } - private void removeExpiredIntervals(List expiredIntervals) { expiredIntervals.forEach(expiredInterval -> { arguments.values().stream() .map(EntityAggregationArgumentEntry.class::cast) .forEach(arg -> arg.getAggIntervals().remove(expiredInterval)); - intervals.remove(expiredInterval); }); } private void createIntervalIfNotExist() { AggIntervalEntry currentInterval = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs()); - if (intervals.containsKey(currentInterval)) { - return; - } arguments.forEach((argName, argumentEntry) -> { var entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry; if (!entityAggEntry.getAggIntervals().containsKey(currentInterval)) { - entityAggEntry.getAggIntervals().put(currentInterval, new AggIntervalEntryStatus()); - intervals.computeIfAbsent(currentInterval, i -> new HashMap<>()).put(argName, new AggIntervalEntryStatus()); + entityAggEntry.getAggIntervals().computeIfAbsent(currentInterval, current -> new AggIntervalEntryStatus()); } }); - ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx); } private void processInterval(long now, @@ -208,9 +189,6 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt args.forEach((argName, argEntryIntervalStatus) -> { if (argEntryIntervalStatus.shouldRecalculate(checkInterval)) { processMetric(intervalEntry, argName, false, results); - ctx.scheduleReevaluation(checkInterval, actorCtx); - } else if (argEntryIntervalStatus.intervalPassed(checkInterval)) { - processMetric(intervalEntry, argName, true, results); } }); } diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java index 38a204d882..107b528957 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java @@ -80,7 +80,7 @@ public class CalculatedFieldArgumentUtils { if (defaultValue != null) { ArgumentEntry.createSingleValueArgument(new DoubleDataEntry(argKey, defaultValue.doubleValue())); } - return ArgumentEntry.createSingleValueArgument(new StringDataEntry(argKey, null)); + return new SingleValueArgumentEntry(); } public static ArgumentEntry transformAggregationArgument(List timeSeries, long startIntervalTs, long endIntervalTs) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b29970417a..e49bcb9bf0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -530,9 +530,7 @@ actors: # Time in seconds to receive calculation result. calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}" # Interval in seconds to re-evaluate calculated fields that have a time schedule. 2 minutes by default. - check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:120}" - # Maximum allowed datapoints fetched by aggregation calculated fields - max_datapoints_limit: "${CF_AGG_MAX_DATAPOINTS_LIMIT:50000}" + check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:30}" debug: settings: diff --git a/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java b/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java index 5a73b026f0..f4123b31d6 100644 --- a/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java @@ -20,6 +20,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; @@ -53,6 +54,9 @@ import static org.thingsboard.server.cf.CalculatedFieldIntegrationTest.POLL_INTE @DaoSqlTest @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) +@TestPropertySource(properties = { + "actors.calculated_fields.check_interval=1" +}) public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest { private Tenant savedTenant; @@ -91,7 +95,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest public void testCreateCf_checkAggregation() throws Exception { Device device = createDevice("Device", "1234567890111"); - CustomInterval customInterval = new CustomInterval(60L, 0L, "Europe/Kyiv"); + CustomInterval customInterval = new CustomInterval(30L, 0L, "Europe/Kyiv"); long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs(); long currentIntervalEndTs = customInterval.getCurrentIntervalEndTs(); @@ -105,7 +109,8 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3)); long interval = customInterval.getIntervalDurationMillis(); - CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval); + Watermark watermark = new Watermark(60, 10); + CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval, watermark); await().alias("create CF and perform aggregation after interval end") .atMost(2 * interval, TimeUnit.MILLISECONDS) @@ -117,7 +122,49 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest }); } - private CalculatedField createTotalConsumptionCF(EntityId entityId, AggInterval aggInterval) { + @Test + public void testCreateCf_checkAggregationDuringWatermark() throws Exception { + Device device = createDevice("Device", "1234567890111"); + + CustomInterval customInterval = new CustomInterval(30L, 0L, "Europe/Kyiv"); + long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs(); + long currentIntervalEndTs = customInterval.getCurrentIntervalEndTs(); + + long tsBeforeInterval = currentIntervalStartTs - 1000L; + long tsInInterval_1 = currentIntervalStartTs + 1000L; + long tsInInterval_2 = currentIntervalStartTs + 500L; + long tsInInterval_3 = currentIntervalStartTs + 200L; + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsBeforeInterval)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":100}}", tsInInterval_1)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":180}}", tsInInterval_2)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3)); + + long interval = customInterval.getIntervalDurationMillis(); + Watermark watermark = new Watermark(60, 10); + CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval, watermark); + + await().alias("create CF and perform aggregation after interval end") + .atMost(2 * interval, TimeUnit.MILLISECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(device.getId(), "consumptionPerMin"); + assertThat(result).isNotNull(); + assertThat(result.get("consumptionPerMin").get(0).get("value").asText()).isEqualTo("400"); + }); + + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":300}}", tsInInterval_1)); + + await().alias("create CF and perform aggregation after interval end") + .atMost(2 * watermark.getCheckInterval(), TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(device.getId(), "consumptionPerMin"); + assertThat(result).isNotNull(); + assertThat(result.get("consumptionPerMin").get(0).get("value").asText()).isEqualTo("600"); + }); + } + + private CalculatedField createTotalConsumptionCF(EntityId entityId, AggInterval aggInterval, Watermark watermark) { Map arguments = new HashMap<>(); Argument argument = new Argument(); argument.setRefEntityKey(new ReferencedEntityKey("energy", ArgumentType.TS_LATEST, null)); @@ -137,7 +184,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest return createAggCf("Consumption per minute", entityId, aggInterval, - new Watermark(TimeUnit.MINUTES.toMillis(1), TimeUnit.SECONDS.toMillis(10)), + watermark, arguments, aggMetrics, output);