From 287c1fd48a882b9c5ec04ac1145d42b6f82db86a Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Wed, 5 Oct 2022 10:09:01 +0300 Subject: [PATCH] add to msgData --- .../metadata/TbAbstractGetAttributesNode.java | 64 +++++++++++++++---- .../engine/metadata/TbGetAttributesNode.java | 6 +- .../TbGetAttributesNodeConfiguration.java | 2 + .../TbGetDeviceAttrNodeConfiguration.java | 1 + 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index b2c029589c..6f90078cdb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.json.JsonWriteFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; @@ -25,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParseException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.BooleanUtils; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -57,10 +59,12 @@ public abstract class TbAbstractGetAttributesNode> failuresMap = new ConcurrentHashMap<>(); ListenableFuture> allFutures = Futures.allAsList( - putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap), - putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"), - putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"), - putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_") + putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap, msgNewData), + putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_", msgNewData), + putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_", msgNewData), + putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_", msgNewData) ); withCallback(allFutures, i -> { if (!failuresMap.isEmpty()) { throw reportFailures(failuresMap); } - ctx.tellSuccess(msg); + if (fetchToData) { + ObjectNode msgDataNode = (ObjectNode) JacksonUtil.toJsonNode(msg.getData()); + msgDataNode.setAll(msgNewData); + ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(msgDataNode))); + } else { + ctx.tellSuccess(msg); + } }, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); } - private ListenableFuture putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List keys, ConcurrentHashMap> failuresMap, String prefix) { + private ListenableFuture putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List keys, ConcurrentHashMap> failuresMap, String prefix, ObjectNode msgData) { if (CollectionUtils.isEmpty(keys)) { return Futures.immediateFuture(null); } @@ -109,7 +127,13 @@ public abstract class TbAbstractGetAttributesNode { if (!CollectionUtils.isEmpty(attributeKvEntryList)) { List existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList()); - existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString())); + existingAttributesKvEntry.forEach(kvEntry -> { + if (fetchToData) { + msgData.put(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); + } else { + msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); + } + }); if (existingAttributesKvEntry.size() != keys.size() && BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) { getNotExistingKeys(existingAttributesKvEntry, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key)); } @@ -122,7 +146,7 @@ public abstract class TbAbstractGetAttributesNode putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List keys, ConcurrentHashMap> failuresMap) { + private ListenableFuture putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List keys, ConcurrentHashMap> failuresMap, ObjectNode msgData) { if (CollectionUtils.isEmpty(keys)) { return Futures.immediateFuture(null); } @@ -134,16 +158,24 @@ public abstract class TbAbstractGetAttributesNode getNotExistingKeys(List existingAttributesKvEntry, List allKeys) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java index 431fe64c09..6388f9c44e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java @@ -34,9 +34,9 @@ import org.thingsboard.server.common.msg.TbMsg; @RuleNode(type = ComponentType.ENRICHMENT, name = "originator attributes", configClazz = TbGetAttributesNodeConfiguration.class, - nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata", - nodeDetails = "If Attributes enrichment configured, CLIENT/SHARED/SERVER attributes are added into Message metadata " + - "with specific prefix: cs/shared/ss. Latest telemetry value added into metadata without prefix. " + + nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata/data", + nodeDetails = "If Attributes enrichment configured, CLIENT/SHARED/SERVER attributes are added into Message metadata or data " + + "with specific prefix: cs/shared/ss. Latest telemetry value added into metadata/data without prefix. " + "To access those attributes in other nodes this template can be used " + "metadata.cs_temperature or metadata.shared_limit ", uiResources = {"static/rulenode/rulenode-core-config.js"}, diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java index 4fc892296f..67766e5b51 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java @@ -35,6 +35,7 @@ public class TbGetAttributesNodeConfiguration implements NodeConfiguration