Browse Source

fixed integration tests and added debug events creation

pull/12678/head
IrynaMatveieva 1 year ago
parent
commit
eae72065ee
  1. 55
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 2
      application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java
  3. 22
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java
  4. 284
      application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

55
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -197,12 +197,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
state = getOrInitState(ctx);
justRestored = true;
}
if (state.updateState(newArgValues) || justRestored) {
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
try {
if (state.updateState(newArgValues) || justRestored) {
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e);
}
}
}
@ -212,37 +218,38 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state != null) {
return state;
} else {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
states.put(ctx.getCfId(), state);
try {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
states.put(ctx.getCfId(), state);
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
throw new RuntimeException(e);
}
}
return state;
}
@SneakyThrows
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
if (state.isReady() && ctx.isInitialized()) {
try {
try {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
if (state.isReady() && ctx.isInitialized()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResultMap()), null);
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e);
}
} else {
callback.onSuccess(); // State was updated but no calculation performed;
}
} else {
callback.onSuccess(); // State was updated but no calculation performed;
}
try {
cfStateService.persistState(ctxId, state, callback);
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {

2
application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.exception;
public class CalculatedFieldStateException extends Exception {
public class CalculatedFieldStateException extends RuntimeException {
public CalculatedFieldStateException(String message) {
super(message);

22
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java

@ -25,6 +25,7 @@ import org.thingsboard.script.api.tbel.TbelCfTsDoubleVal;
import org.thingsboard.script.api.tbel.TbelCfTsRollingArg;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.exception.CalculatedFieldStateException;
import java.util.ArrayList;
import java.util.List;
@ -85,7 +86,7 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
}
@Override
public boolean updateEntry(ArgumentEntry entry) {
public boolean updateEntry(ArgumentEntry entry) throws CalculatedFieldStateException {
if (entry instanceof TsRollingArgumentEntry tsRollingEntry) {
updateTsRollingEntry(tsRollingEntry);
} else if (entry instanceof SingleValueArgumentEntry singleValueEntry) {
@ -107,15 +108,18 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
}
private void addTsRecord(Long ts, KvEntry value) {
switch (value.getDataType()) {
case LONG -> value.getLongValue().ifPresent(aLong -> tsRecords.put(ts, aLong.doubleValue()));
case DOUBLE -> value.getDoubleValue().ifPresent(aDouble -> tsRecords.put(ts, aDouble));
case BOOLEAN -> value.getBooleanValue().ifPresent(aBoolean -> tsRecords.put(ts, aBoolean ? 1.0 : 0.0));
case STRING -> value.getStrValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
case JSON -> value.getJsonValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
//TODO: try catch
try {
switch (value.getDataType()) {
case LONG -> value.getLongValue().ifPresent(aLong -> tsRecords.put(ts, aLong.doubleValue()));
case DOUBLE -> value.getDoubleValue().ifPresent(aDouble -> tsRecords.put(ts, aDouble));
case BOOLEAN -> value.getBooleanValue().ifPresent(aBoolean -> tsRecords.put(ts, aBoolean ? 1.0 : 0.0));
case STRING -> value.getStrValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
case JSON -> value.getJsonValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
}
cleanupExpiredRecords();
} catch (Exception e) {
throw new IllegalArgumentException("Time series rolling arguments supports only numeric values.");
}
cleanupExpiredRecords();
}
private void addTsRecord(Long ts, double value) {

284
application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

@ -40,8 +40,10 @@ import org.thingsboard.server.controller.CalculatedFieldControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@DaoSqlTest
public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest {
@ -84,20 +86,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// create CF -> perform initial calculation
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
Thread.sleep(300);
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("77.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("77.0");
});
// update telemetry -> recalculate state
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
Thread.sleep(300);
fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
// update CF output -> perform calculation with updated output
Output savedOutput = savedCalculatedField.getConfiguration().getOutput();
@ -106,32 +110,35 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
savedOutput.setName("temperatureF");
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
Thread.sleep(300);
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("86.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("86.0");
});
// update CF argument -> perform calculation with new argument
Argument savedArgument = savedCalculatedField.getConfiguration().getArguments().get("T");
savedArgument.setRefEntityKey(new ReferencedEntityKey("deviceTemperature", ArgumentType.ATTRIBUTE, AttributeScope.SERVER_SCOPE));
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
Thread.sleep(300);
temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0");
});
// update CF expression -> perform calculation with new expression
savedCalculatedField.getConfiguration().setExpression("1.8 * T + 32");
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
Thread.sleep(300);
temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0");
});
}
@Test
@ -164,20 +171,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// create CF -> state is not ready -> no calculation performed
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
Thread.sleep(300);
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
});
// update telemetry -> perform calculation
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
Thread.sleep(300);
fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
}
@Test
@ -211,20 +220,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// create CF -> perform initial calculation with default value
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
Thread.sleep(300);
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("53.6");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("53.6");
});
// update telemetry -> recalculate state
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
Thread.sleep(300);
fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
}
@Test
@ -275,93 +286,100 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// create CF and perform initial calculation
doPost("/api/calculatedField", calculatedField, CalculatedField.class);
Thread.sleep(300);
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("51.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("51.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("52.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("52.0");
});
// update device telemetry -> recalculate state for all assets
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":25}"));
Thread.sleep(300);
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("36.0");
// result of asset 1
z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("36.0");
// result of asset 2
z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
});
// update asset 1 telemetry -> recalculate state only for asset 1
doPost("/api/plugins/telemetry/ASSET/" + asset1.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":15}"));
Thread.sleep(300);
// result of asset 1
z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0");
// result of asset 2 (no changes)
z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
// result of asset 2 (no changes)
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
});
// update asset 2 telemetry -> recalculate state only for asset 2
doPost("/api/plugins/telemetry/ASSET/" + asset2.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":5}"));
Thread.sleep(300);
// result of asset 1 (no changes)
z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1 (no changes)
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0");
// result of asset 2
z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("30.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("30.0");
});
// add new entity to profile -> calculate state for new entity
Asset asset3 = createAsset("Test asset 3", assetProfile.getId());
doPost("/api/plugins/telemetry/ASSET/" + asset3.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":13}"));
Thread.sleep(300);
// result of asset 3
ArrayNode z3 = getServerAttributes(asset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("38.0");
Asset finalAsset3 = asset3;
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 3
ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("38.0");
});
// update device telemetry -> recalculate state for all assets
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":20}"));
Thread.sleep(300);
// result of asset 1
z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("35.0");
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("35.0");
// result of asset 2
z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("25.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("25.0");
// result of asset 3
z3 = getServerAttributes(asset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0");
// result of asset 3
ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0");
});
// update profile for asset 3 -> delete state for asset 3
AssetProfile newAssetProfile = doPost("/api/assetProfile", createAssetProfile("New Asset Profile"), AssetProfile.class);
@ -371,22 +389,24 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// update device telemetry -> recalculate state for asset 1 and asset 2
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":15}"));
Thread.sleep(300);
// result of asset 1
z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("30.0");
// result of asset 2
z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("20.0");
// no changes for asset 3
z3 = getServerAttributes(asset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0");
Asset updatedAsset3 = asset3;
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
assertThat(z1).isNotNull();
assertThat(z1.get(0).get("value").asText()).isEqualTo("30.0");
// result of asset 2
ArrayNode z2 = getServerAttributes(asset2.getId(), "z");
assertThat(z2).isNotNull();
assertThat(z2.get(0).get("value").asText()).isEqualTo("20.0");
// no changes for asset 3
ArrayNode z3 = getServerAttributes(updatedAsset3.getId(), "z");
assertThat(z3).isNotNull();
assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0");
});
}
@Test
@ -421,20 +441,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
// create CF -> ctx is not initialized -> no calculation perform
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
Thread.sleep(300);
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
});
// update telemetry -> ctx is not initialized -> no calculation perform
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
Thread.sleep(300);
fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
});
}
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {

Loading…
Cancel
Save