From 4e2b4fc9216b554343704fa5b4ddb9d5113b5710 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 8 Dec 2025 10:08:18 +0200 Subject: [PATCH] added getLatestTs method and added tests --- ...faultCalculatedFieldProcessingService.java | 5 +- .../service/cf/ctx/state/ArgumentEntry.java | 2 + .../ctx/state/BaseCalculatedFieldState.java | 22 ++--- .../ctx/state/SimpleCalculatedFieldState.java | 5 +- .../ctx/state/SingleValueArgumentEntry.java | 17 +++- .../cf/ctx/state/TsRollingArgumentEntry.java | 8 ++ .../cf/CalculatedFieldIntegrationTest.java | 98 ++++++++++++++++--- .../script/api/tbel/TbelCfCtx.java | 2 +- 8 files changed, 124 insertions(+), 35 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 9ff185bb52..b6e1193cfb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -74,7 +74,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; -import static org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry.DEFAULT_TS; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultTsKvEntry; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createStateByType; @@ -239,11 +238,11 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument); case ATTRIBUTE -> Futures.transform( attributesService.find(tenantId, entityId, argument.getRefEntityKey().getScope(), argument.getRefEntityKey().getKey()), - result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultAttributeEntry(argument, DEFAULT_TS))), + result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultAttributeEntry(argument, System.currentTimeMillis()))), calculatedFieldCallbackExecutor); case TS_LATEST -> Futures.transform( timeseriesService.findLatest(tenantId, entityId, argument.getRefEntityKey().getKey()), - result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, DEFAULT_TS))), + result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, System.currentTimeMillis()))), calculatedFieldCallbackExecutor); }; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 83e10b8194..3df43d8c2b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -40,6 +40,8 @@ public interface ArgumentEntry { Object getValue(); + long getLatestTs(); + boolean updateEntry(ArgumentEntry entry); boolean isEmpty(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 856081d7c5..b6d4ad0fac 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -48,11 +48,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { this(new ArrayList<>(), new HashMap<>(), false, DEFAULT_LAST_UPDATE_TS); } - - public long getLatestTimestamp() { - return latestTimestamp == DEFAULT_LAST_UPDATE_TS ? System.currentTimeMillis() : latestTimestamp; - } - @Override public boolean updateState(CalculatedFieldCtx ctx, Map argumentValues) { if (arguments == null) { @@ -80,7 +75,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { if (entryUpdated) { stateUpdated = true; - updateLastUpdateTimestamp(newEntry); } } @@ -116,15 +110,13 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected abstract void validateNewEntry(ArgumentEntry newEntry); - private void updateLastUpdateTimestamp(ArgumentEntry entry) { - long newTs = this.latestTimestamp; - if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - newTs = singleValueArgumentEntry.getTs(); - } else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { - Map.Entry lastEntry = tsRollingArgumentEntry.getTsRecords().lastEntry(); - newTs = (lastEntry != null) ? lastEntry.getKey() : DEFAULT_LAST_UPDATE_TS; - } - this.latestTimestamp = Math.max(this.latestTimestamp, newTs); + public long getLatestTimestamp() { + long currentLatestTs = arguments.values().stream() + .mapToLong(ArgumentEntry::getLatestTs) + .max() + .orElse(DEFAULT_LAST_UPDATE_TS); + latestTimestamp = Math.max(currentLatestTs, latestTimestamp); + return latestTimestamp; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index e80939a952..80f5964582 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -99,9 +99,10 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { valuesNode.set(outputName, JacksonUtil.valueToTree(result)); } - if (useLatestTs) { + long latestTs = getLatestTimestamp(); + if (useLatestTs && latestTs != DEFAULT_LAST_UPDATE_TS) { ObjectNode resultNode = JacksonUtil.newObjectNode(); - resultNode.put("ts", getLatestTimestamp()); + resultNode.put("ts", latestTs); resultNode.set("values", valuesNode); return resultNode; } else { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index b26e53ef3c..186af92f38 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -31,12 +31,13 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState.DEFAULT_LAST_UPDATE_TS; + @Data @AllArgsConstructor public class SingleValueArgumentEntry implements ArgumentEntry { public static final Long DEFAULT_VERSION = -1L; - public static final Long DEFAULT_TS = -1L; private long ts; private BasicKvEntry kvEntryValue; @@ -45,7 +46,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry { private boolean forceResetPrevious; public SingleValueArgumentEntry() { - this.ts = DEFAULT_TS; + this.ts = DEFAULT_LAST_UPDATE_TS; this.version = DEFAULT_VERSION; } @@ -97,6 +98,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { return isEmpty() ? null : kvEntryValue.getValue(); } + @Override + public long getLatestTs() { + return !isDefaultValue() ? ts : DEFAULT_LAST_UPDATE_TS; + } + @Override public TbelCfArg toTbelCfArg() { Object value = kvEntryValue.getValue(); @@ -118,7 +124,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry { @Override public boolean updateEntry(ArgumentEntry entry) { if (entry instanceof SingleValueArgumentEntry singleValueEntry) { - if (singleValueEntry.getTs() <= this.ts) { + if (singleValueEntry.getTs() < this.ts) { return false; } @@ -134,4 +140,9 @@ public class SingleValueArgumentEntry implements ArgumentEntry { } return false; } + + public boolean isDefaultValue() { + return DEFAULT_VERSION.equals(this.version); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index b5a680a072..ada46a841e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState.DEFAULT_LAST_UPDATE_TS; + @Data @NoArgsConstructor @AllArgsConstructor @@ -83,6 +85,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry { return tsRecords; } + @Override + public long getLatestTs() { + var lastEntry = tsRecords.lastEntry(); + return (lastEntry != null) ? lastEntry.getKey() : DEFAULT_LAST_UPDATE_TS; + } + @Override public TbelCfArg toTbelCfArg() { List values = new ArrayList<>(tsRecords.size()); diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index 03052382f0..c83e8c5ee3 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest { @@ -571,6 +572,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes public void testScriptCalculatedFieldWhenUsedLatestTsInScript() throws Exception { Device testDevice = createDevice("Test device", "1234567890"); + long ts = System.currentTimeMillis() - 300000L; + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts))); + CalculatedField calculatedField = new CalculatedField(); calculatedField.setEntityId(testDevice.getId()); calculatedField.setType(CalculatedFieldType.SCRIPT); @@ -583,7 +587,6 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes Argument argument = new Argument(); ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null); argument.setRefEntityKey(refEntityKey); - argument.setDefaultValue("20"); config.setArguments(Map.of("T", argument)); config.setExpression("return {\"ts\": ctx.latestTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};"); @@ -595,25 +598,98 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); - await().alias("create CF -> perform initial calculation with default value").atMost(TIMEOUT, TimeUnit.SECONDS) + await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isNotEqualTo("-1"); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("68.0"); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts)); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); }); + } - long ts = System.currentTimeMillis() - 10L; - doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts))); + @Test + public void testSimpleCalculatedFieldWhenUseLatestTsIsTrueAndDefaultArguments() throws Exception { + Device testDevice = createDevice("Test device", "1234567890"); - await().alias("update telemetry -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + CalculatedField calculatedField = new CalculatedField(); + calculatedField.setEntityId(testDevice.getId()); + calculatedField.setType(CalculatedFieldType.SIMPLE); + calculatedField.setName("a + b + c"); + calculatedField.setDebugSettings(DebugSettings.all()); + calculatedField.setConfigurationVersion(1); + + SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration(); + + Argument argument1 = new Argument(); + ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null); + argument1.setRefEntityKey(refEntityKey1); + argument1.setDefaultValue("100"); + Argument argument2 = new Argument(); + ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("b", ArgumentType.TS_LATEST, null); + argument2.setRefEntityKey(refEntityKey2); + argument2.setDefaultValue("200"); + Argument argument3 = new Argument(); + ReferencedEntityKey refEntityKey3 = new ReferencedEntityKey("c", ArgumentType.TS_LATEST, null); + argument3.setRefEntityKey(refEntityKey3); + argument3.setDefaultValue("300"); + config.setArguments(Map.of("a", argument1, "b", argument2, "c", argument3)); + config.setExpression("a + b + c"); + + Output output = new Output(); + output.setName("d"); + output.setType(OutputType.TIME_SERIES); + output.setDecimalsByDefault(0); + config.setOutput(output); + + config.setUseLatestTs(true); + + calculatedField.setConfiguration(config); + + CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); + + await().alias("create CF -> perform initial calculation with default arguments").atMost(TIMEOUT, TimeUnit.SECONDS) .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { - ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts)); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + ObjectNode d = getLatestTelemetry(testDevice.getId(), "d"); + assertThat(d).isNotNull(); + assertThat(d.get("d").get(0).get("value").asText()).isEqualTo("600"); + }); + + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":10}")); + + await().alias("update telemetry -> save result with ts of 'a' argument").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d", "a"); + assertThat(keys).isNotNull(); + String aTs = keys.get("a").get(0).get("ts").asText(); + assertThat(keys.get("d").get(0).get("ts").asText()).isEqualTo(aTs); + assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("510"); + }); + + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"b\":20}")); + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"c\":30}")); + + await().alias("update telemetry -> save result with latest ts of updated arguments").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d"); + assertThat(keys).isNotNull(); + assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("60"); + }); + + String latestTs = getLatestTelemetry(testDevice.getId(), "d").get("d").get(0).get("ts").asText(); + + doDelete("/api/plugins/telemetry/DEVICE/" + testDevice.getId() + "/timeseries/delete?keys=b&deleteAllDataForKeys=true").andExpect(status().isOk()); + + await().alias("delete telemetry -> save result with previous latest ts and default argument").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d"); + assertThat(keys).isNotNull(); + assertThat(keys.get("d").get(0).get("ts").asText()).isEqualTo(latestTs); + assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("240"); }); } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java index 2fe861ba81..c6023154ea 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java @@ -29,7 +29,7 @@ public class TbelCfCtx implements TbelCfObject { public TbelCfCtx(Map args, long latestTs) { this.args = Collections.unmodifiableMap(args); - this.latestTs = latestTs; + this.latestTs = latestTs != -1 ? latestTs : System.currentTimeMillis(); } @Override