Browse Source

changed scheduling implementation

pull/14253/head
IrynaMatveieva 7 months ago
parent
commit
31dc4ec8d7
  1. 2
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 10
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  3. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  4. 28
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  5. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java
  6. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java
  7. 58
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java
  8. 2
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java
  9. 4
      application/src/main/resources/thingsboard.yml
  10. 55
      application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java

2
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -128,7 +128,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
relatedEntitiesAggState.scheduleReevaluation();
}
if (state instanceof EntityAggregationCalculatedFieldState entityAggState) {
entityAggState.scheduleReevaluation();
entityAggState.fillMissingIntervals();
}
states.put(cfId, state);
} else {

10
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -23,7 +23,6 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
@ -88,9 +87,6 @@ public abstract class AbstractCalculatedFieldProcessingService {
protected ListeningExecutorService calculatedFieldCallbackExecutor;
@Value("${actors.calculated_fields.max_datapoints_limit}")
private int aggArgumentMaxDatapointsLimit;
@PostConstruct
public void init() {
calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
@ -311,10 +307,10 @@ public abstract class AbstractCalculatedFieldProcessingService {
}
private ListenableFuture<ArgumentEntry> fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) {
long startInterval = interval.getCurrentIntervalStartTs();
long intervalStartTs = interval.getCurrentIntervalStartTs();
long intervalEndTs = interval.getCurrentIntervalEndTs();
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startInterval, queryEndTs, 0, aggArgumentMaxDatapointsLimit, Aggregation.NONE);
return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, startInterval, intervalEndTs));
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), intervalStartTs, queryEndTs, 0, 1, Aggregation.NONE);
return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs));
}
private ListenableFuture<ArgumentEntry> fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) {

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
@ -39,7 +40,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"),
@JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"),
@JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION"),
@JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES")
@JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES"),
@JsonSubTypes.Type(value = EntityAggregationArgumentEntry.class, name = "ENTITY_AGGREGATION")
})
public interface ArgumentEntry {

28
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -45,6 +45,8 @@ import org.thingsboard.server.common.data.cf.configuration.ScheduledUpdateSuppor
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunctionInput;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark;
import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
@ -95,6 +97,8 @@ public class CalculatedFieldCtx implements Closeable {
private boolean useLatestTs;
private boolean requiresScheduledReevaluation;
private long lastReevaluationTs;
private ActorSystemContext systemContext;
private TbelInvokeService tbelInvokeService;
private RelationService relationService;
@ -192,7 +196,6 @@ public class CalculatedFieldCtx implements Closeable {
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L;
}
this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation();
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) {
this.useLatestTs = aggConfig.isUseLatestTs();
}
@ -207,6 +210,29 @@ public class CalculatedFieldCtx implements Closeable {
this.maxSingleValueArgumentSize = systemContext.getApiLimitService().getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxSingleValueArgumentSizeInKBytes) * 1024;
}
public boolean isRequiresScheduledReevaluation() {
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) {
long now = System.currentTimeMillis();
long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval());
Watermark watermark = entityAggregationConfig.getWatermark();
if (watermark != null) {
long checkIntervalMillis = TimeUnit.SECONDS.toMillis(watermark.getCheckInterval());
if (cfCheckIntervalMillis == checkIntervalMillis) {
return true;
}
if (now + cfCheckIntervalMillis >= lastReevaluationTs + checkIntervalMillis) {
lastReevaluationTs = now;
return true;
}
}
long intervalEndTsMillis = entityAggregationConfig.getInterval().getCurrentIntervalEndTs();
if (now + cfCheckIntervalMillis >= intervalEndTsMillis) {
return true;
}
}
return calculatedField.getConfiguration().requiresScheduledReevaluation();
}
public void init() {
switch (cfType) {
case SCRIPT -> {

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntryStatus.java

@ -38,8 +38,8 @@ public class AggIntervalEntryStatus {
boolean intervalPassed = lastMetricsEvalTs <= System.currentTimeMillis() - checkInterval;
boolean argsUpdatedDuringInterval = lastArgsRefreshTs > -1;
if (intervalPassed && argsUpdatedDuringInterval) {
lastMetricsEvalTs = System.currentTimeMillis();
lastArgsRefreshTs = -1;
setLastMetricsEvalTs(System.currentTimeMillis());
setLastArgsRefreshTs(-1);
return true;
}
return false;

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java

@ -50,9 +50,10 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry {
aggIntervals.putAll(entityAggEntry.getAggIntervals());
} else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) {
long entryTs = singleValueArgEntry.getTs();
long argUpdateTs = System.currentTimeMillis();
for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> aggIntervalEntry : aggIntervals.entrySet()) {
if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) {
aggIntervalEntry.getValue().setLastArgsRefreshTs(System.currentTimeMillis());
aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs);
return true;
}
}
@ -62,7 +63,7 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry {
@Override
public boolean isEmpty() {
return true;
return aggIntervals.isEmpty();
}
@Override

58
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java

@ -53,30 +53,16 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
private long checkInterval;
private Map<String, AggMetric> metrics;
private final Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> intervals = new HashMap<>();
private CalculatedFieldProcessingService cfProcessingService;
public EntityAggregationCalculatedFieldState(EntityId entityId) {
super(entityId);
}
public void scheduleReevaluation() {
prepareIntervals();
fillMissingIntervals(interval.getCurrentIntervalEndTs(), intervalDuration);
long now = System.currentTimeMillis();
intervals.forEach((intervalEntry, argumentIntervalStatuses) -> {
if (intervalEntry.belongsToInterval(now)) {
ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx);
} else {
if (intervalEntry.getEndTs() <= now) {
ctx.scheduleReevaluation(checkInterval, actorCtx);
}
}
});
}
private void fillMissingIntervals(long currentIntervalEndTs, long intervalDuration) {
public void fillMissingIntervals() {
long currentIntervalEndTs = interval.getCurrentIntervalEndTs();
long intervalDuration = interval.getIntervalDurationMillis();
Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> intervals = getIntervals();
AggIntervalEntry lastIntervalEntry = intervals.keySet().stream().max(Comparator.comparing(AggIntervalEntry::getEndTs)).orElse(null);
if (lastIntervalEntry == null) {
return;
@ -91,8 +77,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
arguments.forEach((argName, argumentEntry) -> {
var entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
AggIntervalEntryStatus intervalEntryStatus = new AggIntervalEntryStatus(System.currentTimeMillis());
entityAggEntry.getAggIntervals().put(missingAggIntervalEntry, intervalEntryStatus);
intervals.computeIfAbsent(missingAggIntervalEntry, i -> new HashMap<>()).put(argName, intervalEntryStatus);
entityAggEntry.getAggIntervals().computeIfAbsent(missingAggIntervalEntry, missingInterval -> intervalEntryStatus);
});
nextStartTs = nextEndTs;
@ -100,6 +85,17 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
}
}
private Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> getIntervals() {
Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> intervals = new HashMap<>();
arguments.forEach((argName, entry) -> {
var argEntry = (EntityAggregationArgumentEntry) entry;
argEntry.getAggIntervals().forEach((intervalEntry, status) ->
intervals.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, status)
);
});
return intervals;
}
@Override
public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) {
super.setCtx(ctx, actorCtx);
@ -121,11 +117,11 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx) throws Exception {
createIntervalIfNotExist();
prepareIntervals();
long now = System.currentTimeMillis();
Map<AggIntervalEntry, Map<String, ArgumentEntry>> results = new HashMap<>();
List<AggIntervalEntry> expiredIntervals = new ArrayList<>();
Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> intervals = getIntervals();
intervals.forEach((intervalEntry, argIntervalStatuses) -> {
processInterval(now, intervalEntry, argIntervalStatuses, expiredIntervals, results);
});
@ -143,37 +139,22 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
.build());
}
private void prepareIntervals() {
arguments.forEach((argName, entry) -> {
var argEntry = (EntityAggregationArgumentEntry) entry;
argEntry.getAggIntervals().forEach((intervalEntry, status) ->
intervals.computeIfAbsent(intervalEntry, i -> new HashMap<>()).put(argName, status)
);
});
}
private void removeExpiredIntervals(List<AggIntervalEntry> expiredIntervals) {
expiredIntervals.forEach(expiredInterval -> {
arguments.values().stream()
.map(EntityAggregationArgumentEntry.class::cast)
.forEach(arg -> arg.getAggIntervals().remove(expiredInterval));
intervals.remove(expiredInterval);
});
}
private void createIntervalIfNotExist() {
AggIntervalEntry currentInterval = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs());
if (intervals.containsKey(currentInterval)) {
return;
}
arguments.forEach((argName, argumentEntry) -> {
var entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
if (!entityAggEntry.getAggIntervals().containsKey(currentInterval)) {
entityAggEntry.getAggIntervals().put(currentInterval, new AggIntervalEntryStatus());
intervals.computeIfAbsent(currentInterval, i -> new HashMap<>()).put(argName, new AggIntervalEntryStatus());
entityAggEntry.getAggIntervals().computeIfAbsent(currentInterval, current -> new AggIntervalEntryStatus());
}
});
ctx.scheduleReevaluation(interval.getDelayUntilIntervalEnd(), actorCtx);
}
private void processInterval(long now,
@ -208,9 +189,6 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
args.forEach((argName, argEntryIntervalStatus) -> {
if (argEntryIntervalStatus.shouldRecalculate(checkInterval)) {
processMetric(intervalEntry, argName, false, results);
ctx.scheduleReevaluation(checkInterval, actorCtx);
} else if (argEntryIntervalStatus.intervalPassed(checkInterval)) {
processMetric(intervalEntry, argName, true, results);
}
});
}

