From d568ee8536d51ceb8fa47ed199b82244b8c5bba3 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 23 Jan 2026 16:55:46 +0200 Subject: [PATCH] fix: process cfs that uses the same key and some of cfs throws error --- ...CalculatedFieldEntityMessageProcessor.java | 27 +++++- .../cf/ctx/state/CalculatedFieldCtx.java | 2 +- .../cf/CalculatedFieldIntegrationTest.java | 93 +++++++++++++++++++ .../server/controller/AbstractWebTest.java | 13 +++ 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 752e8a1c11..b95beb81a1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -131,11 +131,12 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state.isSizeOk()) { processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback()); } else { - throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); } } catch (Exception e) { if (e instanceof CalculatedFieldException cfe) { - throw cfe; + persistDebugErrorIfEnabled(cfe, msg.getCallback()); + return; } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } @@ -200,6 +201,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } } catch (Exception e) { + if (e instanceof CalculatedFieldException cfe) { + persistDebugErrorIfEnabled(cfe, callback); + return; + } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } } @@ -223,7 +228,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } catch (Exception e) { if (e instanceof CalculatedFieldException cfe) { - throw cfe; + persistDebugErrorIfEnabled(cfe, callback); + return; } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } @@ -485,4 +491,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return null; } + private void persistDebugErrorIfEnabled(CalculatedFieldException cfe, TbCallback callback) { + if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) { + String message; + if (cfe.getErrorMessage() != null) { + message = cfe.getErrorMessage(); + } else if (cfe.getCause() != null) { + message = cfe.getCause().getMessage(); + } else { + message = "N/A"; + } + systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message); + } + callback.onSuccess(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 5d0ff5c2ff..373e2f588f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -310,7 +310,7 @@ public class CalculatedFieldCtx { } public String getSizeExceedsLimitMessage() { - return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!"; + return "State size exceeds limit of " + (maxStateSize / 1024) + "Kb!"; } } diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index a21b081086..7f905ec0ae 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -36,7 +37,9 @@ import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedField import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.debug.DebugSettings; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.controller.CalculatedFieldControllerTest; import org.thingsboard.server.dao.service.DaoSqlTest; @@ -875,6 +878,96 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes }); } + @Test + public void testCalculatedFieldsWhenOneIsInvalid() throws Exception { + Device testDevice = createDevice("Test device", "1234567890"); + long now = System.currentTimeMillis(); + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"a\":5}}", now - TimeUnit.MINUTES.toMillis(3)))); + + // Script CF - invalid + CalculatedField invalidCF = new CalculatedField(); + invalidCF.setEntityId(testDevice.getId()); + invalidCF.setType(CalculatedFieldType.SCRIPT); + invalidCF.setName("Script CF"); + invalidCF.setDebugSettings(DebugSettings.all()); + + ScriptCalculatedFieldConfiguration scriptConfig = new ScriptCalculatedFieldConfiguration(); + + ReferencedEntityKey refEntityKeyA = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null); + Argument argumentA = new Argument(); + argumentA.setRefEntityKey(refEntityKeyA); + scriptConfig.setArguments(Map.of("a", argumentA)); + scriptConfig.setExpression(""" + return { + "temperature": temp + }; + """); + + Output scriptOutput = new Output(); + scriptOutput.setType(OutputType.TIME_SERIES); + scriptConfig.setOutput(scriptOutput); + + invalidCF.setConfiguration(scriptConfig); + + invalidCF = doPost("/api/calculatedField", invalidCF, CalculatedField.class); + CalculatedFieldId invalidCfId = invalidCF.getId(); + + await().alias("create invalid CF -> check error").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + PageData debugEvents = getDebugEvents(tenantId, invalidCfId, 1); + if (!debugEvents.getData().isEmpty()) { + EventInfo eventInfo = debugEvents.getData().get(0); + assertThat(eventInfo.getBody().has("error")).isTrue(); + } + }); + + // Simple CF - valid + CalculatedField validCF = new CalculatedField(); + validCF.setEntityId(testDevice.getId()); + validCF.setType(CalculatedFieldType.SIMPLE); + validCF.setName("Simple CF"); + validCF.setDebugSettings(DebugSettings.all()); + + SimpleCalculatedFieldConfiguration simpleConfig = new SimpleCalculatedFieldConfiguration(); + simpleConfig.setArguments(Map.of("a", argumentA)); + simpleConfig.setExpression("a+1"); + + Output simpleOutput = new Output(); + simpleOutput.setName("a+1"); + simpleOutput.setType(OutputType.TIME_SERIES); + simpleOutput.setDecimalsByDefault(0); + simpleConfig.setOutput(simpleOutput); + + validCF.setConfiguration(simpleConfig); + + validCF = doPost("/api/calculatedField", validCF, CalculatedField.class); + CalculatedFieldId validCfId = validCF.getId(); + + await().alias("create CF -> check initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + PageData debugEvents = getDebugEvents(tenantId, validCfId, 1); + if (!debugEvents.getData().isEmpty()) { + EventInfo eventInfo = debugEvents.getData().get(0); + assertThat(eventInfo.getBody().has("error")).isFalse(); + } + ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1"); + assertThat(result).isNotNull(); + assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("6"); + }); + + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":6}")); + + await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1"); + assertThat(result).isNotNull(); + assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("7"); + }); + } + private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception { return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class); } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 44c383a5f0..3b7286cf01 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -80,6 +80,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileType; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbResourceInfo; @@ -99,6 +100,7 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -1298,4 +1300,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { doPost("/api/job/" + jobId + "/reprocess").andExpect(status().isOk()); } + protected PageData getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception { + return getEvents(tenantId, entityId, EventType.DEBUG_RULE_NODE, limit); + } + + protected PageData getEvents(TenantId tenantId, EntityId entityId, EventType eventType, int limit) throws Exception { + TimePageLink pageLink = new TimePageLink(limit); + return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&", + new TypeReference>() { + }, pageLink, entityId.getEntityType(), entityId.getId(), eventType, tenantId.getId()); + } + }