Browse Source

implemented state

pull/14253/head
IrynaMatveieva 7 months ago
parent
commit
322f0b444d
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 7
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  3. 2
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  4. 102
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  5. 4
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java
  6. 9
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  7. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java
  8. 23
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  9. 10
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java
  10. 26
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java
  11. 59
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java
  12. 184
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java
  13. 1
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  14. 5
      application/src/main/resources/thingsboard.yml
  15. 2
      application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java
  16. 7
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java
  17. 4
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java
  18. 9
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java
  19. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java
  20. 122
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java
  21. 15
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java
  22. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java
  23. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java
  24. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java
  25. 42
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java
  26. 11
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java
  27. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java
  28. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java
  29. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -664,9 +664,9 @@ public class ActorSystemContext {
@Getter
private long cfCalculationResultTimeout;
@Value("${actors.alarms.reevaluation_interval:120}")
@Value("${actors.calculated_fields.check_interval:120}")
@Getter
private long alarmRulesReevaluationInterval;
private long cfCheckInterval;
@Autowired
@Getter

7
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -55,7 +54,6 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
@ -447,10 +445,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
long delayUntilIntervalEnd = configuration.getInterval().getDelayUntilIntervalEnd();
ctx.scheduleReevaluation(delayUntilIntervalEnd, actorCtx);
} else {
Map<String, ArgumentEntry> arguments = fetchArguments(ctx);
state.update(arguments, ctx);
}
Map<String, ArgumentEntry> arguments = fetchArguments(ctx);
state.update(arguments, ctx);
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize());
states.put(ctx.getCfId(), state);

