Browse Source

added getLatestTs method and added tests

pull/14526/head
IrynaMatveieva 6 months ago
parent
commit
4e2b4fc921
  1. 5
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  2. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  3. 22
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  4. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java
  5. 17
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java
  6. 8
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java
  7. 98
      application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java
  8. 2
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java

5
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -74,7 +74,6 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
import static org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry.DEFAULT_TS;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultTsKvEntry;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createStateByType;
@ -239,11 +238,11 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument);
case ATTRIBUTE -> Futures.transform(
attributesService.find(tenantId, entityId, argument.getRefEntityKey().getScope(), argument.getRefEntityKey().getKey()),
result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultAttributeEntry(argument, DEFAULT_TS))),
result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultAttributeEntry(argument, System.currentTimeMillis()))),
calculatedFieldCallbackExecutor);
case TS_LATEST -> Futures.transform(
timeseriesService.findLatest(tenantId, entityId, argument.getRefEntityKey().getKey()),
result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, DEFAULT_TS))),
result -> transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, System.currentTimeMillis()))),
calculatedFieldCallbackExecutor);
};
}

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -40,6 +40,8 @@ public interface ArgumentEntry {
Object getValue();
long getLatestTs();
boolean updateEntry(ArgumentEntry entry);
boolean isEmpty();

22
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -48,11 +48,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
this(new ArrayList<>(), new HashMap<>(), false, DEFAULT_LAST_UPDATE_TS);
}
public long getLatestTimestamp() {
return latestTimestamp == DEFAULT_LAST_UPDATE_TS ? System.currentTimeMillis() : latestTimestamp;
}
@Override
public boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
@ -80,7 +75,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
if (entryUpdated) {
stateUpdated = true;
updateLastUpdateTimestamp(newEntry);
}
}
@ -116,15 +110,13 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected abstract void validateNewEntry(ArgumentEntry newEntry);
private void updateLastUpdateTimestamp(ArgumentEntry entry) {
long newTs = this.latestTimestamp;
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
newTs = singleValueArgumentEntry.getTs();
} else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
Map.Entry<Long, Double> lastEntry = tsRollingArgumentEntry.getTsRecords().lastEntry();
newTs = (lastEntry != null) ? lastEntry.getKey() : DEFAULT_LAST_UPDATE_TS;
}
this.latestTimestamp = Math.max(this.latestTimestamp, newTs);
public long getLatestTimestamp() {
long currentLatestTs = arguments.values().stream()
.mapToLong(ArgumentEntry::getLatestTs)
.max()
.orElse(DEFAULT_LAST_UPDATE_TS);
latestTimestamp = Math.max(currentLatestTs, latestTimestamp);
return latestTimestamp;
}
}

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java

