Browse Source

handle future telemetry and fill intervals after restart based on watermark

pull/14681/head
IrynaMatveieva 5 months ago
parent
commit
a35bbcc1de
  1. 6
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  2. 74
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java
  3. 18
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java
  4. 4
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java
  5. 53
      application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java

6
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -233,7 +233,7 @@ public abstract class AbstractCalculatedFieldProcessingService {
return config.getArguments().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), config.getInterval(), ts)
entry -> fetchTimeSeries(ctx, entityId, entry.getValue(), config.getInterval(), ts)
));
}
@ -341,11 +341,11 @@ public abstract class AbstractCalculatedFieldProcessingService {
return resolveArgumentValue(argKey, argumentEntryFut);
}
private ListenableFuture<ArgumentEntry> fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) {
private ListenableFuture<ArgumentEntry> fetchTimeSeries(CalculatedFieldCtx ctx, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) {
long intervalStartTs = interval.getCurrentIntervalStartTs();
long intervalEndTs = interval.getCurrentIntervalEndTs();
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), intervalStartTs, queryEndTs, 0, 1, Aggregation.NONE);
return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs));
return fetchTimeSeriesInternal(ctx.getTenantId(), entityId, query, timeSeries -> transformAggregationArgument(timeSeries, intervalStartTs, intervalEndTs, ctx));
}
private ListenableFuture<ArgumentEntry> fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) {

74
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java

@ -19,11 +19,18 @@ import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.EntityAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.AggInterval;
import org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval.Watermark;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Data
public class EntityAggregationArgumentEntry implements ArgumentEntry {
@ -32,10 +39,18 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry {
private boolean forceResetPrevious;
private AggInterval interval;
private long watermarkDuration;
public EntityAggregationArgumentEntry(Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals) {
this.aggIntervals = aggIntervals;
}
public EntityAggregationArgumentEntry(Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals, CalculatedFieldCtx ctx) {
this(aggIntervals);
setCtx(ctx);
}
@Override
public ArgumentEntryType getType() {
return ArgumentEntryType.ENTITY_AGGREGATION;
@ -46,29 +61,64 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry {
return aggIntervals;
}
public void setCtx(CalculatedFieldCtx ctx) {
var configuration = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
interval = configuration.getInterval();
Watermark watermark = configuration.getWatermark();
watermarkDuration = watermark == null ? 0 : TimeUnit.SECONDS.toMillis(watermark.getDuration());
}
@Override
public boolean updateEntry(ArgumentEntry entry) {
boolean updated = false;
if (entry instanceof EntityAggregationArgumentEntry entityAggEntry) {
aggIntervals.putAll(entityAggEntry.getAggIntervals());
return true;
} else if (entry instanceof SingleValueArgumentEntry singleValueArgEntry) {
long entryTs = singleValueArgEntry.getTs();
long argUpdateTs = System.currentTimeMillis();
for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> aggIntervalEntry : aggIntervals.entrySet()) {
if (singleValueArgEntry.isForceResetPrevious()) {
aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs);
updated = true;
continue;
}
if (aggIntervalEntry.getKey().belongsToInterval(entryTs)) {
aggIntervalEntry.getValue().setLastArgsRefreshTs(argUpdateTs);
return true;
}
long now = System.currentTimeMillis();
if (updateExistingIntervals(singleValueArgEntry, entryTs, now)) {
return true;
}
return createNewInterval(entryTs, now);
}
return false;
}
private boolean updateExistingIntervals(SingleValueArgumentEntry entry, long entryTs, long now) {
boolean updated = false;
for (Map.Entry<AggIntervalEntry, AggIntervalEntryStatus> aggIntervalEntry : aggIntervals.entrySet()) {
AggIntervalEntry interval = aggIntervalEntry.getKey();
AggIntervalEntryStatus status = aggIntervalEntry.getValue();
if (entry.isForceResetPrevious()) {
status.setLastArgsRefreshTs(now);
updated = true;
continue;
}
if (interval.belongsToInterval(entryTs)) {
status.setLastArgsRefreshTs(now);
return true;
}
}
return updated;
}
private boolean createNewInterval(long entryTs, long now) {
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(entryTs), interval.getZoneId());
long startTs = interval.getDateTimeIntervalStartTs(zdt);
long endTs = interval.getDateTimeIntervalEndTs(zdt);
if (now - endTs > watermarkDuration) {
return false;
}
AggIntervalEntry newInterval = new AggIntervalEntry(startTs, endTs);
aggIntervals.computeIfAbsent(newInterval, i -> new AggIntervalEntryStatus(now));
return true;
}
@Override
public boolean isEmpty() {
return aggIntervals.isEmpty();

18
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java

@ -82,6 +82,15 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
interval = configuration.getInterval();
metrics = configuration.getMetrics();
produceIntermediateResult = configuration.isProduceIntermediateResult();
setCtxToArguments();
}
private void setCtxToArguments() {
arguments.values().forEach(argument -> {
if (argument instanceof EntityAggregationArgumentEntry entityAggArgument) {
entityAggArgument.setCtx(ctx);
}
});
}
@Override
@ -154,8 +163,10 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
}
private void fillMissingIntervals() {
long now = System.currentTimeMillis();
ZoneId zoneId = interval.getZoneId();
long currentIntervalEndTs = interval.getCurrentIntervalEndTs();
long watermarkThresholdTs = now - watermarkDuration;
Map<AggIntervalEntry, Map<String, AggIntervalEntryStatus>> intervals = getIntervals();
AggIntervalEntry lastIntervalEntry = intervals.keySet().stream().max(Comparator.comparing(AggIntervalEntry::getEndTs)).orElse(null);
@ -169,6 +180,13 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
while (nextEnd.toInstant().toEpochMilli() <= currentIntervalEndTs) {
long nextStartTs = nextStart.toInstant().toEpochMilli();
long nextEndTs = nextEnd.toInstant().toEpochMilli();
if (nextEndTs < watermarkThresholdTs) {
nextStart = nextEnd;
nextEnd = interval.getNextIntervalStart(nextStart);
continue;
}
AggIntervalEntry missing = new AggIntervalEntry(nextStartTs, nextEndTs);
arguments.forEach((argName, argumentEntry) -> {

4
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java

@ -75,7 +75,7 @@ public class CalculatedFieldArgumentUtils {
return new SingleValueArgumentEntry();
}
public static ArgumentEntry transformAggregationArgument(List<TsKvEntry> timeSeries, long startIntervalTs, long endIntervalTs) {
public static ArgumentEntry transformAggregationArgument(List<TsKvEntry> timeSeries, long startIntervalTs, long endIntervalTs, CalculatedFieldCtx ctx) {
Map<AggIntervalEntry, AggIntervalEntryStatus> aggIntervals = new HashMap<>();
AggIntervalEntry aggIntervalEntry = new AggIntervalEntry(startIntervalTs, endIntervalTs);
if (timeSeries == null || timeSeries.isEmpty()) {
@ -83,7 +83,7 @@ public class CalculatedFieldArgumentUtils {
} else {
aggIntervals.put(aggIntervalEntry, new AggIntervalEntryStatus(System.currentTimeMillis()));
}
return new EntityAggregationArgumentEntry(aggIntervals);
return new EntityAggregationArgumentEntry(aggIntervals, ctx);
}
private static KvEntry createDefaultKvEntry(Argument argument) {

53
application/src/test/java/org/thingsboard/server/cf/EntityAggregationCalculatedFieldTest.java

@ -55,6 +55,8 @@ import static org.thingsboard.server.cf.CalculatedFieldIntegrationTest.POLL_INTE
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest {
private final String TZ = "Europe/Kyiv";
private Tenant savedTenant;
@Before
@ -93,7 +95,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
public void testCreateCfAndNoTelemetryDuringInterval_checkAggregation() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L);
CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L);
createConsumptionCF(device.getId(), customInterval, null);
long interval = customInterval.getCurrentIntervalDurationMillis();
@ -113,7 +115,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
public void testCreateCfWithoutWatermark_checkAggregation() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L);
CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L);
createConsumptionCF(device.getId(), customInterval, null);
long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs();
@ -156,7 +158,7 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
public void testCreateCfWithWatermark_checkAggregationDuringWatermark() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval("Europe/Kyiv", 0L, 5L);
CustomInterval customInterval = new CustomInterval(TZ, 0L, 5L);
Watermark watermark = new Watermark(10);
createConsumptionCF(device.getId(), customInterval, watermark);
@ -196,6 +198,51 @@ public class EntityAggregationCalculatedFieldTest extends AbstractControllerTest
});
}
@Test
public void testSendFutureTelemetry_checkAggregation() throws Exception {
Device device = createDevice("Device", "1234567890111");
CustomInterval customInterval = new CustomInterval(TZ, 0L, 2L);
createConsumptionCF(device.getId(), customInterval, null);
long currentIntervalStartTs = customInterval.getCurrentIntervalStartTs();
long tsBeforeInterval = currentIntervalStartTs - 1000;
long tsInInterval_1 = currentIntervalStartTs + 1000;
long tsInInterval_2 = currentIntervalStartTs + 500;
long tsInInterval_3 = currentIntervalStartTs + 200;
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsBeforeInterval));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":100}}", tsInInterval_1));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":180}}", tsInInterval_2));
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":120}}", tsInInterval_3));
long interval = customInterval.getCurrentIntervalDurationMillis();
await().alias("create CF -> perform aggregation after interval end")
.atMost(2 * interval, TimeUnit.MILLISECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode result = getLatestTelemetry(device.getId(), "consumption", "avgConsumption");
assertThat(result).isNotNull();
assertThat(result.get("consumption").get(0).get("value").asText()).isEqualTo("400");
assertThat(result.get("avgConsumption").get(0).get("value").asText()).isEqualTo("133");
});
postTelemetry(device.getId(), String.format("{\"ts\": \"%s\", \"values\": {\"energy\":500}}", currentIntervalStartTs + 4500L));
await().alias("update telemetry that belongs to future interval -> check aggregation ")
.atMost(3 * interval, TimeUnit.MILLISECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode result = getLatestTelemetry(device.getId(), "consumption", "avgConsumption");
assertThat(result).isNotNull();
assertThat(result.get("consumption").get(0).get("value").asText()).isEqualTo("500");
assertThat(result.get("consumption").get(0).get("ts").asLong()).isEqualTo(currentIntervalStartTs + 4000L);
assertThat(result.get("avgConsumption").get(0).get("value").asText()).isEqualTo("500");
assertThat(result.get("avgConsumption").get(0).get("ts").asLong()).isEqualTo(currentIntervalStartTs + 4000L);
});
}
private CalculatedField createConsumptionCF(EntityId entityId, AggInterval aggInterval, Watermark watermark) {
Map<String, Argument> arguments = new HashMap<>();
Argument argument = new Argument();

Loading…
Cancel
Save