2
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -186,7 +186,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} catch (Exception e) {
log.warn("[{}] Failed to trigger CFs reevaluation", tenantId, e);
}
}, systemContext.getAlarmRulesReevaluationInterval(), systemContext.getAlarmRulesReevaluationInterval(), TimeUnit.SECONDS);
}, systemContext.getCfCheckInterval(), systemContext.getCfCheckInterval(), TimeUnit.SECONDS);
}
public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException {

102
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.cf.configuration.aggregation.AggKeyInp
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
@ -39,6 +40,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
@ -53,6 +55,8 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntryStatus;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationArgumentEntry;
import java.util.Collections;
import java.util.HashMap;
@ -63,7 +67,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION;
@ -106,7 +109,7 @@ public abstract class AbstractCalculatedFieldProcessingService {
case GEOFENCING -> fetchGeofencingCalculatedFieldArguments(ctx, entityId, false, ts);
case SIMPLE, SCRIPT, ALARM, PROPAGATION -> getBaseCalculatedFieldArguments(ctx, entityId, ts);
case RELATED_ENTITIES_AGGREGATION -> fetchRelatedEntitiesAggArguments(ctx, entityId, ts);
case ENTITY_AGGREGATION -> null;
case ENTITY_AGGREGATION -> fetchEntityAggArguments(ctx, entityId, ts);
};
if (ctx.getCfType() == PROPAGATION) {
argFutures.put(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId));
@ -201,6 +204,16 @@ public abstract class AbstractCalculatedFieldProcessingService {
));
}
protected Map<String, ListenableFuture<ArgumentEntry>> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
EntityAggregationCalculatedFieldConfiguration aggConfig = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
return aggConfig.getArguments().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), aggConfig.getInterval())
));
}
private ListenableFuture<List<EntityId>> resolveRelatedEntities(TenantId tenantId, EntityId entityId, RelationPathLevel relation) {
ListenableFuture<List<EntityRelation>> relationsFut = relationService.findByRelationPathQueryAsync(tenantId, new EntityRelationPathQuery(entityId, List.of(relation)));
@ -294,15 +307,21 @@ public abstract class AbstractCalculatedFieldProcessingService {
};
}
protected Map<String, ArgumentEntry> fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception {
protected Map<String, ArgumentEntry> fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception {
var config = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
Map<String, ArgumentEntry> argumentValues = new HashMap<>();
Map<String, ArgumentEntry> metricsResult = new HashMap<>();
for (Entry<String, AggMetric> entry : config.getMetrics().entrySet()) {
String metricName = entry.getKey();
AggMetric metric = entry.getValue();
AggFunction function = metric.getFunction();
BaseReadTsKvQuery query = new BaseReadTsKvQuery(((AggKeyInput) metric.getInput()).getKey(), interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name()));
AggKeyInput input = (AggKeyInput) metric.getInput();
String argName = input.getKey();
Argument argument = ctx.getArguments().get(argName);
String key = argument.getRefEntityKey().getKey();
BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name()));
log.trace("[{}][{}] Fetching timeseries for query {}", ctx.getTenantId(), entityId, query);
ListenableFuture<List<TsKvEntry>> tsFuture = timeseriesService.findAll(ctx.getTenantId(), entityId, List.of(query));
ListenableFuture<ArgumentEntry> argumentEntryFut = Futures.transform(tsFuture, timeSeries -> {
@ -318,32 +337,61 @@ public abstract class AbstractCalculatedFieldProcessingService {
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
ArgumentEntry argumentEntry = argumentEntryFut.get(1, TimeUnit.MINUTES);
argumentValues.put(metricName, argumentEntry);
metricsResult.put(metricName, argumentEntry);
}
return argumentValues;
return metricsResult;
}
protected ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String metricName, CalculatedFieldCtx ctx) throws Exception {
var config = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
AggMetric metric = config.getMetrics().get(metricName);
AggFunction function = metric.getFunction();
AggKeyInput input = (AggKeyInput) metric.getInput();
String argName = input.getKey();
Argument argument = ctx.getArguments().get(argName);
String key = argument.getRefEntityKey().getKey();
BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), 0, 1, Aggregation.valueOf(function.name()));
log.trace("[{}][{}] Fetching timeseries for query {}", ctx.getTenantId(), entityId, query);
ListenableFuture<List<TsKvEntry>> tsFuture = timeseriesService.findAll(ctx.getTenantId(), entityId, List.of(query));
ListenableFuture<ArgumentEntry> argumentEntryFut = Futures.transform(tsFuture, timeSeries -> {
log.debug("[{}][{}] Fetched {} timeseries for query {}", ctx.getTenantId(), entityId, timeSeries == null ? 0 : timeSeries.size(), query);
if (timeSeries == null || timeSeries.isEmpty()) {
return new SingleValueArgumentEntry();
}
return ArgumentEntry.createSingleValueArgument(timeSeries.get(0));
}, calculatedFieldCallbackExecutor);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
return argumentEntryFut.get(1, TimeUnit.MINUTES);
}
// protected ListenableFuture<ArgumentEntry> fetchArgumentValuesDuringInterval(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long startTs) {
// return switch (argument.getRefEntityKey().getType()) {
// case ATTRIBUTE -> fetchAttribute(tenantId, entityId, argument, startTs);
// case TS_LATEST -> fetchTsLatest(tenantId, entityId, argument, startTs);
// default -> throw new IllegalStateException("Unsupported argument key type for entity aggregation calculated field: " + argument.getRefEntityKey().getType());
// };
// }
//
// private ListenableFuture<ArgumentEntry> fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval) {
// long startInterval = System.currentTimeMillis() - interval.getIntervalDuration();
//
// ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startInterval, System.currentTimeMillis(), 0, 1, Aggregation.NONE);
//
// log.trace("[{}][{}] Fetching timeseries for query {}", tenantId, entityId, query);
// ListenableFuture<List<TsKvEntry>> fetchedTelemetryFut = timeseriesService.findAll(tenantId, entityId, List.of(query));
// return Futures.transform(fetchedTelemetryFut, telemetry -> {
// log.debug("[{}][{}] Fetched {} timeseries for query {}", tenantId, entityId, telemetry == null ? 0 : telemetry.size(), query);
// return new SingleValueArgumentEntry();
// }, calculatedFieldCallbackExecutor);
// }
private ListenableFuture<ArgumentEntry> fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval) {
long startInterval = interval.getCurrentIntervalStartTs();
String key = argument.getRefEntityKey().getKey();
ReadTsKvQuery query = new BaseReadTsKvQuery(key, startInterval, System.currentTimeMillis(), 0, 1, Aggregation.NONE);
log.trace("[{}][{}] Fetching timeseries for query {}", tenantId, entityId, query);
ListenableFuture<List<TsKvEntry>> fetchedTelemetryFut = timeseriesService.findAll(tenantId, entityId, List.of(query));
return Futures.transform(fetchedTelemetryFut, telemetry -> {
log.debug("[{}][{}] Fetched {} timeseries for query {}", tenantId, entityId, telemetry == null ? 0 : telemetry.size(), query);
Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals = new HashMap<>();
AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs());
if (telemetry == null || telemetry.isEmpty()) {
aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus());
} else {
aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus(System.currentTimeMillis()));
}
return new EntityAggregationArgumentEntry(aggIntervals);
}, calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) {
long argTimeWindow = argument.getTimeWindow() == 0 ? queryEndTs : argument.getTimeWindow();

4
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java

@ -38,7 +38,9 @@ public interface CalculatedFieldProcessingService {
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);
Map<String, ArgumentEntry> fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception;
Map<String, ArgumentEntry> fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception;
ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String argName, CalculatedFieldCtx ctx) throws Exception;
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback);