@ -99,9 +99,10 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
}
if (useLatestTs) {
long latestTs = getLatestTimestamp();
if (useLatestTs && latestTs != DEFAULT_LAST_UPDATE_TS) {
ObjectNode resultNode = JacksonUtil.newObjectNode();
resultNode.put("ts", getLatestTimestamp());
resultNode.put("ts", latestTs);
resultNode.set("values", valuesNode);
return resultNode;
} else {

17
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java

@ -31,12 +31,13 @@ import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState.DEFAULT_LAST_UPDATE_TS;
@Data
@AllArgsConstructor
public class SingleValueArgumentEntry implements ArgumentEntry {
public static final Long DEFAULT_VERSION = -1L;
public static final Long DEFAULT_TS = -1L;
private long ts;
private BasicKvEntry kvEntryValue;
@ -45,7 +46,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
private boolean forceResetPrevious;
public SingleValueArgumentEntry() {
this.ts = DEFAULT_TS;
this.ts = DEFAULT_LAST_UPDATE_TS;
this.version = DEFAULT_VERSION;
}
@ -97,6 +98,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
return isEmpty() ? null : kvEntryValue.getValue();
}
@Override
public long getLatestTs() {
return !isDefaultValue() ? ts : DEFAULT_LAST_UPDATE_TS;
}
@Override
public TbelCfArg toTbelCfArg() {
Object value = kvEntryValue.getValue();
@ -118,7 +124,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
@Override
public boolean updateEntry(ArgumentEntry entry) {
if (entry instanceof SingleValueArgumentEntry singleValueEntry) {
if (singleValueEntry.getTs() <= this.ts) {
if (singleValueEntry.getTs() < this.ts) {
return false;
}
@ -134,4 +140,9 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
}
return false;
}
public boolean isDefaultValue() {
return DEFAULT_VERSION.equals(this.version);
}
}

8
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java

@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState.DEFAULT_LAST_UPDATE_TS;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ -83,6 +85,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
return tsRecords;
}
@Override
public long getLatestTs() {
var lastEntry = tsRecords.lastEntry();
return (lastEntry != null) ? lastEntry.getKey() : DEFAULT_LAST_UPDATE_TS;
}
@Override
public TbelCfArg toTbelCfArg() {
List<TbelCfTsDoubleVal> values = new ArrayList<>(tsRecords.size());

98
application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest {
@ -571,6 +572,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
public void testScriptCalculatedFieldWhenUsedLatestTsInScript() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
long ts = System.currentTimeMillis() - 300000L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts)));
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SCRIPT);
@ -583,7 +587,6 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
Argument argument = new Argument();
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null);
argument.setRefEntityKey(refEntityKey);
argument.setDefaultValue("20");
config.setArguments(Map.of("T", argument));
config.setExpression("return {\"ts\": ctx.latestTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};");
@ -595,25 +598,98 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation with default value").atMost(TIMEOUT, TimeUnit.SECONDS)
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isNotEqualTo("-1");
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("68.0");
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts));
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
}
long ts = System.currentTimeMillis() - 10L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts)));
@Test
public void testSimpleCalculatedFieldWhenUseLatestTsIsTrueAndDefaultArguments() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
await().alias("update telemetry -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SIMPLE);
calculatedField.setName("a + b + c");
calculatedField.setDebugSettings(DebugSettings.all());
calculatedField.setConfigurationVersion(1);
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
Argument argument1 = new Argument();
ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null);
argument1.setRefEntityKey(refEntityKey1);
argument1.setDefaultValue("100");
Argument argument2 = new Argument();
ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("b", ArgumentType.TS_LATEST, null);
argument2.setRefEntityKey(refEntityKey2);
argument2.setDefaultValue("200");
Argument argument3 = new Argument();
ReferencedEntityKey refEntityKey3 = new ReferencedEntityKey("c", ArgumentType.TS_LATEST, null);
argument3.setRefEntityKey(refEntityKey3);
argument3.setDefaultValue("300");
config.setArguments(Map.of("a", argument1, "b", argument2, "c", argument3));
config.setExpression("a + b + c");
Output output = new Output();
output.setName("d");
output.setType(OutputType.TIME_SERIES);
output.setDecimalsByDefault(0);
config.setOutput(output);
config.setUseLatestTs(true);
calculatedField.setConfiguration(config);
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation with default arguments").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts));
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
ObjectNode d = getLatestTelemetry(testDevice.getId(), "d");
assertThat(d).isNotNull();
assertThat(d.get("d").get(0).get("value").asText()).isEqualTo("600");
});
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":10}"));
await().alias("update telemetry -> save result with ts of 'a' argument").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d", "a");
assertThat(keys).isNotNull();
String aTs = keys.get("a").get(0).get("ts").asText();
assertThat(keys.get("d").get(0).get("ts").asText()).isEqualTo(aTs);
assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("510");
});
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"b\":20}"));
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"c\":30}"));
await().alias("update telemetry -> save result with latest ts of updated arguments").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d");
assertThat(keys).isNotNull();
assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("60");
});
String latestTs = getLatestTelemetry(testDevice.getId(), "d").get("d").get(0).get("ts").asText();
doDelete("/api/plugins/telemetry/DEVICE/" + testDevice.getId() + "/timeseries/delete?keys=b&deleteAllDataForKeys=true").andExpect(status().isOk());
await().alias("delete telemetry -> save result with previous latest ts and default argument").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode keys = getLatestTelemetry(testDevice.getId(), "d");
assertThat(keys).isNotNull();
assertThat(keys.get("d").get(0).get("ts").asText()).isEqualTo(latestTs);
assertThat(keys.get("d").get(0).get("value").asText()).isEqualTo("240");
});
}

2
common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java

@ -29,7 +29,7 @@ public class TbelCfCtx implements TbelCfObject {
public TbelCfCtx(Map<String, TbelCfArg> args, long latestTs) {
this.args = Collections.unmodifiableMap(args);
this.latestTs = latestTs;
this.latestTs = latestTs != -1 ? latestTs : System.currentTimeMillis();
}
@Override

Loading…
Cancel
Save