Browse Source

Merge pull request #14887 from irynamatveieva/lts-4.2-fix/cf-processing

[LTS-4.2] Fix processing calculated fields
pull/14922/head
Viacheslav Klimov 4 months ago
committed by GitHub
parent
commit
a5d95f7ca6
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 15
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 10
      application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java
  3. 12
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  5. 93
      application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java
  6. 13
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

15
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -41,6 +41,7 @@ import org.thingsboard.rule.engine.api.notification.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.script.api.js.JsInvokeService;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldException;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
import org.thingsboard.server.cache.limits.RateLimitService;
@ -97,8 +98,8 @@ import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.queue.QueueStatsService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.TbResourceDataCache;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.resource.TbResourceDataCache;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleNodeStateService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
@ -824,6 +825,18 @@ public class ActorSystemContext {
Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
}
public void persistCalculatedFieldDebugError(CalculatedFieldException cfe) {
String message;
if (cfe.getErrorMessage() != null) {
message = cfe.getErrorMessage();
} else if (cfe.getCause() != null) {
message = cfe.getCause().getMessage();
} else {
message = "N/A";
}
persistCalculatedFieldDebugEvent(cfe.getCtx().getTenantId(), cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message);
}
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) {
if (checkLimits(tenantId)) {
try {

10
application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java

@ -41,15 +41,7 @@ public abstract class AbstractCalculatedFieldActor extends ContextAwareActor {
return doProcessCfMsg(cfm);
} catch (CalculatedFieldException cfe) {
if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) {
String message;
if (cfe.getErrorMessage() != null) {
message = cfe.getErrorMessage();
} else if (cfe.getCause() != null) {
message = cfe.getCause().getMessage();
} else {
message = "N/A";
}
systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message);
systemContext.persistCalculatedFieldDebugError(cfe);
}
cause = cfe.getCause();
} catch (Exception e) {

12
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -66,7 +66,6 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry.getValueForTsRecord;
/**
* @author Andrew Shvayka
*/
@ -131,7 +130,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state.isSizeOk()) {
processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback());
} else {
throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build();
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
@ -200,6 +199,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
}
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
@ -223,7 +225,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) {
systemContext.persistCalculatedFieldDebugError(cfe);
}
callback.onSuccess();
return;
}
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -310,7 +310,7 @@ public class CalculatedFieldCtx {
}
public String getSizeExceedsLimitMessage() {
return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!";
return "State size exceeds limit of " + (maxStateSize / 1024) + "Kb!";
}
}

93
application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.cf.CalculatedField;
@ -36,7 +37,9 @@ import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedField
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.debug.DebugSettings;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.controller.CalculatedFieldControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
@ -875,6 +878,96 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
});
}
@Test
public void testCalculatedFieldsWhenOneIsInvalid() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
long now = System.currentTimeMillis();
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"a\":5}}", now - TimeUnit.MINUTES.toMillis(3))));
// Script CF - invalid
CalculatedField invalidCF = new CalculatedField();
invalidCF.setEntityId(testDevice.getId());
invalidCF.setType(CalculatedFieldType.SCRIPT);
invalidCF.setName("Script CF");
invalidCF.setDebugSettings(DebugSettings.all());
ScriptCalculatedFieldConfiguration scriptConfig = new ScriptCalculatedFieldConfiguration();
ReferencedEntityKey refEntityKeyA = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null);
Argument argumentA = new Argument();
argumentA.setRefEntityKey(refEntityKeyA);
scriptConfig.setArguments(Map.of("a", argumentA));
scriptConfig.setExpression("""
return {
"temperature": temp
};
""");
Output scriptOutput = new Output();
scriptOutput.setType(OutputType.TIME_SERIES);
scriptConfig.setOutput(scriptOutput);
invalidCF.setConfiguration(scriptConfig);
invalidCF = doPost("/api/calculatedField", invalidCF, CalculatedField.class);
CalculatedFieldId invalidCfId = invalidCF.getId();
await().alias("create invalid CF -> check error").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
PageData<EventInfo> debugEvents = getDebugEvents(tenantId, invalidCfId, 1);
if (!debugEvents.getData().isEmpty()) {
EventInfo eventInfo = debugEvents.getData().get(0);
assertThat(eventInfo.getBody().has("error")).isTrue();
}
});
// Simple CF - valid
CalculatedField validCF = new CalculatedField();
validCF.setEntityId(testDevice.getId());
validCF.setType(CalculatedFieldType.SIMPLE);
validCF.setName("Simple CF");
validCF.setDebugSettings(DebugSettings.all());
SimpleCalculatedFieldConfiguration simpleConfig = new SimpleCalculatedFieldConfiguration();
simpleConfig.setArguments(Map.of("a", argumentA));
simpleConfig.setExpression("a+1");
Output simpleOutput = new Output();
simpleOutput.setName("a+1");
simpleOutput.setType(OutputType.TIME_SERIES);
simpleOutput.setDecimalsByDefault(0);
simpleConfig.setOutput(simpleOutput);
validCF.setConfiguration(simpleConfig);
validCF = doPost("/api/calculatedField", validCF, CalculatedField.class);
CalculatedFieldId validCfId = validCF.getId();
await().alias("create CF -> check initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
PageData<EventInfo> debugEvents = getDebugEvents(tenantId, validCfId, 1);
if (!debugEvents.getData().isEmpty()) {
EventInfo eventInfo = debugEvents.getData().get(0);
assertThat(eventInfo.getBody().has("error")).isFalse();
}
ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1");
assertThat(result).isNotNull();
assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("6");
});
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":6}"));
await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode result = getLatestTelemetry(testDevice.getId(), "a+1");
assertThat(result).isNotNull();
assertThat(result.get("a+1").get(0).get("value").asText()).isEqualTo("7");
});
}
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
}

13
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -80,6 +80,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TbResourceInfo;
@ -99,6 +100,7 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -1298,4 +1300,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
doPost("/api/job/" + jobId + "/reprocess").andExpect(status().isOk());
}
protected PageData<EventInfo> getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception {
return getEvents(tenantId, entityId, EventType.DEBUG_RULE_NODE, limit);
}
protected PageData<EventInfo> getEvents(TenantId tenantId, EntityId entityId, EventType eventType, int limit) throws Exception {
TimePageLink pageLink = new TimePageLink(limit);
return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&",
new TypeReference<PageData<EventInfo>>() {
}, pageLink, entityId.getEntityType(), entityId.getId(), eventType, tenantId.getId());
}
}

Loading…
Cancel
Save