diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index c1ffb10e79..fc92b70e07 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -93,10 +93,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS @Override public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), - () -> toCalculatedFieldTelemetryMsgProto(request), callback); + pushRequestToQueue(request, null, callback); } @Override @@ -109,10 +106,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS @Override public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), - () -> toCalculatedFieldTelemetryMsgProto(request), callback); + pushRequestToQueue(request, null, callback); } @Override @@ -170,10 +164,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS }; } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request) { - return toCalculatedFieldTelemetryMsgProto(request, null); - } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); @@ -194,10 +184,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request) { - return toCalculatedFieldTelemetryMsgProto(request, null); - } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index ac0e61d9be..1f2c7f135d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -16,7 +16,6 @@ package org.thingsboard.rule.engine.telemetry; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.gson.JsonParser; import jakarta.annotation.Nullable; @@ -43,24 +42,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.thingsboard.server.common.data.DataConstants.SCOPE; -import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; -import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; -import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "push to calculated fields", + name = "apply to calculated fields", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "Pushes messages to the calculated fields for further processing", - nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database.", + nodeDescription = "Processes incoming messages for calculated fields", + nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " + + "It ensures that calculated fields receive and process the necessary data without persisting the incoming values.", configDirective = "tbNodeEmptyConfig", - icon = "send" + icon = "call_made" ) public class TbCalculatedFieldsNode implements TbNode { @@ -73,29 +71,20 @@ public class TbCalculatedFieldsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (!msg.isTypeOneOf(POST_ATTRIBUTES_REQUEST, POST_TELEMETRY_REQUEST, ATTRIBUTES_DELETED, TIMESERIES_DELETED)) { - ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); - return; - } - - String src = msg.getData(); - - if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { - processPostTelemetryRequest(ctx, msg, src); - } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { - processPostAttributesRequest(ctx, msg, src); - } else if (msg.isTypeOf(TIMESERIES_DELETED)) { - processTimeSeriesDeleted(ctx, msg, src); - } else { - processAttributesDeleted(ctx, msg, src); + switch (msg.getInternalType()) { + case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg); + case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg); + case TIMESERIES_DELETED -> processTimeSeriesDeleted(ctx, msg); + case ATTRIBUTES_DELETED -> processAttributesDeleted(ctx, msg); + default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); } - } - private void processPostTelemetryRequest(TbContext ctx, TbMsg msg, String src) { - Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), System.currentTimeMillis()); + private void processPostTelemetryRequest(TbContext ctx, TbMsg msg) { + Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis()); + if (tsKvMap.isEmpty()) { - ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); + ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + msg.getData())); return; } @@ -120,8 +109,8 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); } - private void processPostAttributesRequest(TbContext ctx, TbMsg msg, String src) { - List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + private void processPostAttributesRequest(TbContext ctx, TbMsg msg) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()))); if (newAttributes.isEmpty()) { ctx.tellSuccess(msg); @@ -141,10 +130,11 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); } - private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg, String src) { - JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); - List keysToDelete = JacksonUtil.convertValue(dataJson.get("timeSeries"), new TypeReference<>() { - }); + private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg) { + List keysToDelete = Optional.ofNullable( + JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("timeseries"), new TypeReference>() { + }) + ).orElse(Collections.emptyList()); if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg); @@ -161,10 +151,12 @@ public class TbCalculatedFieldsNode implements TbNode { .callback(new FutureCallback>() { @Override public void onSuccess(@Nullable List tmp) { + ctx.tellSuccess(msg); } @Override public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); } }) .build(); @@ -172,10 +164,11 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); } - private void processAttributesDeleted(TbContext ctx, TbMsg msg, String src) { - JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); - List keysToDelete = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { - }); + private void processAttributesDeleted(TbContext ctx, TbMsg msg) { + List keysToDelete = Optional.ofNullable( + JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("attributes"), new TypeReference>() { + }) + ).orElse(Collections.emptyList()); if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg);