diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index d3c26e2640..d344a9d92d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -41,6 +41,7 @@ import org.thingsboard.rule.engine.api.notification.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; import org.thingsboard.script.api.js.JsInvokeService; import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldException; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.actors.tenant.DebugTbRateLimits; import org.thingsboard.server.cache.limits.RateLimitService; @@ -97,8 +98,8 @@ import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.queue.QueueStatsService; import org.thingsboard.server.dao.relation.RelationService; -import org.thingsboard.server.dao.resource.TbResourceDataCache; import org.thingsboard.server.dao.resource.ResourceService; +import org.thingsboard.server.dao.resource.TbResourceDataCache; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleNodeStateService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; @@ -824,6 +825,18 @@ public class ActorSystemContext { Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); } + public void persistCalculatedFieldDebugError(CalculatedFieldException cfe) { + String message; + if (cfe.getErrorMessage() != null) { + message = cfe.getErrorMessage(); + } else if (cfe.getCause() != null) { + message = cfe.getCause().getMessage(); + } else { + message = "N/A"; + } + persistCalculatedFieldDebugEvent(cfe.getCtx().getTenantId(), cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message); + } + public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) { if (checkLimits(tenantId)) { try { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java index 9f82b536fd..29c7fe6587 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java @@ -41,15 +41,7 @@ public abstract class AbstractCalculatedFieldActor extends ContextAwareActor { return doProcessCfMsg(cfm); } catch (CalculatedFieldException cfe) { if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) { - String message; - if (cfe.getErrorMessage() != null) { - message = cfe.getErrorMessage(); - } else if (cfe.getCause() != null) { - message = cfe.getCause().getMessage(); - } else { - message = "N/A"; - } - systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message); + systemContext.persistCalculatedFieldDebugError(cfe); } cause = cfe.getCause(); } catch (Exception e) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 752e8a1c11..4f59628656 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -66,7 +66,6 @@ import java.util.stream.Collectors; import static org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry.getValueForTsRecord; - /** * @author Andrew Shvayka */ @@ -131,7 +130,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state.isSizeOk()) { processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback()); } else { - throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); } } catch (Exception e) { if (e instanceof CalculatedFieldException cfe) { @@ -200,6 +199,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } } catch (Exception e) { + if (e instanceof CalculatedFieldException cfe) { + throw cfe; + } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } } @@ -223,7 +225,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } catch (Exception e) { if (e instanceof CalculatedFieldException cfe) { - throw cfe; + if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) { + systemContext.persistCalculatedFieldDebugError(cfe); + } + callback.onSuccess(); + return; } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 5d0ff5c2ff..373e2f588f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -310,7 +310,7 @@ public class CalculatedFieldCtx { } public String getSizeExceedsLimitMessage() { - return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!"; + return "State size exceeds limit of " + (maxStateSize / 1024) + "Kb!"; } } diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index a21b081086..7f905ec0ae 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -36,7 +37,9 @@ import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedField import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.debug.DebugSettings; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.controller.CalculatedFieldControllerTest; import org.thingsboard.server.dao.service.DaoSqlTest; @@ -875,6 +878,96 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes }); } + @Test + public void testCalculatedFieldsWhenOneIsInvalid() throws Exception { + Device testDevice = createDevice("Test device", "1234567890"); + long now = System.currentTimeMillis(); + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"a\":5}}", now - TimeUnit.MINUTES.toMillis(3)))); + + // Script CF - invalid + CalculatedField invalidCF = new CalculatedField(); + invalidCF.setEntityId(testDevice.getId()); + invalidCF.setType(CalculatedFieldType.SCRIPT); + invalidCF.setName("Script CF"); + invalidCF.setDebugSettings(DebugSettings.all()); + + ScriptCalculatedFieldConfiguration scriptConfig = new ScriptCalculatedFieldConfiguration(); + + ReferencedEntityKey refEntityKeyA = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null); + Argument argumentA = new Argument(); + argumentA.setRefEntityKey(refEntityKeyA); + scriptConfig.setArguments(Map.of("a", argumentA)); + scriptConfig.setExpression(""" + return { + "temperature": temp + }; + """); + + Output scriptOutput = new Output(); + scriptOutput.setType(OutputType.TIME_SERIES); + scriptConfig.setOutput(scriptOutput); + + invalidCF.setConfiguration(scriptConfig); + + invalidCF = doPost("/api/calculatedField", invalidCF, CalculatedField.class); + CalculatedFieldId invalidCfId = invalidCF.getId(); + + await().alias("create invalid CF -> check error").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + PageData debugEvents = getDebugEvents(tenantId, invalidCfId, 1); + if (!debugEvents.getData().isEmpty()) { + EventInfo eventInfo = debugEvents.getData().get(0); + assertThat(eventInfo.getBody().has("error")).isTrue(); + } + }); + + // Simple CF - valid + CalculatedField validCF = new CalculatedField(); + validCF.setEntityId(testDevice.getId()); + validCF.setType(CalculatedFieldType.SIMPLE); + validCF.setName("Simple CF"); + validCF.setDebugSettings(DebugSettings.all()); + + SimpleCalculatedFieldConfiguration simpleConfig = new SimpleCalculatedFieldConfiguration(); + simpleConfig.setArguments(Map.of("a", argumentA)); + simpleConfig.setExpression("a+1"); + + Output simpleOutput = new Output(); + simpleOutput.setName("a+1"); + simpleOutput.setType(OutputType.TIME_SERIES); + simpleOutput.setDecimalsByDefault(0); + simpleConfig.setOutput(simpleOutput); + + validCF.setConfiguration(simpleConfig); + + validCF = doPost("/api/calculatedField", validCF, CalculatedField.class); + CalculatedFieldId validCfId = validCF.getId(); + + await().alias("create CF -> check initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + PageData debugEvents = getDebugEvents(tenantId, validCfId, 1); + if (!debugEvents.getData().isEmpty()) { + EventInfo eventInfo = debugEvents.getData().get(0); + assertThat(eventInfo.getBody().has("error")).isFalse(); + } + ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1"); + assertThat(result).isNotNull(); + assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("6"); + }); + + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":6}")); + + await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1"); + assertThat(result).isNotNull(); + assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("7"); + }); + } + private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception { return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class); } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 44c383a5f0..3b7286cf01 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -80,6 +80,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileType; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbResourceInfo; @@ -99,6 +100,7 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -1298,4 +1300,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { doPost("/api/job/" + jobId + "/reprocess").andExpect(status().isOk()); } + protected PageData getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception { + return getEvents(tenantId, entityId, EventType.DEBUG_RULE_NODE, limit); + } + + protected PageData getEvents(TenantId tenantId, EntityId entityId, EventType eventType, int limit) throws Exception { + TimePageLink pageLink = new TimePageLink(limit); + return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&", + new TypeReference>() { + }, pageLink, entityId.getEntityType(), entityId.getId(), eventType, tenantId.getId()); + } + }