diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index bfed515eb5..66b32dfbfe 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -197,12 +197,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM state = getOrInitState(ctx); justRestored = true; } - if (state.updateState(newArgValues) || justRestored) { - cfIdList = new ArrayList<>(cfIdList); - cfIdList.add(ctx.getCfId()); - processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback); - } else { - callback.onSuccess(CALLBACKS_PER_CF); + try { + if (state.updateState(newArgValues) || justRestored) { + cfIdList = new ArrayList<>(cfIdList); + cfIdList.add(ctx.getCfId()); + processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } + } catch (Exception e) { + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e); + } } } @@ -212,37 +218,38 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state != null) { return state; } else { - ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); - // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. - // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. - // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, - // but this will significantly complicate the code. - state = stateFuture.get(1, TimeUnit.MINUTES); - states.put(ctx.getCfId(), state); + try { + ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); + // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. + // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. + // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, + // but this will significantly complicate the code. + state = stateFuture.get(1, TimeUnit.MINUTES); + states.put(ctx.getCfId(), state); + } catch (Exception e) { + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); + } + throw new RuntimeException(e); + } } return state; } @SneakyThrows private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); - if (state.isReady() && ctx.isInitialized()) { - try { + try { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); + if (state.isReady() && ctx.isInitialized()) { CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes()); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResultMap()), null); } - } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e); - } + } else { + callback.onSuccess(); // State was updated but no calculation performed; } - } else { - callback.onSuccess(); // State was updated but no calculation performed; - } - try { cfStateService.persistState(ctxId, state, callback); } catch (Exception e) { if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { diff --git a/application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java b/application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java index 6248ac1536..50ab512cb9 100644 --- a/application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java +++ b/application/src/main/java/org/thingsboard/server/exception/CalculatedFieldStateException.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.exception; -public class CalculatedFieldStateException extends Exception { +public class CalculatedFieldStateException extends RuntimeException { public CalculatedFieldStateException(String message) { super(message); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 0834b5583e..cdad1145e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -25,6 +25,7 @@ import org.thingsboard.script.api.tbel.TbelCfTsDoubleVal; import org.thingsboard.script.api.tbel.TbelCfTsRollingArg; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.exception.CalculatedFieldStateException; import java.util.ArrayList; import java.util.List; @@ -85,7 +86,7 @@ public class TsRollingArgumentEntry implements ArgumentEntry { } @Override - public boolean updateEntry(ArgumentEntry entry) { + public boolean updateEntry(ArgumentEntry entry) throws CalculatedFieldStateException { if (entry instanceof TsRollingArgumentEntry tsRollingEntry) { updateTsRollingEntry(tsRollingEntry); } else if (entry instanceof SingleValueArgumentEntry singleValueEntry) { @@ -107,15 +108,18 @@ public class TsRollingArgumentEntry implements ArgumentEntry { } private void addTsRecord(Long ts, KvEntry value) { - switch (value.getDataType()) { - case LONG -> value.getLongValue().ifPresent(aLong -> tsRecords.put(ts, aLong.doubleValue())); - case DOUBLE -> value.getDoubleValue().ifPresent(aDouble -> tsRecords.put(ts, aDouble)); - case BOOLEAN -> value.getBooleanValue().ifPresent(aBoolean -> tsRecords.put(ts, aBoolean ? 1.0 : 0.0)); - case STRING -> value.getStrValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString))); - case JSON -> value.getJsonValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString))); - //TODO: try catch + try { + switch (value.getDataType()) { + case LONG -> value.getLongValue().ifPresent(aLong -> tsRecords.put(ts, aLong.doubleValue())); + case DOUBLE -> value.getDoubleValue().ifPresent(aDouble -> tsRecords.put(ts, aDouble)); + case BOOLEAN -> value.getBooleanValue().ifPresent(aBoolean -> tsRecords.put(ts, aBoolean ? 1.0 : 0.0)); + case STRING -> value.getStrValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString))); + case JSON -> value.getJsonValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString))); + } + cleanupExpiredRecords(); + } catch (Exception e) { + throw new IllegalArgumentException("Time series rolling arguments supports only numeric values."); } - cleanupExpiredRecords(); } private void addTsRecord(Long ts, double value) { diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index 91aad7b1cd..f544f463fa 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -40,8 +40,10 @@ import org.thingsboard.server.controller.CalculatedFieldControllerTest; import org.thingsboard.server.dao.service.DaoSqlTest; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; @DaoSqlTest public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest { @@ -84,20 +86,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // create CF -> perform initial calculation CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); - Thread.sleep(300); - - ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("77.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("77.0"); + }); // update telemetry -> recalculate state doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); - Thread.sleep(300); - - fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + }); // update CF output -> perform calculation with updated output Output savedOutput = savedCalculatedField.getConfiguration().getOutput(); @@ -106,32 +110,35 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes savedOutput.setName("temperatureF"); savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); - Thread.sleep(300); - - ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); - assertThat(temperatureF).isNotNull(); - assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("86.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); + assertThat(temperatureF).isNotNull(); + assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("86.0"); + }); // update CF argument -> perform calculation with new argument Argument savedArgument = savedCalculatedField.getConfiguration().getArguments().get("T"); savedArgument.setRefEntityKey(new ReferencedEntityKey("deviceTemperature", ArgumentType.ATTRIBUTE, AttributeScope.SERVER_SCOPE)); savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); - Thread.sleep(300); - - temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); - assertThat(temperatureF).isNotNull(); - assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); + assertThat(temperatureF).isNotNull(); + assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0"); + }); // update CF expression -> perform calculation with new expression savedCalculatedField.getConfiguration().setExpression("1.8 * T + 32"); savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); - Thread.sleep(300); - - temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); - assertThat(temperatureF).isNotNull(); - assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); + assertThat(temperatureF).isNotNull(); + assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0"); + }); } @Test @@ -164,20 +171,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // create CF -> state is not ready -> no calculation performed CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); - Thread.sleep(300); - - ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + }); // update telemetry -> perform calculation doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); - Thread.sleep(300); - - fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + }); } @Test @@ -211,20 +220,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // create CF -> perform initial calculation with default value CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); - Thread.sleep(300); - - ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("53.6"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("53.6"); + }); // update telemetry -> recalculate state doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); - Thread.sleep(300); - - fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0"); + }); } @Test @@ -275,93 +286,100 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // create CF and perform initial calculation doPost("/api/calculatedField", calculatedField, CalculatedField.class); - Thread.sleep(300); - - // result of asset 1 - ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("51.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("51.0"); - // result of asset 2 - ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("52.0"); + // result of asset 2 + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("52.0"); + }); // update device telemetry -> recalculate state for all assets doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":25}")); - Thread.sleep(300); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("36.0"); - // result of asset 1 - z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("36.0"); - - // result of asset 2 - z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0"); + // result of asset 2 + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0"); + }); // update asset 1 telemetry -> recalculate state only for asset 1 doPost("/api/plugins/telemetry/ASSET/" + asset1.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":15}")); - Thread.sleep(300); - - // result of asset 1 - z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0"); - // result of asset 2 (no changes) - z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0"); + // result of asset 2 (no changes) + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0"); + }); // update asset 2 telemetry -> recalculate state only for asset 2 doPost("/api/plugins/telemetry/ASSET/" + asset2.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":5}")); - Thread.sleep(300); - - // result of asset 1 (no changes) - z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 (no changes) + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("40.0"); - // result of asset 2 - z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("30.0"); + // result of asset 2 + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("30.0"); + }); // add new entity to profile -> calculate state for new entity Asset asset3 = createAsset("Test asset 3", assetProfile.getId()); doPost("/api/plugins/telemetry/ASSET/" + asset3.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":13}")); - Thread.sleep(300); - - // result of asset 3 - ArrayNode z3 = getServerAttributes(asset3.getId(), "z"); - assertThat(z3).isNotNull(); - assertThat(z3.get(0).get("value").asText()).isEqualTo("38.0"); + Asset finalAsset3 = asset3; + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 3 + ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z"); + assertThat(z3).isNotNull(); + assertThat(z3.get(0).get("value").asText()).isEqualTo("38.0"); + }); // update device telemetry -> recalculate state for all assets doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":20}")); - Thread.sleep(300); - - // result of asset 1 - z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("35.0"); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("35.0"); - // result of asset 2 - z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("25.0"); + // result of asset 2 + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("25.0"); - // result of asset 3 - z3 = getServerAttributes(asset3.getId(), "z"); - assertThat(z3).isNotNull(); - assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0"); + // result of asset 3 + ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z"); + assertThat(z3).isNotNull(); + assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0"); + }); // update profile for asset 3 -> delete state for asset 3 AssetProfile newAssetProfile = doPost("/api/assetProfile", createAssetProfile("New Asset Profile"), AssetProfile.class); @@ -371,22 +389,24 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // update device telemetry -> recalculate state for asset 1 and asset 2 doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":15}")); - Thread.sleep(300); - - // result of asset 1 - z1 = getServerAttributes(asset1.getId(), "z"); - assertThat(z1).isNotNull(); - assertThat(z1.get(0).get("value").asText()).isEqualTo("30.0"); - - // result of asset 2 - z2 = getServerAttributes(asset2.getId(), "z"); - assertThat(z2).isNotNull(); - assertThat(z2.get(0).get("value").asText()).isEqualTo("20.0"); - - // no changes for asset 3 - z3 = getServerAttributes(asset3.getId(), "z"); - assertThat(z3).isNotNull(); - assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0"); + Asset updatedAsset3 = asset3; + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // result of asset 1 + ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); + assertThat(z1).isNotNull(); + assertThat(z1.get(0).get("value").asText()).isEqualTo("30.0"); + + // result of asset 2 + ArrayNode z2 = getServerAttributes(asset2.getId(), "z"); + assertThat(z2).isNotNull(); + assertThat(z2.get(0).get("value").asText()).isEqualTo("20.0"); + + // no changes for asset 3 + ArrayNode z3 = getServerAttributes(updatedAsset3.getId(), "z"); + assertThat(z3).isNotNull(); + assertThat(z3.get(0).get("value").asText()).isEqualTo("33.0"); + }); } @Test @@ -421,20 +441,22 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes // create CF -> ctx is not initialized -> no calculation perform CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); - Thread.sleep(300); - - ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + }); // update telemetry -> ctx is not initialized -> no calculation perform doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); - Thread.sleep(300); - - fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); - assertThat(fahrenheitTemp).isNotNull(); - assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); + assertThat(fahrenheitTemp).isNotNull(); + assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue(); + }); } private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {