diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index ec1341cc71..de71ced3a5 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -49,8 +49,11 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { + "version": 1, "scope": "CLIENT_SCOPE", - "notifyDevice": "false" + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnlyOnValueChange": "true" }, "externalId": null }, diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index 4776ef2aae..7d1deb7e41 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -33,7 +33,11 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { - "scope": "CLIENT_SCOPE" + "version": 1, + "scope": "CLIENT_SCOPE", + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnlyOnValueChange": "true" } }, { diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index 88ef27e2f8..57c3a86dfb 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -32,8 +32,11 @@ "name": "Save Client Attributes", "debugMode": false, "configuration": { + "version": 1, "scope": "CLIENT_SCOPE", - "notifyDevice": "false" + "notifyDevice": "false", + "sendAttributesUpdatedNotification": "false", + "updateAttributesOnlyOnValueChange": "true" } }, { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index ef49558631..0ae2a89abf 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -15,22 +15,33 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbVersionedNode; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; @@ -42,17 +53,20 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R type = ComponentType.ACTION, name = "save attributes", configClazz = TbMsgAttributesNodeConfiguration.class, + version = 1, nodeDescription = "Saves attributes data", nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " + "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via Success chain, otherwise, Failure chain is used. " + "Additionally if checkbox Send attributes updated notification is set to true, rule node will put the \"Attributes Updated\" " + - "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue.", + "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue." + + "Performance checkbox 'Save attributes only if the value changes' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" ) -public class TbMsgAttributesNode implements TbNode { +public class TbMsgAttributesNode implements TbVersionedNode { + static final String UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY = "updateAttributesOnlyOnValueChange"; private TbMsgAttributesNodeConfiguration config; @Override @@ -70,13 +84,36 @@ public class TbMsgAttributesNode implements TbNode { return; } String src = msg.getData(); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); - if (attributes.isEmpty()) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + if (newAttributes.isEmpty()) { ctx.tellSuccess(msg); return; } String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); + + if (!config.isUpdateAttributesOnlyOnValueChange()) { + saveAttr(newAttributes, ctx, msg, scope, sendAttributesUpdateNotification); + return; + } + + List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); + ListenableFuture> findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); + + DonAsynchron.withCallback(findFuture, + currentAttributes -> { + List attributesChanged = filterChangedAttr(currentAttributes, newAttributes); + saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification); + }, + throwable -> ctx.tellFailure(msg, throwable), + MoreExecutors.directExecutor()); + } + + void saveAttr(List attributes, TbContext ctx, TbMsg msg, String scope, boolean sendAttributesUpdateNotification) { + if (attributes.isEmpty()) { + ctx.tellSuccess(msg); + return; + } ctx.getTelemetryService().saveAndNotify( ctx.getTenantId(), msg.getOriginator(), @@ -89,6 +126,24 @@ public class TbMsgAttributesNode implements TbNode { ); } + List filterChangedAttr(List currentAttributes, List newAttributes) { + if (currentAttributes == null || currentAttributes.isEmpty()) { + return newAttributes; + } + + Map currentAttrMap = currentAttributes.stream() + .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); + + return newAttributes.stream() + .filter(item -> { + AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); + return cacheAttr == null + || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type + || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); + }) + .collect(Collectors.toList()); + } + private boolean checkSendNotification(String scope) { return config.isSendAttributesUpdatedNotification() && !CLIENT_SCOPE.equals(scope); } @@ -104,4 +159,20 @@ public class TbMsgAttributesNode implements TbNode { return config.getScope(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (!oldConfiguration.has(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY, false); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java index dd6140c4b5..1dd98feb16 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java @@ -26,6 +26,7 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration newAttributes = new ArrayList<>(); + + List filtered = node.filterChangedAttr(Collections.emptyList(), newAttributes); + assertThat(filtered).isSameAs(newAttributes); + } + + @Test + void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnEmptyList() { + TbMsgAttributesNode node = spy(TbMsgAttributesNode.class); + List currentAttributes = List.of( + new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")), + new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)), + new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)), + new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}")) + ); + List newAttributes = new ArrayList<>(currentAttributes); + newAttributes.add(newAttributes.get(0)); + newAttributes.remove(0); + assertThat(newAttributes).hasSize(currentAttributes.size()); + assertThat(currentAttributes).isNotEmpty(); + assertThat(newAttributes).containsExactlyInAnyOrderElementsOf(currentAttributes); + + List filtered = node.filterChangedAttr(currentAttributes, newAttributes); + assertThat(filtered).isEmpty(); //no changes + } + + @Test + void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnExpectedList() { + TbMsgAttributesNode node = spy(TbMsgAttributesNode.class); + List currentAttributes = List.of( + new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")), + new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)), + new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)), + new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}")) + ); + List newAttributes = List.of( + new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}")), // value changed, reordered + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")), //type changed + new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)), //value changed + new BaseAttributeKvEntry(1694000999L, new DoubleDataEntry("temp", -18.35)), + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("address", "Peremohy ave 1")) // reordered + ); + List expected = List.of( + new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")), + new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)), + new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}")) + ); + + List filtered = node.filterChangedAttr(currentAttributes, newAttributes); + assertThat(filtered).containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + void testUpgrade_fromVersion0() throws TbNodeException { + + TbMsgAttributesNode node = mock(TbMsgAttributesNode.class); + willCallRealMethod().given(node).upgrade(anyInt(), any()); + + ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration()); + jsonNode.remove(updateAttributesOnlyOnValueChangeKey); + assertThat(jsonNode.has(updateAttributesOnlyOnValueChangeKey)).as("pre condition has no " + updateAttributesOnlyOnValueChangeKey).isFalse(); + + TbPair upgradeResult = node.upgrade(0, jsonNode); + + ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isTrue(); + assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [false] for key " + updateAttributesOnlyOnValueChangeKey).isFalse(); + } + + @Test + void testUpgrade_fromVersion0_alreadyHasupdateAttributesOnlyOnValueChange() throws TbNodeException { + TbMsgAttributesNode node = mock(TbMsgAttributesNode.class); + willCallRealMethod().given(node).upgrade(anyInt(), any()); + + ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration()); + jsonNode.remove(updateAttributesOnlyOnValueChangeKey); + jsonNode.put(updateAttributesOnlyOnValueChangeKey, true); + assertThat(jsonNode.has(updateAttributesOnlyOnValueChangeKey)).as("pre condition has no " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(jsonNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("pre condition has [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + + TbPair upgradeResult = node.upgrade(0, jsonNode); + + ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isFalse(); + assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue(); + } + +}