9
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -113,8 +113,13 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
}
@Override
public Map<String, ArgumentEntry> fetchArgumentValuesDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception {
return super.fetchArgumentValuesDuringInterval(entityId, interval, ctx);
public Map<String, ArgumentEntry> fetchMetricsDuringInterval(EntityId entityId, AggIntervalEntry interval, CalculatedFieldCtx ctx) throws Exception {
return super.fetchMetricsDuringInterval(entityId, interval, ctx);
}
@Override
public ArgumentEntry fetchMetricDuringInterval(EntityId entityId, AggIntervalEntry interval, String metricName, CalculatedFieldCtx ctx) throws Exception {
return super.fetchMetricDuringInterval(entityId, interval, metricName, ctx);
}
@Override

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java

@ -16,5 +16,5 @@
package org.thingsboard.server.service.cf.ctx.state;
public enum ArgumentEntryType {
SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES
SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES, ENTITY_AGGREGATION
}

23
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.cf.configuration.AlarmCalculatedFieldC
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ExpressionBasedCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration;
@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunctionInput;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval;
import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
@ -96,6 +98,8 @@ public class CalculatedFieldCtx implements Closeable {
private String expression;
private boolean useLatestTs;
private boolean requiresScheduledReevaluation;
//
// private long lastReevaluationTs;
private ActorSystemContext systemContext;
private TbelInvokeService tbelInvokeService;
@ -194,9 +198,6 @@ public class CalculatedFieldCtx implements Closeable {
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L;
}
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) {
this.scheduledUpdateIntervalMillis = entityAggregationConfig.getInterval().getIntervalDuration();
}
this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation();
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) {
this.useLatestTs = aggConfig.isUseLatestTs();
@ -211,6 +212,21 @@ public class CalculatedFieldCtx implements Closeable {
this.maxStateSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes) * 1024;
this.maxSingleValueArgumentSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxSingleValueArgumentSizeInKBytes) * 1024;
}
//
// public boolean isRequiresScheduledReevaluation() {
// if (CalculatedFieldType.ENTITY_AGGREGATION.equals(calculatedField.getType())) {
// var configuration = (EntityAggregationCalculatedFieldConfiguration) calculatedField.getConfiguration();
// AggInterval interval = configuration.getInterval();
// long delayUntilIntervalEnd = interval.getDelayUntilIntervalEnd();
// if (lastReevaluationTs < System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval())) {
//
// }
// if (TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval()) >= delayUntilIntervalEnd) {
// return true;
// }
// }
// return requiresScheduledReevaluation;
// }
public void init() {
switch (cfType) {
@ -252,6 +268,7 @@ public class CalculatedFieldCtx implements Closeable {
});
initialized = true;
}
case ENTITY_AGGREGATION -> initialized = true;
}
}

10
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java

@ -15,12 +15,18 @@
*/
package org.thingsboard.server.service.cf.ctx.state.aggregation.single;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class AggIntervalEntry {
public Long startTs;
public Long endTs;
private Long startTs;
private Long endTs;
public boolean belongsToInterval(long ts) {
return ts >= startTs && ts <= endTs;
}
}

