From 322f0b444dbd026499dfdee8c1fd9a3e5b5c0a95 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 29 Oct 2025 16:56:33 +0200 Subject: [PATCH] implemented state --- .../server/actors/ActorSystemContext.java | 4 +- ...CalculatedFieldEntityMessageProcessor.java | 7 +- ...alculatedFieldManagerMessageProcessor.java | 2 +- ...tractCalculatedFieldProcessingService.java | 102 +++++++--- .../cf/CalculatedFieldProcessingService.java | 4 +- ...faultCalculatedFieldProcessingService.java | 9 +- .../cf/ctx/state/ArgumentEntryType.java | 2 +- .../cf/ctx/state/CalculatedFieldCtx.java | 23 ++- .../aggregation/single/AggIntervalEntry.java | 10 +- .../single/AggIntervalEntryStatus.java | 26 +++ .../EntityAggregationArgumentEntry.java | 59 ++++++ ...EntityAggregationCalculatedFieldState.java | 184 ++++++++++++++---- .../server/utils/CalculatedFieldUtils.java | 1 + .../src/main/resources/thingsboard.yml | 5 +- .../thingsboard/server/cf/AlarmRulesTest.java | 2 +- ...gregationCalculatedFieldConfiguration.java | 7 - ...gregationCalculatedFieldConfiguration.java | 4 +- .../single/interval/AggInterval.java | 9 +- .../single/interval/AggIntervalType.java | 3 +- .../single/interval/BaseAggInterval.java | 122 ++++++++++-- .../single/interval/CustomInterval.java | 15 ++ .../single/interval/DayInterval.java | 3 + .../single/interval/HourInterval.java | 3 + .../single/interval/MonthInterval.java | 3 + .../single/interval/SpecificTimeInterval.java | 42 ---- .../single/interval/Watermark.java | 11 ++ .../single/interval/WeekInterval.java | 3 + .../single/interval/WeekSunSatInterval.java | 3 + .../single/interval/YearInterval.java | 3 + 29 files changed, 507 insertions(+), 164 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java delete mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 35cf9cb467..c8b4c37ead 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -664,9 +664,9 @@ public class ActorSystemContext { @Getter private long cfCalculationResultTimeout; - @Value("${actors.alarms.reevaluation_interval:120}") + @Value("${actors.calculated_fields.check_interval:120}") @Getter - private long alarmRulesReevaluationInterval; + private long cfCheckInterval; @Autowired @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 6d3116970b..083af60945 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; -import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -55,7 +54,6 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; -import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; @@ -447,10 +445,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); long delayUntilIntervalEnd = configuration.getInterval().getDelayUntilIntervalEnd(); ctx.scheduleReevaluation(delayUntilIntervalEnd, actorCtx); - } else { - Map arguments = fetchArguments(ctx); - state.update(arguments, ctx); } + Map arguments = fetchArguments(ctx); + state.update(arguments, ctx); state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); states.put(ctx.getCfId(), state); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 65cedf443f..075918e6e8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -186,7 +186,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } catch (Exception e) { log.warn("[{}] Failed to trigger CFs reevaluation", tenantId, e); } - }, systemContext.getAlarmRulesReevaluationInterval(), systemContext.getAlarmRulesReevaluationInterval(), TimeUnit.SECONDS); + }, systemContext.getCfCheckInterval(), systemContext.getCfCheckInterval(), TimeUnit.SECONDS); } public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index b1abfdf36c..2110a78f5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.cf.configuration.aggregation.AggKeyInp import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric; import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; @@ -39,6 +40,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -53,6 +55,8 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry; +import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntryStatus; +import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationArgumentEntry; import java.util.Collections; import java.util.HashMap; @@ -63,7 +67,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION; @@ -106,7 +109,7 @@ public abstract class AbstractCalculatedFieldProcessingService { case GEOFENCING -> fetchGeofencingCalculatedFieldArguments(ctx, entityId, false, ts); case SIMPLE, SCRIPT, ALARM, PROPAGATION -> getBaseCalculatedFieldArguments(ctx, entityId, ts); case RELATED_ENTITIES_AGGREGATION -> fetchRelatedEntitiesAggArguments(ctx, entityId, ts); - case ENTITY_AGGREGATION -> null; + case ENTITY_AGGREGATION -> fetchEntityAggArguments(ctx, entityId, ts); }; if (ctx.getCfType() == PROPAGATION) { argFutures.put(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId)); @@ -201,6 +204,16 @@ public abstract class AbstractCalculatedFieldProcessingService { )); } + protected Map> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { + EntityAggregationCalculatedFieldConfiguration aggConfig = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); + + return aggConfig.getArguments().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), aggConfig.getInterval()) + )); + } + private ListenableFuture> resolveRelatedEntities(TenantId tenantId, EntityId entityId, RelationPathLevel relation) { ListenableFuture> relationsFut = relationService.findByRelationPathQueryAsync(tenantId, new EntityRelationPathQuery(entityId, List.of(relation))); @@ -294,15 +307,21 @@ public abstract class AbstractCalculatedFieldProcessingService { }; } - protected Map fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception { + protected Map fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception { var config = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); - Map argumentValues = new HashMap<>(); + Map metricsResult = new HashMap<>(); for (Entry entry : config.getMetrics().entrySet()) { String metricName = entry.getKey(); AggMetric metric = entry.getValue(); AggFunction function = metric.getFunction(); - BaseReadTsKvQuery query = new BaseReadTsKvQuery(((AggKeyInput) metric.getInput()).getKey(), interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name())); + + AggKeyInput input = (AggKeyInput) metric.getInput(); + String argName = input.getKey(); + Argument argument = ctx.getArguments().get(argName); + String key = argument.getRefEntityKey().getKey(); + + BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name())); log.trace("[{}][{}] Fetching timeseries for query {}", ctx.getTenantId(), entityId, query); ListenableFuture> tsFuture = timeseriesService.findAll(ctx.getTenantId(), entityId, List.of(query)); ListenableFuture argumentEntryFut = Futures.transform(tsFuture, timeSeries -> { @@ -318,32 +337,61 @@ public abstract class AbstractCalculatedFieldProcessingService { // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, // but this will significantly complicate the code. ArgumentEntry argumentEntry = argumentEntryFut.get(1, TimeUnit.MINUTES); - argumentValues.put(metricName, argumentEntry); + metricsResult.put(metricName, argumentEntry); } - return argumentValues; + return metricsResult; + } + + protected ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String metricName, CalculatedFieldCtx ctx) throws Exception { + var config = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); + + AggMetric metric = config.getMetrics().get(metricName); + AggFunction function = metric.getFunction(); + + AggKeyInput input = (AggKeyInput) metric.getInput(); + String argName = input.getKey(); + Argument argument = ctx.getArguments().get(argName); + String key = argument.getRefEntityKey().getKey(); + + BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name())); + log.trace("[{}][{}] Fetching timeseries for query {}", ctx.getTenantId(), entityId, query); + ListenableFuture> tsFuture = timeseriesService.findAll(ctx.getTenantId(), entityId, List.of(query)); + ListenableFuture argumentEntryFut = Futures.transform(tsFuture, timeSeries -> { + log.debug("[{}][{}] Fetched {} timeseries for query {}", ctx.getTenantId(), entityId, timeSeries == null ? 0 : timeSeries.size(), query); + if (timeSeries == null || timeSeries.isEmpty()) { + return new SingleValueArgumentEntry(); + } + return ArgumentEntry.createSingleValueArgument(timeSeries.get(0)); + }, calculatedFieldCallbackExecutor); + + // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. + // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. + // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, + // but this will significantly complicate the code. + return argumentEntryFut.get(1, TimeUnit.MINUTES); } -// protected ListenableFuture fetchArgumentValuesDuringInterval(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long startTs) { -// return switch (argument.getRefEntityKey().getType()) { -// case ATTRIBUTE -> fetchAttribute(tenantId, entityId, argument, startTs); -// case TS_LATEST -> fetchTsLatest(tenantId, entityId, argument, startTs); -// default -> throw new IllegalStateException("Unsupported argument key type for entity aggregation calculated field: " + argument.getRefEntityKey().getType()); -// }; -// } -// -// private ListenableFuture fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval) { -// long startInterval = System.currentTimeMillis() - interval.getIntervalDuration(); -// -// ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startInterval, System.currentTimeMillis(), 0, 1, Aggregation.NONE); -// -// log.trace("[{}][{}] Fetching timeseries for query {}", tenantId, entityId, query); -// ListenableFuture> fetchedTelemetryFut = timeseriesService.findAll(tenantId, entityId, List.of(query)); -// return Futures.transform(fetchedTelemetryFut, telemetry -> { -// log.debug("[{}][{}] Fetched {} timeseries for query {}", tenantId, entityId, telemetry == null ? 0 : telemetry.size(), query); -// return new SingleValueArgumentEntry(); -// }, calculatedFieldCallbackExecutor); -// } + private ListenableFuture fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval) { + long startInterval = interval.getCurrentIntervalStartTs(); + + String key = argument.getRefEntityKey().getKey(); + ReadTsKvQuery query = new BaseReadTsKvQuery(key, startInterval, System.currentTimeMillis(), 0, 1, Aggregation.NONE); + + log.trace("[{}][{}] Fetching timeseries for query {}", tenantId, entityId, query); + ListenableFuture> fetchedTelemetryFut = timeseriesService.findAll(tenantId, entityId, List.of(query)); + return Futures.transform(fetchedTelemetryFut, telemetry -> { + log.debug("[{}][{}] Fetched {} timeseries for query {}", tenantId, entityId, telemetry == null ? 0 : telemetry.size(), query); + Map aggIntervals = new HashMap<>(); + AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs()); + if (telemetry == null || telemetry.isEmpty()) { + aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus()); + } else { + aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus(System.currentTimeMillis())); + } + return new EntityAggregationArgumentEntry(aggIntervals); + }, calculatedFieldCallbackExecutor); + } private ListenableFuture fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) { long argTimeWindow = argument.getTimeWindow() == 0 ? queryEndTs : argument.getTimeWindow(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 3d75be44a7..54796e18eb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -38,7 +38,9 @@ public interface CalculatedFieldProcessingService { Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); - Map fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception; + Map fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception; + + ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String argName, CalculatedFieldCtx ctx) throws Exception; void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index d347b9701a..35f767fde2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -113,8 +113,13 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF } @Override - public Map fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception { - return super.fetchArgumentValuesDuringInterval(entityId, interval, ctx); + public Map fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception { + return super.fetchMetricsDuringInterval(entityId, interval, ctx); + } + + @Override + public ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String metricName, CalculatedFieldCtx ctx) throws Exception { + return super.fetchMetricDuringInterval(entityId, interval, metricName, ctx); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java index 427df2bf5b..457dd79686 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.service.cf.ctx.state; public enum ArgumentEntryType { - SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES + SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES, ENTITY_AGGREGATION } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 6245435b92..0405687b4c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.cf.configuration.AlarmCalculatedFieldC import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.ExpressionBasedCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration; @@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunctionInput; import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval; import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -96,6 +98,8 @@ public class CalculatedFieldCtx implements Closeable { private String expression; private boolean useLatestTs; private boolean requiresScheduledReevaluation; +// +// private long lastReevaluationTs; private ActorSystemContext systemContext; private TbelInvokeService tbelInvokeService; @@ -194,9 +198,6 @@ public class CalculatedFieldCtx implements Closeable { if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L; } - if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) { - this.scheduledUpdateIntervalMillis = entityAggregationConfig.getInterval().getIntervalDuration(); - } this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation(); if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) { this.useLatestTs = aggConfig.isUseLatestTs(); @@ -211,6 +212,21 @@ public class CalculatedFieldCtx implements Closeable { this.maxStateSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes) * 1024; this.maxSingleValueArgumentSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxSingleValueArgumentSizeInKBytes) * 1024; } +// +// public boolean isRequiresScheduledReevaluation() { +// if (CalculatedFieldType.ENTITY_AGGREGATION.equals(calculatedField.getType())) { +// var configuration = (EntityAggregationCalculatedFieldConfiguration) calculatedField.getConfiguration(); +// AggInterval interval = configuration.getInterval(); +// long delayUntilIntervalEnd = interval.getDelayUntilIntervalEnd(); +// if (lastReevaluationTs < System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval())) { +// +// } +// if (TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval()) >= delayUntilIntervalEnd) { +// return true; +// } +// } +// return requiresScheduledReevaluation; +// } public void init() { switch (cfType) { @@ -252,6 +268,7 @@ public class CalculatedFieldCtx implements Closeable { }); initialized = true; } + case ENTITY_AGGREGATION -> initialized = true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java index 657cd002ac..fac4e403a8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java @@ -15,12 +15,18 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.single; +import lombok.AllArgsConstructor; import lombok.Data; @Data +@AllArgsConstructor public class AggIntervalEntry { - public Long startTs; - public Long endTs; + private Long startTs; + private Long endTs; + + public boolean belongsToInterval(long ts) { + return ts >= startTs && ts <= endTs; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java new file mode 100644 index 0000000000..09fa341961 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java @@ -0,0 +1,26 @@ +package org.thingsboard.server.service.cf.ctx.state.aggregation.single; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Data +@NoArgsConstructor +public class AggIntervalEntryStatus { + + @Setter + private long lastArgsRefreshTs = -1; + @Setter + private long lastMetricsEvalTs = -1; + + public AggIntervalEntryStatus(long lastArgsRefreshTs) { + this.lastArgsRefreshTs = lastArgsRefreshTs; + } + + public boolean shouldRecalculate(long checkInterval) { + boolean intervalPassed = lastMetricsEvalTs <= System.currentTimeMillis() - checkInterval; + boolean argsUpdatedDuringInterval = lastArgsRefreshTs > lastMetricsEvalTs; + return intervalPassed && argsUpdatedDuringInterval; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java new file mode 100644 index 0000000000..10941ebb19 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java @@ -0,0 +1,59 @@ +package org.thingsboard.server.service.cf.ctx.state.aggregation.single; + +import lombok.Data; +import org.thingsboard.script.api.tbel.TbelCfArg; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; + +import java.util.Map; + +@Data +public class EntityAggregationArgumentEntry implements ArgumentEntry { + + private Map aggIntervals; + + private boolean forceResetPrevious; + + public EntityAggregationArgumentEntry(Map aggIntervals) { + this.aggIntervals = aggIntervals; + } + + @Override + public ArgumentEntryType getType() { + return ArgumentEntryType.ENTITY_AGGREGATION; + } + + @Override + public Object getValue() { + return aggIntervals; + } + + @Override + public boolean updateEntry(ArgumentEntry entry) { + if (entry instanceof EntityAggregationArgumentEntry entityAggEntry) { + aggIntervals.putAll(entityAggEntry.getAggIntervals()); + } else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) { + long entryTs = singleValueArgEntry.getTs(); + for (Map.Entry aggIntervalEntry : aggIntervals.entrySet()) { + if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) { + aggIntervalEntry.getValue().setLastArgsRefreshTs(System.currentTimeMillis()); + aggIntervals.put(aggIntervalEntry.getKey(), aggIntervalEntry.getValue()); + return true; + } + } + } + return false; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public TbelCfArg toTbelCfArg() { + return null; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java index 6349c635e0..0ae745068e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java @@ -15,15 +15,18 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.single; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import lombok.Setter; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.aggregation.AggKeyInput; +import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric; import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldResult; @@ -35,20 +38,16 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import java.util.HashMap; import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; - public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldState { - private Map aggIntervals = new HashMap<>(); - - @Setter - private long lastArgsRefreshTs = -1; - @Setter - private long lastMetricsEvalTs = -1; + private AggInterval interval; + private long intervalDuration; + private long watermarkDuration; + private long checkInterval; - private long deduplicationIntervalMs = -1; + private Map metrics; - CalculatedFieldProcessingService cfProcessingService; + private CalculatedFieldProcessingService cfProcessingService; public EntityAggregationCalculatedFieldState(EntityId entityId) { super(entityId); @@ -59,7 +58,11 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt super.setCtx(ctx, actorCtx); this.cfProcessingService = ctx.getCfProcessingService(); var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); - deduplicationIntervalMs = SECONDS.toMillis(configuration.getDeduplicationIntervalInSec()); + intervalDuration = configuration.getInterval().getIntervalDurationMillis(); + watermarkDuration = configuration.getWatermark().getDuration(); + checkInterval = configuration.getWatermark().getCheckInterval(); + interval = configuration.getInterval(); + metrics = configuration.getMetrics(); } @Override @@ -69,44 +72,141 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt @Override public ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx) throws Exception { - long endTs = System.currentTimeMillis(); - long startTs = endTs - 1000; - AggIntervalEntry interval = new AggIntervalEntry(); - - Map metrics = cfProcessingService.fetchArgumentValuesDuringInterval(entityId, interval, ctx); - - Output output = ctx.getOutput(); - lastMetricsEvalTs = System.currentTimeMillis(); - ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); - ObjectNode result = toResult(endTs, metrics); - if (result != null) { - return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() - .type(output.getType()) - .scope(output.getScope()) - .result(result) - .build()); + long now = System.currentTimeMillis(); + AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs()); + boolean exists = false; + for (Map.Entry entry : arguments.entrySet()) { + ArgumentEntry argumentEntry = entry.getValue(); + EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry; + Map aggIntervals = entityAggEntry.getAggIntervals(); + exists |= aggIntervals.containsKey(aggIntervalEntry); + } + if (!exists) { + arguments.forEach((argName, argumentEntry) -> { + EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry; + entityAggEntry.getAggIntervals().put(aggIntervalEntry, new AggIntervalEntryStatus()); + }); + ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx); } - return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY); - } - - protected ObjectNode toResult(long endTs, Map metrics) { - ObjectNode metricsNode = JacksonUtil.newObjectNode(); - for (Map.Entry entry : metrics.entrySet()) { - String metricName = entry.getKey(); + Map> results = new HashMap<>(); + for (Map.Entry entry : arguments.entrySet()) { + String argName = entry.getKey(); ArgumentEntry argumentEntry = entry.getValue(); - if (!argumentEntry.isEmpty()) { - metricsNode.put(metricName, JacksonUtil.toString(argumentEntry.getValue())); + + EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry; + Map aggIntervals = entityAggEntry.getAggIntervals(); + for (Map.Entry aggInterval : aggIntervals.entrySet()) { + AggIntervalEntry intervalEntry = aggInterval.getKey(); + AggIntervalEntryStatus entryStatus = aggInterval.getValue(); + + Long startTs = intervalEntry.getStartTs(); + Long endTs = intervalEntry.getEndTs(); + if (now - endTs > watermarkDuration) { + if (entryStatus.getLastArgsRefreshTs() > entryStatus.getLastMetricsEvalTs()) { + String metricName = null; + for (Map.Entry metricEntry : metrics.entrySet()) { + if (((AggKeyInput) metricEntry.getValue().getInput()).getKey().equals(argName)) { + metricName = metricEntry.getKey(); + } + } + ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx); + if (!metric.isEmpty()) { + results.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, metric); + } + } + aggIntervals.remove(intervalEntry); + continue; + } else if (now - startTs >= intervalDuration) { + if (entryStatus.shouldRecalculate(checkInterval)) { + String metricName = null; + for (Map.Entry metricEntry : metrics.entrySet()) { + if (((AggKeyInput) metricEntry.getValue().getInput()).getKey().equals(argName)) { + metricName = metricEntry.getKey(); + } + } + ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx); + if (!metric.isEmpty()) { + results.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, metric); + } + } + } } } - ObjectNode resultNode = JacksonUtil.newObjectNode(); - if (!metricsNode.isEmpty()) { - resultNode.put("ts", endTs); - resultNode.set("values", metricsNode); + ArrayNode result = toResult(results); + if (result.isEmpty()) { + return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY); } - return resultNode; + Output output = ctx.getOutput(); + return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() + .type(output.getType()) + .scope(output.getScope()) + .result(result) + .build()); + +// long now = System.currentTimeMillis(); +// AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs(), false); +// if (!intervals.containsKey(aggIntervalEntry)) { +// intervals.put(aggIntervalEntry, new AggIntervalEntryStatus()); +// ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx); +// } +// ArrayNode results = JacksonUtil.newArrayNode(); +// for (Map.Entry entry : intervals.entrySet()) { +// AggIntervalEntry intervalEntry = entry.getKey(); +// AggIntervalEntryStatus entryStatus = entry.getValue(); +// +// Long startTs = intervalEntry.getStartTs(); +// Long endTs = intervalEntry.getEndTs(); +// if (now - endTs > watermarkDuration) { +// if (entryStatus.getLastArgsRefreshTs() > entryStatus.getLastMetricsEvalTs()) { +// ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx); +// ObjectNode result = fetchMetrics(intervalEntry); +// if (result != null) { +// results.add(result); +// } +// } +// intervals.remove(intervalEntry); +// continue; +// } else if (now - startTs >= intervalDuration) { +// if (entryStatus.shouldRecalculate(checkInterval)) { +// ObjectNode result = fetchMetrics(intervalEntry); +// if (result != null) { +// results.add(result); +// } +// } +// } +// } +// if (results.isEmpty()) { +// return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY); +// } +// Output output = ctx.getOutput(); +// return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() +// .type(output.getType()) +// .scope(output.getScope()) +// .result(results) +// .build()); } + protected ArrayNode toResult(Map> results) { + ArrayNode result = JacksonUtil.newArrayNode(); + results.forEach((interval, args) -> { + ObjectNode metricsNode = JacksonUtil.newObjectNode(); + for (Map.Entry entry : args.entrySet()) { + String metricName = entry.getKey(); + ArgumentEntry argumentEntry = entry.getValue(); + if (!argumentEntry.isEmpty()) { + metricsNode.put(metricName, JacksonUtil.toString(argumentEntry.getValue())); + } + } + ObjectNode resultNode = JacksonUtil.newObjectNode(); + if (!metricsNode.isEmpty()) { + resultNode.put("ts", interval.getEndTs()); + resultNode.set("values", metricsNode); + } + result.add(resultNode); + }); + return result; + } @Override public boolean isReady() { diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 121febea7d..8a3e352826 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -204,6 +204,7 @@ public class CalculatedFieldUtils { case ALARM -> new AlarmCalculatedFieldState(id.entityId()); case PROPAGATION -> new PropagationCalculatedFieldState(id.entityId()); case RELATED_ENTITIES_AGGREGATION -> new RelatedEntitiesAggregationCalculatedFieldState(id.entityId()); + case ENTITY_AGGREGATION -> null; // todo }; if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 889df54848..88f2f017d3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -529,9 +529,8 @@ actors: configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}" # Time in seconds to receive calculation result. calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}" - alarms: - # Interval in seconds to re-evaluate Alarm rules that have a time schedule. 2 minutes by default. - reevaluation_interval: "${ACTORS_ALARMS_REEVALUATION_INTERVAL_SEC:120}" + # Interval in seconds to re-evaluate calculated fields that have a time schedule. 2 minutes by default. + check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:120}" debug: settings: diff --git a/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java b/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java index 652e69781d..135b83496f 100644 --- a/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java @@ -86,7 +86,7 @@ import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @Slf4j @DaoSqlTest @TestPropertySource(properties = { - "actors.alarms.reevaluation_interval=1" + "actors.calculated_fields.check_interval=1" }) public class AlarmRulesTest extends AbstractControllerTest { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java index 931cb919ec..9d4c7bdaf6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation; -import com.fasterxml.jackson.annotation.JsonIgnore; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -57,10 +56,4 @@ public class RelatedEntitiesAggregationCalculatedFieldConfiguration implements A } } - @JsonIgnore - @Override - public boolean requiresScheduledReevaluation() { - return true; - } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java index 69f4b30d46..3f5fc7eb96 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalcula import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric; import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval; +import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark; import java.util.Map; @@ -36,8 +37,7 @@ public class EntityAggregationCalculatedFieldConfiguration implements ArgumentsB private Map metrics; private AggInterval interval; - private long deduplicationIntervalInSec; - private long watermark; + private Watermark watermark; private Output output; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java index cf6f825560..966e778240 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java @@ -31,14 +31,19 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type(value = WeekSunSatInterval.class, name = "WEEK_SUN_SAT"), @JsonSubTypes.Type(value = MonthInterval.class, name = "MONTH"), @JsonSubTypes.Type(value = YearInterval.class, name = "YEAR"), - @JsonSubTypes.Type(value = CustomInterval.class, name = "CUSTOM"), - @JsonSubTypes.Type(value = SpecificTimeInterval.class, name = "SPECIFIC_TIME"), + @JsonSubTypes.Type(value = CustomInterval.class, name = "CUSTOM") }) @JsonIgnoreProperties(ignoreUnknown = true) public interface AggInterval { AggIntervalType getType(); + long getIntervalDurationMillis(); + + long getCurrentIntervalStartTs(); + + long getCurrentIntervalEndTs(); + long getDelayUntilIntervalEnd(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java index a5c29b4edf..62185127ed 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java @@ -23,7 +23,6 @@ public enum AggIntervalType { WEEK_SUN_SAT, MONTH, YEAR, - CUSTOM, - SPECIFIC_TIME + CUSTOM } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java index 5790d7545d..b878ef345a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java @@ -15,40 +15,124 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + import java.time.DayOfWeek; import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAdjusters; +@Data public abstract class BaseAggInterval implements AggInterval { protected long offsetMillis; // delay millis since start of interval + @Override + public long getIntervalDurationMillis() { + return getCurrentIntervalEndTs() - getCurrentIntervalStartTs(); + } + + @Override + public long getCurrentIntervalStartTs() { + return getCurrentIntervalStartTs(getType(), 1); + } + + protected long getCurrentIntervalStartTs(AggIntervalType type, int multiplier) { + return getAlignedBoundary(type, multiplier, false).toInstant().toEpochMilli(); + } + + @Override + public long getCurrentIntervalEndTs() { + return getCurrentIntervalEndTs(getType(), 1); + } + + protected long getCurrentIntervalEndTs(AggIntervalType type, int multiplier) { + return getAlignedBoundary(type, multiplier, true).toInstant().toEpochMilli(); + } + @Override public long getDelayUntilIntervalEnd() { return getDelayUntilIntervalEnd(getType(), 1); } - protected long getDelayUntilIntervalEnd(AggIntervalType type, long multiplier) { + protected long getDelayUntilIntervalEnd(AggIntervalType type, int multiplier) { + ZonedDateTime now = ZonedDateTime.now(); + ZonedDateTime currentStart = getAlignedBoundary(type, multiplier, false); + ZonedDateTime nextStart = getAlignedBoundary(type, multiplier, true); + + long periodMillis = Duration.between(currentStart, nextStart).toMillis(); + + // Apply offset: this shifts the grid + long off = offsetMillis % periodMillis; + if (off < 0) off += periodMillis; + + // Compute the offset-aligned start times + ZonedDateTime offsetCurrentStart = currentStart.plus(Duration.ofMillis(off)); + ZonedDateTime offsetNextStart = offsetCurrentStart.plus(Duration.ofMillis(periodMillis)); + + // If we are already past the current offset start, move to the next + ZonedDateTime target = offsetCurrentStart.isAfter(now) ? offsetCurrentStart : offsetNextStart; + // todo fix + return Math.max(Duration.between(now, target).toMillis(), 0); + } + + protected ZonedDateTime getAlignedBoundary(AggIntervalType type, int multiplier, boolean next) { ZonedDateTime now = ZonedDateTime.now(); - ZonedDateTime next; - - switch (getType()) { - case HOUR -> next = now.plusHours(multiplier).truncatedTo(ChronoUnit.HOURS); - case DAY -> next = now.plusDays(multiplier).truncatedTo(ChronoUnit.DAYS); - case WEEK -> next = now.plusWeeks(multiplier).with(DayOfWeek.MONDAY).truncatedTo(ChronoUnit.DAYS); - case WEEK_SUN_SAT -> next = now.plusWeeks(multiplier).with(DayOfWeek.SUNDAY).truncatedTo(ChronoUnit.DAYS); - case MONTH -> next = now.plusMonths(multiplier).withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS); - case YEAR -> next = now.plusYears(multiplier).withDayOfYear(1).truncatedTo(ChronoUnit.DAYS); - default -> throw new IllegalArgumentException("Unsupported type: " + getType()); - } - - long delayMillis = Duration.between(now, next).toMillis(); - if (offsetMillis > 0) { - delayMillis += offsetMillis; - } - - return Math.max(delayMillis, 0); + + return switch (type) { + case HOUR -> alignByHour(now, multiplier, next); + case DAY -> alignByDay(now, multiplier, next); + case WEEK -> alignByWeek(now, multiplier, DayOfWeek.MONDAY, next); + case WEEK_SUN_SAT -> alignByWeek(now, multiplier, DayOfWeek.SUNDAY, next); + case MONTH -> alignByMonth(now, multiplier, next); + case YEAR -> alignByYear(now, multiplier, next); + default -> throw new IllegalArgumentException("Unsupported type: " + type); + }; + } + + private ZonedDateTime alignByHour(ZonedDateTime now, int multiplier, boolean next) { + ZonedDateTime startOfDay = now.truncatedTo(ChronoUnit.DAYS); + long hoursSinceMidnight = Duration.between(startOfDay, now).toHours(); + long aligned = (hoursSinceMidnight / multiplier) * multiplier; + if (next) aligned += multiplier; + return startOfDay.plusHours(aligned); + } + + private ZonedDateTime alignByDay(ZonedDateTime now, int multiplier, boolean next) { + long daysSinceEpoch = now.toLocalDate().toEpochDay(); + long aligned = (daysSinceEpoch / multiplier) * multiplier; + if (next) aligned += multiplier; + long diff = aligned - daysSinceEpoch; + return now.truncatedTo(ChronoUnit.DAYS).plusDays(diff); + } + + private ZonedDateTime alignByWeek(ZonedDateTime now, int multiplier, DayOfWeek startOfWeekDay, boolean next) { + ZonedDateTime startOfWeek = now.with(TemporalAdjusters.previousOrSame(startOfWeekDay)) + .truncatedTo(ChronoUnit.DAYS); + long weeksSinceEpoch = ChronoUnit.WEEKS.between( + ZonedDateTime.ofInstant(Instant.EPOCH, now.getZone()), startOfWeek); + long aligned = (weeksSinceEpoch / multiplier) * multiplier; + if (next) aligned += multiplier; + return startOfWeek.plusWeeks(aligned - weeksSinceEpoch); + } + + private ZonedDateTime alignByMonth(ZonedDateTime now, int multiplier, boolean next) { + ZonedDateTime startOfMonth = now.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS); + long monthsSinceEpoch = now.getYear() * 12L + now.getMonthValue() - 1; + long aligned = (monthsSinceEpoch / multiplier) * multiplier; + if (next) aligned += multiplier; + return startOfMonth.plusMonths(aligned - monthsSinceEpoch); + } + + private ZonedDateTime alignByYear(ZonedDateTime now, int multiplier, boolean next) { + int year = now.getYear(); + int aligned = (year / multiplier) * multiplier; + if (next) aligned += multiplier; + return ZonedDateTime.of(LocalDate.of(aligned, 1, 1), LocalTime.MIDNIGHT, now.getZone()); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java index 585e11d955..d875794a18 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java @@ -25,6 +25,21 @@ public class CustomInterval extends BaseAggInterval { return AggIntervalType.CUSTOM; } + @Override + public long getIntervalDurationMillis() { + return getCurrentIntervalEndTs() - getCurrentIntervalStartTs(); + } + + @Override + public long getCurrentIntervalStartTs() { + return super.getCurrentIntervalStartTs(internalIntervalType, multiplier); + } + + @Override + public long getCurrentIntervalEndTs() { + return super.getCurrentIntervalEndTs(internalIntervalType, multiplier); + } + @Override public long getDelayUntilIntervalEnd() { return super.getDelayUntilIntervalEnd(internalIntervalType, multiplier); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java index 19620c6fda..01cbdf2e97 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class DayInterval extends BaseAggInterval { @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java index 303ff8305e..ce84b57ae1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class HourInterval extends BaseAggInterval { @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java index 1d7eaf14bc..fe8d60f41c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class MonthInterval extends BaseAggInterval { @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java deleted file mode 100644 index 2b44366f8b..0000000000 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; - -import java.time.LocalTime; - -public class SpecificTimeInterval implements AggInterval { - - public Long startMillis; // start millis since start of day - public Long endMillis; // end millis since start of day - - @Override - public AggIntervalType getType() { - return AggIntervalType.SPECIFIC_TIME; - } - - @Override - public long getDelayUntilIntervalEnd() { - long nowMillis = LocalTime.now().toNanoOfDay() / 1_000_000L; - long delayMillis; - if (nowMillis < endMillis) { - delayMillis = endMillis - nowMillis; // later today - } else { - delayMillis = (24 * 60 * 60 * 1000L - nowMillis) + endMillis; // next day - } - return delayMillis; - } - -} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java new file mode 100644 index 0000000000..44dd6404f6 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java @@ -0,0 +1,11 @@ +package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; + +import lombok.Data; + +@Data +public class Watermark { + + private long duration; + private long checkInterval; + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java index 4dd10a2747..5a93076772 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class WeekInterval extends BaseAggInterval { @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java index c12e7dc584..c70dd79a9f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class WeekSunSatInterval extends BaseAggInterval { @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java index 24c6c72ed1..3c600064d1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval; +import lombok.Data; + +@Data public class YearInterval extends BaseAggInterval { @Override