2
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java

@ -80,7 +80,7 @@ public class CalculatedFieldArgumentUtils {
if (defaultValue != null) {
ArgumentEntry.createSingleValueArgument(new DoubleDataEntry(argKey, defaultValue.doubleValue()));
}
return ArgumentEntry.createSingleValueArgument(new StringDataEntry(argKey, null));
return new SingleValueArgumentEntry();
}
public static ArgumentEntry transformAggregationArgument(List<TsKvEntry> timeSeries, long startIntervalTs, long endIntervalTs) {

4
application/src/main/resources/thingsboard.yml

@ -530,9 +530,7 @@ actors:
# Time in seconds to receive calculation result.
calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}"
# Interval in seconds to re-evaluate calculated fields that have a time schedule. 2 minutes by default.
check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:120}"
# Maximum allowed datapoints fetched by aggregation calculated fields
max_datapoints_limit: "${CF_AGG_MAX_DATAPOINTS_LIMIT:50000}"
check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:30}"
debug:
settings:

55
application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
@ -53,6 +54,9 @@ import static org.thingsboard.server.cf.CalculatedFieldIntegrationTest.POLL_INTE
@DaoSqlTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@TestPropertySource(properties = {
"actors.calculated_fields.check_interval=1"
})
public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest {
private Tenant savedTenant;
@ -91,7 +95,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
public void testCreateCf_checkAggregation() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval(60L, 0L, "Europe/Kyiv");
CustomInterval customInterval = new CustomInterval(30L, 0L, "Europe/Kyiv");
long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs();
long currentIntervalEndTs = customInterval.getCurrentIntervalEndTs();
@ -105,7 +109,8 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3));
long interval = customInterval.getIntervalDurationMillis();
CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval);
Watermark watermark = new Watermark(60, 10);
CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval, watermark);
await().alias("create CF and perform aggregation after interval end")
.atMost(2 * interval, TimeUnit.MILLISECONDS)
@ -117,7 +122,49 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
});
}
private CalculatedField createTotalConsumptionCF(EntityId entityId, AggInterval aggInterval) {
@Test
public void testCreateCf_checkAggregationDuringWatermark() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval(30L, 0L, "Europe/Kyiv");
long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs();
long currentIntervalEndTs = customInterval.getCurrentIntervalEndTs();
long tsBeforeInterval = currentIntervalStartTs - 1000L;
long tsInInterval_1 = currentIntervalStartTs + 1000L;
long tsInInterval_2 = currentIntervalStartTs + 500L;
long tsInInterval_3 = currentIntervalStartTs + 200L;
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsBeforeInterval));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":100}}", tsInInterval_1));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":180}}", tsInInterval_2));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3));
long interval = customInterval.getIntervalDurationMillis();
Watermark watermark = new Watermark(60, 10);
CalculatedField totalConsumptionCF = createTotalConsumptionCF(device.getId(), customInterval, watermark);
await().alias("create CF and perform aggregation after interval end")
.atMost(2 * interval, TimeUnit.MILLISECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode result = getLatestTelemetry(device.getId(), "consumptionPerMin");
assertThat(result).isNotNull();
assertThat(result.get("consumptionPerMin").get(0).get("value").asText()).isEqualTo("400");
});
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":300}}", tsInInterval_1));
await().alias("create CF and perform aggregation after interval end")
.atMost(2 * watermark.getCheckInterval(), TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode result = getLatestTelemetry(device.getId(), "consumptionPerMin");
assertThat(result).isNotNull();
assertThat(result.get("consumptionPerMin").get(0).get("value").asText()).isEqualTo("600");
});
}
private CalculatedField createTotalConsumptionCF(EntityId entityId, AggInterval aggInterval, Watermark watermark) {
Map<String, Argument> arguments = new HashMap<>();
Argument argument = new Argument();
argument.setRefEntityKey(new ReferencedEntityKey("energy", ArgumentType.TS_LATEST, null));
@ -137,7 +184,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
return createAggCf("Consumption per minute", entityId,
aggInterval,
new Watermark(TimeUnit.MINUTES.toMillis(1), TimeUnit.SECONDS.toMillis(10)),
watermark,
arguments,
aggMetrics,
output);

Loading…
Cancel
Save