26
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java

@ -0,0 +1,26 @@
package org.thingsboard.server.service.cf.ctx.state.aggregation.single;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Data
@NoArgsConstructor
public class AggIntervalEntryStatus {
@Setter
private long lastArgsRefreshTs = -1;
@Setter
private long lastMetricsEvalTs = -1;
public AggIntervalEntryStatus(long lastArgsRefreshTs) {
this.lastArgsRefreshTs = lastArgsRefreshTs;
}
public boolean shouldRecalculate(long checkInterval) {
boolean intervalPassed = lastMetricsEvalTs <= System.currentTimeMillis() - checkInterval;
boolean argsUpdatedDuringInterval = lastArgsRefreshTs > lastMetricsEvalTs;
return intervalPassed && argsUpdatedDuringInterval;
}
}

59
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java

@ -0,0 +1,59 @@
package org.thingsboard.server.service.cf.ctx.state.aggregation.single;
import lombok.Data;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.util.Map;
@Data
public class EntityAggregationArgumentEntry implements ArgumentEntry {
private Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals;
private boolean forceResetPrevious;
public EntityAggregationArgumentEntry(Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals) {
this.aggIntervals = aggIntervals;
}
@Override
public ArgumentEntryType getType() {
return ArgumentEntryType.ENTITY_AGGREGATION;
}
@Override
public Object getValue() {
return aggIntervals;
}
@Override
public boolean updateEntry(ArgumentEntry entry) {
if (entry instanceof EntityAggregationArgumentEntry entityAggEntry) {
aggIntervals.putAll(entityAggEntry.getAggIntervals());
} else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) {
long entryTs = singleValueArgEntry.getTs();
for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> aggIntervalEntry : aggIntervals.entrySet()) {
if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) {
aggIntervalEntry.getValue().setLastArgsRefreshTs(System.currentTimeMillis());
aggIntervals.put(aggIntervalEntry.getKey(), aggIntervalEntry.getValue());
return true;
}
}
}
return false;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public TbelCfArg toTbelCfArg() {
return null;
}
}

184
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java

