From 6ba8a390bfaafd5cd724a952261181757796dc89 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 18 Aug 2023 11:59:28 +0200 Subject: [PATCH 1/9] Added functionality to update attributes on value change Volodymyr Babak 06.07.23, 14:57 --- .../engine/telemetry/TbMsgAttributesNode.java | 64 +++++++++++++++---- .../TbMsgAttributesNodeConfiguration.java | 2 + 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index ef49558631..04971abae2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -15,8 +15,12 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -25,12 +29,15 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; @@ -70,23 +77,56 @@ public class TbMsgAttributesNode implements TbNode { return; } String src = msg.getData(); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); - if (attributes.isEmpty()) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + if (newAttributes.isEmpty()) { ctx.tellSuccess(msg); return; } String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); - ctx.getTelemetryService().saveAndNotify( - ctx.getTenantId(), - msg.getOriginator(), - scope, - attributes, - checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), - sendAttributesUpdateNotification ? - new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : - new TelemetryNodeCallback(ctx, msg) - ); + ListenableFuture> findFuture; + if (config.isUpdateAttributesOnValueChange()) { + List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); + findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); + } else { + findFuture = Futures.immediateFuture(null); + } + Futures.addCallback(findFuture, new FutureCallback<>() { + @Override + public void onSuccess(List currentAttributes) { + List attributes = newAttributes; + if (config.isUpdateAttributesOnValueChange() + && currentAttributes != null + && !currentAttributes.isEmpty()) { + Set> currentKeyValuePairs = currentAttributes.stream() + .map(item -> Pair.of(item.getKey(), item.getValue())) + .collect(Collectors.toSet()); + attributes = attributes.stream() + .filter(item -> !currentKeyValuePairs.contains(Pair.of(item.getKey(), item.getValue()))) + .collect(Collectors.toList()); + } + if (attributes.isEmpty()) { + ctx.tellSuccess(msg); + } else { + ctx.getTelemetryService().saveAndNotify( + ctx.getTenantId(), + msg.getOriginator(), + scope, + attributes, + checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), + sendAttributesUpdateNotification ? + new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : + new TelemetryNodeCallback(ctx, msg) + ); + } + } + + @Override + public void onFailure(Throwable throwable) { + ctx.tellFailure(msg, throwable); + } + }, ctx.getDbCallbackExecutor()); + } private boolean checkSendNotification(String scope) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java index dd6140c4b5..6512c45cdf 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java @@ -26,6 +26,7 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration Date: Thu, 6 Jul 2023 16:35:36 +0200 Subject: [PATCH 2/9] save attribute node will skip attributes overwrites - refactor and filter improvement, description added --- .../engine/telemetry/TbMsgAttributesNode.java | 86 +++++++++++-------- 1 file changed, 51 insertions(+), 35 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 04971abae2..c63af3262d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -18,9 +18,9 @@ package org.thingsboard.rule.engine.telemetry; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.util.Pair; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -36,7 +36,9 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; @@ -53,7 +55,8 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " + "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via Success chain, otherwise, Failure chain is used. " + "Additionally if checkbox Send attributes updated notification is set to true, rule node will put the \"Attributes Updated\" " + - "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue.", + "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue." + + "Performance checkbox 'Update Attributes On Value Change' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" @@ -84,49 +87,62 @@ public class TbMsgAttributesNode implements TbNode { } String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); - ListenableFuture> findFuture; - if (config.isUpdateAttributesOnValueChange()) { - List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); - } else { - findFuture = Futures.immediateFuture(null); + + if (!config.isUpdateAttributesOnValueChange()) { + saveAttr(newAttributes, ctx, msg, scope, sendAttributesUpdateNotification); + return; } + + List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); + ListenableFuture> findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); + Futures.addCallback(findFuture, new FutureCallback<>() { @Override public void onSuccess(List currentAttributes) { - List attributes = newAttributes; - if (config.isUpdateAttributesOnValueChange() - && currentAttributes != null - && !currentAttributes.isEmpty()) { - Set> currentKeyValuePairs = currentAttributes.stream() - .map(item -> Pair.of(item.getKey(), item.getValue())) - .collect(Collectors.toSet()); - attributes = attributes.stream() - .filter(item -> !currentKeyValuePairs.contains(Pair.of(item.getKey(), item.getValue()))) - .collect(Collectors.toList()); - } - if (attributes.isEmpty()) { - ctx.tellSuccess(msg); - } else { - ctx.getTelemetryService().saveAndNotify( - ctx.getTenantId(), - msg.getOriginator(), - scope, - attributes, - checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), - sendAttributesUpdateNotification ? - new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : - new TelemetryNodeCallback(ctx, msg) - ); - } + List attributesChanged = filterChangedAttr(currentAttributes, newAttributes); + saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification); } @Override public void onFailure(Throwable throwable) { ctx.tellFailure(msg, throwable); } - }, ctx.getDbCallbackExecutor()); + }, MoreExecutors.directExecutor()); + } + + void saveAttr(List attributes, TbContext ctx, TbMsg msg, String scope, boolean sendAttributesUpdateNotification) { + if (attributes.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + ctx.getTelemetryService().saveAndNotify( + ctx.getTenantId(), + msg.getOriginator(), + scope, + attributes, + checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), + sendAttributesUpdateNotification ? + new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : + new TelemetryNodeCallback(ctx, msg) + ); + } + + List filterChangedAttr(List currentAttributes, List newAttributes) { + if (currentAttributes == null || currentAttributes.isEmpty()) { + return newAttributes; + } + + Map currentAttrMap = currentAttributes.stream() + .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); + return newAttributes.stream() + .filter(item -> { + AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); + return cacheAttr == null + || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type + || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); + }) + .collect(Collectors.toList()); } private boolean checkSendNotification(String scope) { From 1569ac6dafb4bf6ddbb2bcd257fbaaeacbfa2bc6 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 18 Aug 2023 12:16:23 +0200 Subject: [PATCH 3/9] updateAttributesOnValueChange is true for rule chain templates --- .../data/json/edge/rule_chains/edge_root_rule_chain.json | 4 +++- .../data/json/tenant/device_profile/rule_chain_template.json | 5 ++++- .../main/data/json/tenant/rule_chains/root_rule_chain.json | 4 +++- .../engine/telemetry/TbMsgAttributesNodeConfiguration.java | 1 + 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index ec1341cc71..8d3c9a9af4 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -50,7 +50,9 @@ "debugMode": false, "configuration": { "scope": "CLIENT_SCOPE", - "notifyDevice": "false" + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnValueChange": "true" }, "externalId": null }, diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index 4776ef2aae..a11cba1b9e 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -33,7 +33,10 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { - "scope": "CLIENT_SCOPE" + "scope": "CLIENT_SCOPE", + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnValueChange": "true" } }, { diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index 88ef27e2f8..faf411680e 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -33,7 +33,9 @@ "debugMode": false, "configuration": { "scope": "CLIENT_SCOPE", - "notifyDevice": "false" + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnValueChange": "true" } }, { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java index 6512c45cdf..7da27e7fe9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java @@ -34,6 +34,7 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration Date: Mon, 4 Sep 2023 16:51:41 +0200 Subject: [PATCH 4/9] TbMsgAttributesNode implements TbVersionedNode with upgrade method and tests from version 0 --- .../engine/telemetry/TbMsgAttributesNode.java | 23 +++++- .../telemetry/TbMsgAttributesNodeTest.java | 73 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index c63af3262d..79cbf87274 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -26,11 +28,13 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbVersionedNode; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -51,6 +55,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R type = ComponentType.ACTION, name = "save attributes", configClazz = TbMsgAttributesNodeConfiguration.class, + version = 1, nodeDescription = "Saves attributes data", nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " + "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via Success chain, otherwise, Failure chain is used. " + @@ -61,8 +66,9 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" ) -public class TbMsgAttributesNode implements TbNode { +public class TbMsgAttributesNode implements TbNode, TbVersionedNode { + static final String UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY = "updateAttributesOnValueChange"; private TbMsgAttributesNodeConfiguration config; @Override @@ -160,4 +166,19 @@ public class TbMsgAttributesNode implements TbNode { return config.getScope(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (!oldConfiguration.has(UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY, false); + } + break; + default: + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java new file mode 100644 index 0000000000..28c762c107 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.util.TbPair; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.Mockito.mock; + +@Slf4j +class TbMsgAttributesNodeTest { + + @Test + void testUpgrade_fromVersion0() throws TbNodeException { + final String updateAttributesOnValueChangeKey = "updateAttributesOnValueChange"; + TbMsgAttributesNode node = mock(TbMsgAttributesNode.class); + willCallRealMethod().given(node).upgrade(anyInt(), any()); + + ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration()); + jsonNode.remove(updateAttributesOnValueChangeKey); + assertThat(jsonNode.has(updateAttributesOnValueChangeKey)).as("pre condition has no " + updateAttributesOnValueChangeKey).isFalse(); + + TbPair upgradeResult = node.upgrade(0, jsonNode); + + ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isTrue(); + assertThat(resultNode.has(updateAttributesOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("upgrade result value [false] for key " + updateAttributesOnValueChangeKey).isFalse(); + } + + @Test + void testUpgrade_fromVersion0_alreadyHasUpdateAttributesOnValueChange() throws TbNodeException { + final String updateAttributesOnValueChangeKey = "updateAttributesOnValueChange"; + TbMsgAttributesNode node = mock(TbMsgAttributesNode.class); + willCallRealMethod().given(node).upgrade(anyInt(), any()); + + ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration()); + jsonNode.remove(updateAttributesOnValueChangeKey); + jsonNode.put(updateAttributesOnValueChangeKey, true); + assertThat(jsonNode.has(updateAttributesOnValueChangeKey)).as("pre condition has no " + updateAttributesOnValueChangeKey).isTrue(); + assertThat(jsonNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("pre condition has [true] for key " + updateAttributesOnValueChangeKey).isTrue(); + + TbPair upgradeResult = node.upgrade(0, jsonNode); + + ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isFalse(); + assertThat(resultNode.has(updateAttributesOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("upgrade result value [true] for key " + updateAttributesOnValueChangeKey).isTrue(); + } + +} From 125385b58245c0467ce5eb441dc574ca0bfda237 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 4 Sep 2023 17:35:24 +0200 Subject: [PATCH 5/9] TbMsgAttributesNode default config setUpdateAttributesOnValueChange(true), rule-chains template version 1 --- .../rule_chains/edge_root_rule_chain.json | 1 + .../device_profile/rule_chain_template.json | 1 + .../tenant/rule_chains/root_rule_chain.json | 1 + .../TbMsgAttributesNodeConfiguration.java | 4 +-- .../TbMsgAttributesNodeConfigurationTest.java | 29 +++++++++++++++++++ 5 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfigurationTest.java diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index 8d3c9a9af4..47a811d402 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -49,6 +49,7 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { + "version": 1, "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index a11cba1b9e..7e1325d412 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -33,6 +33,7 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { + "version": 1, "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index faf411680e..b850de3e70 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -32,6 +32,7 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { + "version": 1, "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java index 7da27e7fe9..1c3112c4de 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java @@ -34,8 +34,8 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration Date: Tue, 5 Sep 2023 17:06:09 +0200 Subject: [PATCH 6/9] TbMsgAttributesNode updateAttributesOnValueChange renamed with updateAttributesOnlyOnValueChange --- .../rule_chains/edge_root_rule_chain.json | 2 +- .../device_profile/rule_chain_template.json | 2 +- .../tenant/rule_chains/root_rule_chain.json | 2 +- .../engine/telemetry/TbMsgAttributesNode.java | 10 +++---- .../TbMsgAttributesNodeConfiguration.java | 4 +-- .../TbMsgAttributesNodeConfigurationTest.java | 4 +-- .../telemetry/TbMsgAttributesNodeTest.java | 27 ++++++++++--------- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index 47a811d402..de71ced3a5 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -53,7 +53,7 @@ "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", - "updateAttributesOnValueChange": "true" + "updateAttributesOnlyOnValueChange": "true" }, "externalId": null }, diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index 7e1325d412..7d1deb7e41 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -37,7 +37,7 @@ "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", - "updateAttributesOnValueChange": "true" + "updateAttributesOnlyOnValueChange": "true" } }, { diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index b850de3e70..57c3a86dfb 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -36,7 +36,7 @@ "scope": "CLIENT_SCOPE", "notifyDevice": "false", "sendAttributesUpdatedNotification": "false", - "updateAttributesOnValueChange": "true" + "updateAttributesOnlyOnValueChange": "true" } }, { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 79cbf87274..923abe444c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -61,14 +61,14 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via Success chain, otherwise, Failure chain is used. " + "Additionally if checkbox Send attributes updated notification is set to true, rule node will put the \"Attributes Updated\" " + "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue." + - "Performance checkbox 'Update Attributes On Value Change' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).", + "Performance checkbox 'Save attributes only if the value changes' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" ) public class TbMsgAttributesNode implements TbNode, TbVersionedNode { - static final String UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY = "updateAttributesOnValueChange"; + static final String UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY = "updateAttributesOnlyOnValueChange"; private TbMsgAttributesNodeConfiguration config; @Override @@ -94,7 +94,7 @@ public class TbMsgAttributesNode implements TbNode, TbVersionedNode { String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); - if (!config.isUpdateAttributesOnValueChange()) { + if (!config.isUpdateAttributesOnlyOnValueChange()) { saveAttr(newAttributes, ctx, msg, scope, sendAttributesUpdateNotification); return; } @@ -171,9 +171,9 @@ public class TbMsgAttributesNode implements TbNode, TbVersionedNode { boolean hasChanges = false; switch (fromVersion) { case 0: - if (!oldConfiguration.has(UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY)) { + if (!oldConfiguration.has(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY)) { hasChanges = true; - ((ObjectNode) oldConfiguration).put(UPDATE_ATTRIBUTES_ON_VALUE_CHANGE_KEY, false); + ((ObjectNode) oldConfiguration).put(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY, false); } break; default: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java index 1c3112c4de..1dd98feb16 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java @@ -26,7 +26,7 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration upgradeResult = node.upgrade(0, jsonNode); ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isTrue(); - assertThat(resultNode.has(updateAttributesOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnValueChangeKey).isTrue(); - assertThat(resultNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("upgrade result value [false] for key " + updateAttributesOnValueChangeKey).isFalse(); + assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [false] for key " + updateAttributesOnlyOnValueChangeKey).isFalse(); } @Test - void testUpgrade_fromVersion0_alreadyHasUpdateAttributesOnValueChange() throws TbNodeException { - final String updateAttributesOnValueChangeKey = "updateAttributesOnValueChange"; + void testUpgrade_fromVersion0_alreadyHasupdateAttributesOnlyOnValueChange() throws TbNodeException { TbMsgAttributesNode node = mock(TbMsgAttributesNode.class); willCallRealMethod().given(node).upgrade(anyInt(), any()); ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration()); - jsonNode.remove(updateAttributesOnValueChangeKey); - jsonNode.put(updateAttributesOnValueChangeKey, true); - assertThat(jsonNode.has(updateAttributesOnValueChangeKey)).as("pre condition has no " + updateAttributesOnValueChangeKey).isTrue(); - assertThat(jsonNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("pre condition has [true] for key " + updateAttributesOnValueChangeKey).isTrue(); + jsonNode.remove(updateAttributesOnlyOnValueChangeKey); + jsonNode.put(updateAttributesOnlyOnValueChangeKey, true); + assertThat(jsonNode.has(updateAttributesOnlyOnValueChangeKey)).as("pre condition has no " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(jsonNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("pre condition has [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue(); TbPair upgradeResult = node.upgrade(0, jsonNode); ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isFalse(); - assertThat(resultNode.has(updateAttributesOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnValueChangeKey).isTrue(); - assertThat(resultNode.get(updateAttributesOnValueChangeKey).asBoolean()).as("upgrade result value [true] for key " + updateAttributesOnValueChangeKey).isTrue(); + assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue(); } } From c2d17b631b06a251d871d9a544a32ddfb804a849 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 5 Sep 2023 17:07:11 +0200 Subject: [PATCH 7/9] TbMsgAttributesNode implements TbVersionedNode --- .../thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 923abe444c..07ccad8b48 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -25,7 +25,6 @@ import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbVersionedNode; @@ -66,7 +65,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" ) -public class TbMsgAttributesNode implements TbNode, TbVersionedNode { +public class TbMsgAttributesNode implements TbVersionedNode { static final String UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY = "updateAttributesOnlyOnValueChange"; private TbMsgAttributesNodeConfiguration config; From 9e5d2635bc3b1501836d7e31e0d6b4eb13b27dc3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 5 Sep 2023 18:02:04 +0200 Subject: [PATCH 8/9] TbMsgAttributesNode refactored as suggested Shvaika Dmytro --- .../engine/telemetry/TbMsgAttributesNode.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 07ccad8b48..0ae2a89abf 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -17,12 +17,11 @@ package org.thingsboard.rule.engine.telemetry; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -101,18 +100,13 @@ public class TbMsgAttributesNode implements TbVersionedNode { List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); ListenableFuture> findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); - Futures.addCallback(findFuture, new FutureCallback<>() { - @Override - public void onSuccess(List currentAttributes) { - List attributesChanged = filterChangedAttr(currentAttributes, newAttributes); - saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification); - } - - @Override - public void onFailure(Throwable throwable) { - ctx.tellFailure(msg, throwable); - } - }, MoreExecutors.directExecutor()); + DonAsynchron.withCallback(findFuture, + currentAttributes -> { + List attributesChanged = filterChangedAttr(currentAttributes, newAttributes); + saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification); + }, + throwable -> ctx.tellFailure(msg, throwable), + MoreExecutors.directExecutor()); } void saveAttr(List attributes, TbContext ctx, TbMsg msg, String scope, boolean sendAttributesUpdateNotification) { @@ -176,6 +170,7 @@ public class TbMsgAttributesNode implements TbVersionedNode { } break; default: + break; } return new TbPair<>(hasChanges, oldConfiguration); } From 21ce51c50dba812967f9c664131aad4ad64cec13 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 5 Sep 2023 18:04:56 +0200 Subject: [PATCH 9/9] TbMsgAttributesNode tests added for filterChangedAttr method --- .../telemetry/TbMsgAttributesNodeTest.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java index 8f01a0062e..818c1bb97f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java @@ -21,19 +21,88 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.util.TbPair; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; @Slf4j class TbMsgAttributesNodeTest { final String updateAttributesOnlyOnValueChangeKey = "updateAttributesOnlyOnValueChange"; + @Test + void testFilterChangedAttr_whenCurrentAttributesEmpty_thenReturnNewAttributes() { + TbMsgAttributesNode node = spy(TbMsgAttributesNode.class); + List newAttributes = new ArrayList<>(); + + List filtered = node.filterChangedAttr(Collections.emptyList(), newAttributes); + assertThat(filtered).isSameAs(newAttributes); + } + + @Test + void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnEmptyList() { + TbMsgAttributesNode node = spy(TbMsgAttributesNode.class); + List currentAttributes = List.of( + new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")), + new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)), + new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)), + new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}")) + ); + List newAttributes = new ArrayList<>(currentAttributes); + newAttributes.add(newAttributes.get(0)); + newAttributes.remove(0); + assertThat(newAttributes).hasSize(currentAttributes.size()); + assertThat(currentAttributes).isNotEmpty(); + assertThat(newAttributes).containsExactlyInAnyOrderElementsOf(currentAttributes); + + List filtered = node.filterChangedAttr(currentAttributes, newAttributes); + assertThat(filtered).isEmpty(); //no changes + } + + @Test + void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnExpectedList() { + TbMsgAttributesNode node = spy(TbMsgAttributesNode.class); + List currentAttributes = List.of( + new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")), + new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)), + new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)), + new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}")) + ); + List newAttributes = List.of( + new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}")), // value changed, reordered + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")), //type changed + new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)), //value changed + new BaseAttributeKvEntry(1694000999L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("address", "Peremohy ave 1")) // reordered + ); + List expected = List.of( + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")), + new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)), + new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}")) + ); + + List filtered = node.filterChangedAttr(currentAttributes, newAttributes); + assertThat(filtered).containsExactlyInAnyOrderElementsOf(expected); + } + @Test void testUpgrade_fromVersion0() throws TbNodeException {