diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index ec645085e6..cd88c93a71 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -233,7 +233,7 @@ public abstract class AbstractCalculatedFieldProcessingService { return config.getArguments().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), config.getInterval(), ts) + entry -> fetchTimeSeries(ctx, entityId, entry.getValue(), config.getInterval(), ts) )); } @@ -341,11 +341,11 @@ public abstract class AbstractCalculatedFieldProcessingService { return resolveArgumentValue(argKey, argumentEntryFut); } - private ListenableFuture fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) { + private ListenableFuture fetchTimeSeries(CalculatedFieldCtx ctx, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) { long intervalStartTs = interval.getCurrentIntervalStartTs(); long intervalEndTs = interval.getCurrentIntervalEndTs(); ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), intervalStartTs, queryEndTs, 0, 1, Aggregation.NONE); - return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs)); + return fetchTimeSeriesInternal(ctx.getTenantId(), entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs, ctx)); } private ListenableFuture fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java index 7ec5098bc3..ef34b6ae8e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java @@ -19,11 +19,18 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.script.api.tbel.TbelCfArg; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; +import java.time.Instant; +import java.time.ZonedDateTime; import java.util.Map; +import java.util.concurrent.TimeUnit; @Data public class EntityAggregationArgumentEntry implements ArgumentEntry { @@ -32,10 +39,18 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry { private boolean forceResetPrevious; + private AggInterval interval; + private long watermarkDuration; + public EntityAggregationArgumentEntry(Map aggIntervals) { this.aggIntervals = aggIntervals; } + public EntityAggregationArgumentEntry(Map aggIntervals, CalculatedFieldCtx ctx) { + this(aggIntervals); + setCtx(ctx); + } + @Override public ArgumentEntryType getType() { return ArgumentEntryType.ENTITY_AGGREGATION; @@ -46,29 +61,64 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry { return aggIntervals; } + public void setCtx(CalculatedFieldCtx ctx) { + var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); + interval = configuration.getInterval(); + Watermark watermark = configuration.getWatermark(); + watermarkDuration = watermark == null ? 0 : TimeUnit.SECONDS.toMillis(watermark.getDuration()); + } + @Override public boolean updateEntry(ArgumentEntry entry) { - boolean updated = false; if (entry instanceof EntityAggregationArgumentEntry entityAggEntry) { aggIntervals.putAll(entityAggEntry.getAggIntervals()); + return true; } else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) { long entryTs = singleValueArgEntry.getTs(); - long argUpdateTs = System.currentTimeMillis(); - for (Map.Entry aggIntervalEntry : aggIntervals.entrySet()) { - if (singleValueArgEntry.isForceResetPrevious()) { - aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs); - updated = true; - continue; - } - if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) { - aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs); - return true; - } + long now = System.currentTimeMillis(); + if (updateExistingIntervals(singleValueArgEntry, entryTs, now)) { + return true; + } + return createNewInterval(entryTs, now); + } + return false; + } + + private boolean updateExistingIntervals(SingleValueArgumentEntry entry, long entryTs, long now) { + boolean updated = false; + + for (Map.Entry aggIntervalEntry : aggIntervals.entrySet()) { + AggIntervalEntry interval = aggIntervalEntry.getKey(); + AggIntervalEntryStatus status = aggIntervalEntry.getValue(); + if (entry.isForceResetPrevious()) { + status.setLastArgsRefreshTs(now); + updated = true; + continue; + } + if (interval.belongsToInterval(entryTs)) { + status.setLastArgsRefreshTs(now); + return true; } } + return updated; } + private boolean createNewInterval(long entryTs, long now) { + ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(entryTs), interval.getZoneId()); + + long startTs = interval.getDateTimeIntervalStartTs(zdt); + long endTs = interval.getDateTimeIntervalEndTs(zdt); + + if (now - endTs > watermarkDuration) { + return false; + } + + AggIntervalEntry newInterval = new AggIntervalEntry(startTs, endTs); + aggIntervals.computeIfAbsent(newInterval, i -> new AggIntervalEntryStatus(now)); + return true; + } + @Override public boolean isEmpty() { return aggIntervals.isEmpty(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java index 0f6e01a344..59335717fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java @@ -82,6 +82,15 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt interval = configuration.getInterval(); metrics = configuration.getMetrics(); produceIntermediateResult = configuration.isProduceIntermediateResult(); + setCtxToArguments(); + } + + private void setCtxToArguments() { + arguments.values().forEach(argument -> { + if (argument instanceof EntityAggregationArgumentEntry entityAggArgument) { + entityAggArgument.setCtx(ctx); + } + }); } @Override @@ -154,8 +163,10 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt } private void fillMissingIntervals() { + long now = System.currentTimeMillis(); ZoneId zoneId = interval.getZoneId(); long currentIntervalEndTs = interval.getCurrentIntervalEndTs(); + long watermarkThresholdTs = now - watermarkDuration; Map> intervals = getIntervals(); AggIntervalEntry lastIntervalEntry = intervals.keySet().stream().max(Comparator.comparing(AggIntervalEntry::getEndTs)).orElse(null); @@ -169,6 +180,13 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt while (nextEnd.toInstant().toEpochMilli() <= currentIntervalEndTs) { long nextStartTs = nextStart.toInstant().toEpochMilli(); long nextEndTs = nextEnd.toInstant().toEpochMilli(); + + if (nextEndTs < watermarkThresholdTs) { + nextStart = nextEnd; + nextEnd = interval.getNextIntervalStart(nextStart); + continue; + } + AggIntervalEntry missing = new AggIntervalEntry(nextStartTs, nextEndTs); arguments.forEach((argName, argumentEntry) -> { diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java index 7e0701cd2d..d6f08cbc69 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java @@ -75,7 +75,7 @@ public class CalculatedFieldArgumentUtils { return new SingleValueArgumentEntry(); } - public static ArgumentEntry transformAggregationArgument(List timeSeries, long startIntervalTs, long endIntervalTs) { + public static ArgumentEntry transformAggregationArgument(List timeSeries, long startIntervalTs, long endIntervalTs, CalculatedFieldCtx ctx) { Map aggIntervals = new HashMap<>(); AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(startIntervalTs, endIntervalTs); if (timeSeries == null || timeSeries.isEmpty()) { @@ -83,7 +83,7 @@ public class CalculatedFieldArgumentUtils { } else { aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus(System.currentTimeMillis())); } - return new EntityAggregationArgumentEntry(aggIntervals); + return new EntityAggregationArgumentEntry(aggIntervals, ctx); } private static KvEntry createDefaultKvEntry(Argument argument) { diff --git a/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java b/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java index 253d02ebe5..66c11a06a6 100644 --- a/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java @@ -55,6 +55,8 @@ import static org.thingsboard.server.cf.CalculatedFieldIntegrationTest.POLL_INTE @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest { + private final String TZ = "Europe/Kyiv"; + private Tenant savedTenant; @Before @@ -93,7 +95,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest public void testCreateCfAndNoTelemetryDuringInterval_checkAggregation() throws Exception { Device device = createDevice("Device", "1234567890111"); - CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L); + CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L); createConsumptionCF(device.getId(), customInterval, null); long interval = customInterval.getCurrentIntervalDurationMillis(); @@ -113,7 +115,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest public void testCreateCfWithoutWatermark_checkAggregation() throws Exception { Device device = createDevice("Device", "1234567890111"); - CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L); + CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L); createConsumptionCF(device.getId(), customInterval, null); long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs(); @@ -156,7 +158,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest public void testCreateCfWithWatermark_checkAggregationDuringWatermark() throws Exception { Device device = createDevice("Device", "1234567890111"); - CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L); + CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L); Watermark watermark = new Watermark(10); createConsumptionCF(device.getId(), customInterval, watermark); @@ -196,6 +198,51 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest }); } + @Test + public void testSendFutureTelemetry_checkAggregation() throws Exception { + Device device = createDevice("Device", "1234567890111"); + + CustomInterval customInterval = new CustomInterval(TZ, 0L, 2L); + createConsumptionCF(device.getId(), customInterval, null); + + long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs(); + + long tsBeforeInterval = currentIntervalStartTs - 1000; + long tsInInterval_1 = currentIntervalStartTs + 1000; + long tsInInterval_2 = currentIntervalStartTs + 500; + long tsInInterval_3 = currentIntervalStartTs + 200; + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsBeforeInterval)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":100}}", tsInInterval_1)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":180}}", tsInInterval_2)); + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3)); + + long interval = customInterval.getCurrentIntervalDurationMillis(); + + await().alias("create CF -> perform aggregation after interval end") + .atMost(2 * interval, TimeUnit.MILLISECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(device.getId(), "consumption", "avgConsumption"); + assertThat(result).isNotNull(); + assertThat(result.get("consumption").get(0).get("value").asText()).isEqualTo("400"); + assertThat(result.get("avgConsumption").get(0).get("value").asText()).isEqualTo("133"); + }); + + postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":500}}", currentIntervalStartTs + 4500L)); + + await().alias("update telemetry that belongs to future interval -> check aggregation ") + .atMost(3 * interval, TimeUnit.MILLISECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(device.getId(), "consumption", "avgConsumption"); + assertThat(result).isNotNull(); + assertThat(result.get("consumption").get(0).get("value").asText()).isEqualTo("500"); + assertThat(result.get("consumption").get(0).get("ts").asLong()).isEqualTo(currentIntervalStartTs + 4000L); + assertThat(result.get("avgConsumption").get(0).get("value").asText()).isEqualTo("500"); + assertThat(result.get("avgConsumption").get(0).get("ts").asLong()).isEqualTo(currentIntervalStartTs + 4000L); + }); + } + private CalculatedField createConsumptionCF(EntityId entityId, AggInterval aggInterval, Watermark watermark) { Map arguments = new HashMap<>(); Argument argument = new Argument();