@ -15,15 +15,18 @@
*/
package org.thingsboard.server.service.cf.ctx.state.aggregation.single;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Setter;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggKeyInput;
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
@ -35,20 +38,16 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.HashMap;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldState {
private Map<AggIntervalEntry, Boolean> aggIntervals = new HashMap<>();
@Setter
private long lastArgsRefreshTs = -1;
@Setter
private long lastMetricsEvalTs = -1;
private AggInterval interval;
private long intervalDuration;
private long watermarkDuration;
private long checkInterval;
private long deduplicationIntervalMs = -1;
private Map<String, AggMetric> metrics;
CalculatedFieldProcessingService cfProcessingService;
private CalculatedFieldProcessingService cfProcessingService;
public EntityAggregationCalculatedFieldState(EntityId entityId) {
super(entityId);
@ -59,7 +58,11 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
super.setCtx(ctx, actorCtx);
this.cfProcessingService = ctx.getCfProcessingService();
var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
deduplicationIntervalMs = SECONDS.toMillis(configuration.getDeduplicationIntervalInSec());
intervalDuration = configuration.getInterval().getIntervalDurationMillis();
watermarkDuration = configuration.getWatermark().getDuration();
checkInterval = configuration.getWatermark().getCheckInterval();
interval = configuration.getInterval();
metrics = configuration.getMetrics();
}
@Override
@ -69,44 +72,141 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx) throws Exception {
long endTs = System.currentTimeMillis();
long startTs = endTs - 1000;
AggIntervalEntry interval = new AggIntervalEntry();
Map<String, ArgumentEntry> metrics = cfProcessingService.fetchArgumentValuesDuringInterval(entityId, interval, ctx);
Output output = ctx.getOutput();
lastMetricsEvalTs = System.currentTimeMillis();
ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx);
ObjectNode result = toResult(endTs, metrics);
if (result != null) {
return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder()
.type(output.getType())
.scope(output.getScope())
.result(result)
.build());
long now = System.currentTimeMillis();
AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs());
boolean exists = false;
for (Map.Entry<String, ArgumentEntry> entry : arguments.entrySet()) {
ArgumentEntry argumentEntry = entry.getValue();
EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals = entityAggEntry.getAggIntervals();
exists |= aggIntervals.containsKey(aggIntervalEntry);
}
if (!exists) {
arguments.forEach((argName, argumentEntry) -> {
EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
entityAggEntry.getAggIntervals().put(aggIntervalEntry, new AggIntervalEntryStatus());
});
ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx);
}
return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY);
}
protected ObjectNode toResult(long endTs, Map<String, ArgumentEntry> metrics) {
ObjectNode metricsNode = JacksonUtil.newObjectNode();
for (Map.Entry<String, ArgumentEntry> entry : metrics.entrySet()) {
String metricName = entry.getKey();
Map<AggIntervalEntry, Map<String, ArgumentEntry>> results = new HashMap<>();
for (Map.Entry<String, ArgumentEntry> entry : arguments.entrySet()) {
String argName = entry.getKey();
ArgumentEntry argumentEntry = entry.getValue();
if (!argumentEntry.isEmpty()) {
metricsNode.put(metricName, JacksonUtil.toString(argumentEntry.getValue()));
EntityAggregationArgumentEntry entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals = entityAggEntry.getAggIntervals();
for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> aggInterval : aggIntervals.entrySet()) {
AggIntervalEntry intervalEntry = aggInterval.getKey();
AggIntervalEntryStatus entryStatus = aggInterval.getValue();
Long startTs = intervalEntry.getStartTs();
Long endTs = intervalEntry.getEndTs();
if (now - endTs > watermarkDuration) {
if (entryStatus.getLastArgsRefreshTs() > entryStatus.getLastMetricsEvalTs()) {
String metricName = null;
for (Map.Entry<String, AggMetric> metricEntry : metrics.entrySet()) {
if (((AggKeyInput) metricEntry.getValue().getInput()).getKey().equals(argName)) {
metricName = metricEntry.getKey();
}
}
ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx);
if (!metric.isEmpty()) {
results.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, metric);
}
}
aggIntervals.remove(intervalEntry);
continue;
} else if (now - startTs >= intervalDuration) {
if (entryStatus.shouldRecalculate(checkInterval)) {
String metricName = null;
for (Map.Entry<String, AggMetric> metricEntry : metrics.entrySet()) {
if (((AggKeyInput) metricEntry.getValue().getInput()).getKey().equals(argName)) {
metricName = metricEntry.getKey();
}
}
ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx);
if (!metric.isEmpty()) {
results.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, metric);
}
}
}
}
}
ObjectNode resultNode = JacksonUtil.newObjectNode();
if (!metricsNode.isEmpty()) {
resultNode.put("ts", endTs);
resultNode.set("values", metricsNode);
ArrayNode result = toResult(results);
if (result.isEmpty()) {
return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY);
}
return resultNode;
Output output = ctx.getOutput();
return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder()
.type(output.getType())
.scope(output.getScope())
.result(result)
.build());
// long now = System.currentTimeMillis();
// AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs(), false);
// if (!intervals.containsKey(aggIntervalEntry)) {
// intervals.put(aggIntervalEntry, new AggIntervalEntryStatus());
// ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx);
// }
// ArrayNode results = JacksonUtil.newArrayNode();
// for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> entry : intervals.entrySet()) {
// AggIntervalEntry intervalEntry = entry.getKey();
// AggIntervalEntryStatus entryStatus = entry.getValue();
//
// Long startTs = intervalEntry.getStartTs();
// Long endTs = intervalEntry.getEndTs();
// if (now - endTs > watermarkDuration) {
// if (entryStatus.getLastArgsRefreshTs() > entryStatus.getLastMetricsEvalTs()) {
// ArgumentEntry metric = cfProcessingService.fetchMetricDuringInterval(entityId, intervalEntry, metricName, ctx);
// ObjectNode result = fetchMetrics(intervalEntry);
// if (result != null) {
// results.add(result);
// }
// }
// intervals.remove(intervalEntry);
// continue;
// } else if (now - startTs >= intervalDuration) {
// if (entryStatus.shouldRecalculate(checkInterval)) {
// ObjectNode result = fetchMetrics(intervalEntry);
// if (result != null) {
// results.add(result);
// }
// }
// }
// }
// if (results.isEmpty()) {
// return Futures.immediateFuture(TelemetryCalculatedFieldResult.EMPTY);
// }
// Output output = ctx.getOutput();
// return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder()
// .type(output.getType())
// .scope(output.getScope())
// .result(results)
// .build());
}
protected ArrayNode toResult(Map<AggIntervalEntry, Map<String, ArgumentEntry>> results) {
ArrayNode result = JacksonUtil.newArrayNode();
results.forEach((interval, args) -> {
ObjectNode metricsNode = JacksonUtil.newObjectNode();
for (Map.Entry<String, ArgumentEntry> entry : args.entrySet()) {
String metricName = entry.getKey();
ArgumentEntry argumentEntry = entry.getValue();
if (!argumentEntry.isEmpty()) {
metricsNode.put(metricName, JacksonUtil.toString(argumentEntry.getValue()));
}
}
ObjectNode resultNode = JacksonUtil.newObjectNode();
if (!metricsNode.isEmpty()) {
resultNode.put("ts", interval.getEndTs());
resultNode.set("values", metricsNode);
}
result.add(resultNode);
});
return result;
}
@Override
public boolean isReady() {

1
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -204,6 +204,7 @@ public class CalculatedFieldUtils {
case ALARM -> new AlarmCalculatedFieldState(id.entityId());
case PROPAGATION -> new PropagationCalculatedFieldState(id.entityId());
case RELATED_ENTITIES_AGGREGATION -> new RelatedEntitiesAggregationCalculatedFieldState(id.entityId());
case ENTITY_AGGREGATION -> null; // todo
};
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) {

5
application/src/main/resources/thingsboard.yml

@ -529,9 +529,8 @@ actors:
configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}"
# Time in seconds to receive calculation result.
calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}"
alarms:
# Interval in seconds to re-evaluate Alarm rules that have a time schedule. 2 minutes by default.
reevaluation_interval: "${ACTORS_ALARMS_REEVALUATION_INTERVAL_SEC:120}"
# Interval in seconds to re-evaluate calculated fields that have a time schedule. 2 minutes by default.
check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:120}"
debug:
settings:

2
application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java

@ -86,7 +86,7 @@ import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"actors.alarms.reevaluation_interval=1"
"actors.calculated_fields.check_interval=1"
})
public class AlarmRulesTest extends AbstractControllerTest {

7
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation;
import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@ -57,10 +56,4 @@ public class RelatedEntitiesAggregationCalculatedFieldConfiguration implements A
}
}
@JsonIgnore
@Override
public boolean requiresScheduledReevaluation() {
return true;
}
}

4
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalcula
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark;
import java.util.Map;
@ -36,8 +37,7 @@ public class EntityAggregationCalculatedFieldConfiguration implements ArgumentsB
private Map<String, AggMetric> metrics;
private AggInterval interval;
private long deduplicationIntervalInSec;
private long watermark;
private Watermark watermark;
private Output output;

9
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggInterval.java

@ -31,14 +31,19 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(value = WeekSunSatInterval.class, name = "WEEK_SUN_SAT"),
@JsonSubTypes.Type(value = MonthInterval.class, name = "MONTH"),
@JsonSubTypes.Type(value = YearInterval.class, name = "YEAR"),
@JsonSubTypes.Type(value = CustomInterval.class, name = "CUSTOM"),
@JsonSubTypes.Type(value = SpecificTimeInterval.class, name = "SPECIFIC_TIME"),
@JsonSubTypes.Type(value = CustomInterval.class, name = "CUSTOM")
})
@JsonIgnoreProperties(ignoreUnknown = true)
public interface AggInterval {
AggIntervalType getType();
long getIntervalDurationMillis();
long getCurrentIntervalStartTs();
long getCurrentIntervalEndTs();
long getDelayUntilIntervalEnd();
}

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/AggIntervalType.java

@ -23,7 +23,6 @@ public enum AggIntervalType {
WEEK_SUN_SAT,
MONTH,
YEAR,
CUSTOM,
SPECIFIC_TIME
CUSTOM
}

122
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java

@ -15,40 +15,124 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
@Data
public abstract class BaseAggInterval implements AggInterval {
protected long offsetMillis; // delay millis since start of interval
@Override
public long getIntervalDurationMillis() {
return getCurrentIntervalEndTs() - getCurrentIntervalStartTs();
}
@Override
public long getCurrentIntervalStartTs() {
return getCurrentIntervalStartTs(getType(), 1);
}
protected long getCurrentIntervalStartTs(AggIntervalType type, int multiplier) {
return getAlignedBoundary(type, multiplier, false).toInstant().toEpochMilli();
}
@Override
public long getCurrentIntervalEndTs() {
return getCurrentIntervalEndTs(getType(), 1);
}
protected long getCurrentIntervalEndTs(AggIntervalType type, int multiplier) {
return getAlignedBoundary(type, multiplier, true).toInstant().toEpochMilli();
}
@Override
public long getDelayUntilIntervalEnd() {
return getDelayUntilIntervalEnd(getType(), 1);
}
protected long getDelayUntilIntervalEnd(AggIntervalType type, long multiplier) {
protected long getDelayUntilIntervalEnd(AggIntervalType type, int multiplier) {
ZonedDateTime now = ZonedDateTime.now();
ZonedDateTime currentStart = getAlignedBoundary(type, multiplier, false);
ZonedDateTime nextStart = getAlignedBoundary(type, multiplier, true);
long periodMillis = Duration.between(currentStart, nextStart).toMillis();
// Apply offset: this shifts the grid
long off = offsetMillis % periodMillis;
if (off < 0) off += periodMillis;
// Compute the offset-aligned start times
ZonedDateTime offsetCurrentStart = currentStart.plus(Duration.ofMillis(off));
ZonedDateTime offsetNextStart = offsetCurrentStart.plus(Duration.ofMillis(periodMillis));
// If we are already past the current offset start, move to the next
ZonedDateTime target = offsetCurrentStart.isAfter(now) ? offsetCurrentStart : offsetNextStart;
// todo fix
return Math.max(Duration.between(now, target).toMillis(), 0);
}
protected ZonedDateTime getAlignedBoundary(AggIntervalType type, int multiplier, boolean next) {
ZonedDateTime now = ZonedDateTime.now();
ZonedDateTime next;
switch (getType()) {
case HOUR -> next = now.plusHours(multiplier).truncatedTo(ChronoUnit.HOURS);
case DAY -> next = now.plusDays(multiplier).truncatedTo(ChronoUnit.DAYS);
case WEEK -> next = now.plusWeeks(multiplier).with(DayOfWeek.MONDAY).truncatedTo(ChronoUnit.DAYS);
case WEEK_SUN_SAT -> next = now.plusWeeks(multiplier).with(DayOfWeek.SUNDAY).truncatedTo(ChronoUnit.DAYS);
case MONTH -> next = now.plusMonths(multiplier).withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
case YEAR -> next = now.plusYears(multiplier).withDayOfYear(1).truncatedTo(ChronoUnit.DAYS);
default -> throw new IllegalArgumentException("Unsupported type: " + getType());
}
long delayMillis = Duration.between(now, next).toMillis();
if (offsetMillis > 0) {
delayMillis += offsetMillis;
}
return Math.max(delayMillis, 0);
return switch (type) {
case HOUR -> alignByHour(now, multiplier, next);
case DAY -> alignByDay(now, multiplier, next);
case WEEK -> alignByWeek(now, multiplier, DayOfWeek.MONDAY, next);
case WEEK_SUN_SAT -> alignByWeek(now, multiplier, DayOfWeek.SUNDAY, next);
case MONTH -> alignByMonth(now, multiplier, next);
case YEAR -> alignByYear(now, multiplier, next);
default -> throw new IllegalArgumentException("Unsupported type: " + type);
};
}
private ZonedDateTime alignByHour(ZonedDateTime now, int multiplier, boolean next) {
ZonedDateTime startOfDay = now.truncatedTo(ChronoUnit.DAYS);
long hoursSinceMidnight = Duration.between(startOfDay, now).toHours();
long aligned = (hoursSinceMidnight / multiplier) * multiplier;
if (next) aligned += multiplier;
return startOfDay.plusHours(aligned);
}
private ZonedDateTime alignByDay(ZonedDateTime now, int multiplier, boolean next) {
long daysSinceEpoch = now.toLocalDate().toEpochDay();
long aligned = (daysSinceEpoch / multiplier) * multiplier;
if (next) aligned += multiplier;
long diff = aligned - daysSinceEpoch;
return now.truncatedTo(ChronoUnit.DAYS).plusDays(diff);
}
private ZonedDateTime alignByWeek(ZonedDateTime now, int multiplier, DayOfWeek startOfWeekDay, boolean next) {
ZonedDateTime startOfWeek = now.with(TemporalAdjusters.previousOrSame(startOfWeekDay))
.truncatedTo(ChronoUnit.DAYS);
long weeksSinceEpoch = ChronoUnit.WEEKS.between(
ZonedDateTime.ofInstant(Instant.EPOCH, now.getZone()), startOfWeek);
long aligned = (weeksSinceEpoch / multiplier) * multiplier;
if (next) aligned += multiplier;
return startOfWeek.plusWeeks(aligned - weeksSinceEpoch);
}
private ZonedDateTime alignByMonth(ZonedDateTime now, int multiplier, boolean next) {
ZonedDateTime startOfMonth = now.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
long monthsSinceEpoch = now.getYear() * 12L + now.getMonthValue() - 1;
long aligned = (monthsSinceEpoch / multiplier) * multiplier;
if (next) aligned += multiplier;
return startOfMonth.plusMonths(aligned - monthsSinceEpoch);
}
private ZonedDateTime alignByYear(ZonedDateTime now, int multiplier, boolean next) {
int year = now.getYear();
int aligned = (year / multiplier) * multiplier;
if (next) aligned += multiplier;
return ZonedDateTime.of(LocalDate.of(aligned, 1, 1), LocalTime.MIDNIGHT, now.getZone());
}
}

15
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/CustomInterval.java

@ -25,6 +25,21 @@ public class CustomInterval extends BaseAggInterval {
return AggIntervalType.CUSTOM;
}
@Override
public long getIntervalDurationMillis() {
return getCurrentIntervalEndTs() - getCurrentIntervalStartTs();
}
@Override
public long getCurrentIntervalStartTs() {
return super.getCurrentIntervalStartTs(internalIntervalType, multiplier);
}
@Override
public long getCurrentIntervalEndTs() {
return super.getCurrentIntervalEndTs(internalIntervalType, multiplier);
}
@Override
public long getDelayUntilIntervalEnd() {
return super.getDelayUntilIntervalEnd(internalIntervalType, multiplier);

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/DayInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class DayInterval extends BaseAggInterval {
@Override

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/HourInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class HourInterval extends BaseAggInterval {
@Override

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/MonthInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class MonthInterval extends BaseAggInterval {
@Override

42
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/SpecificTimeInterval.java

@ -1,42 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import java.time.LocalTime;
public class SpecificTimeInterval implements AggInterval {
public Long startMillis; // start millis since start of day
public Long endMillis; // end millis since start of day
@Override
public AggIntervalType getType() {
return AggIntervalType.SPECIFIC_TIME;
}
@Override
public long getDelayUntilIntervalEnd() {
long nowMillis = LocalTime.now().toNanoOfDay() / 1_000_000L;
long delayMillis;
if (nowMillis < endMillis) {
delayMillis = endMillis - nowMillis; // later today
} else {
delayMillis = (24 * 60 * 60 * 1000L - nowMillis) + endMillis; // next day
}
return delayMillis;
}
}

11
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java

@ -0,0 +1,11 @@
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class Watermark {
private long duration;
private long checkInterval;
}

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class WeekInterval extends BaseAggInterval {
@Override

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/WeekSunSatInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class WeekSunSatInterval extends BaseAggInterval {
@Override

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/YearInterval.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import lombok.Data;
@Data
public class YearInterval extends BaseAggInterval {
@Override

Loading…
Cancel
Save