From cefc6925c196edec8989f3c42026792c9dc39f1c Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 20 Oct 2025 17:02:04 +0300 Subject: [PATCH 1/2] fixed arguments in ctx when the same keys defined --- ...CalculatedFieldEntityMessageProcessor.java | 44 ++++++++-------- .../cf/ctx/state/CalculatedFieldCtx.java | 19 ++++--- .../cf/CalculatedFieldIntegrationTest.java | 50 +++++++++++++++++++ 3 files changed, 84 insertions(+), 29 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 35539834c3..a905738a2f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -346,21 +346,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, data); } - private Map mapToArguments(Map argNames, List data) { - if (argNames.isEmpty()) { + private Map mapToArguments(Map> args, List data) { + if (args.isEmpty()) { return Collections.emptyMap(); } Map arguments = new HashMap<>(); for (TsKvProto item : data) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null); - String argName = argNames.get(key); - if (argName != null) { - arguments.put(argName, new SingleValueArgumentEntry(item)); + Set argNames = args.get(key); + if (argNames != null) { + argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item))); } + key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null); - argName = argNames.get(key); - if (argName != null) { - arguments.put(argName, new SingleValueArgumentEntry(item)); + argNames = args.get(key); + if (argNames != null) { + argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item))); } } return arguments; @@ -378,13 +379,13 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, scope, attrDataList); } - private Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { + private Map mapToArguments(Map> args, AttributeScopeProto scope, List attrDataList) { Map arguments = new HashMap<>(); for (AttributeValueProto item : attrDataList) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); - String argName = argNames.get(key); - if (argName != null) { - arguments.put(argName, new SingleValueArgumentEntry(item)); + Set argNames = args.get(key); + if (argNames != null) { + argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item))); } } return arguments; @@ -402,18 +403,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); } - private Map mapToArgumentsWithDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { + private Map mapToArgumentsWithDefaultValue(Map> args, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { Map arguments = new HashMap<>(); for (String removedKey : removedAttrKeys) { ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); - String argName = argNames.get(key); - if (argName != null) { - Argument argument = configArguments.get(argName); - String defaultValue = (argument != null) ? argument.getDefaultValue() : null; - arguments.put(argName, StringUtils.isNotEmpty(defaultValue) - ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) - : new SingleValueArgumentEntry()); - + Set argNames = args.get(key); + if (argNames != null) { + argNames.forEach(argName -> { + Argument argument = configArguments.get(argName); + String defaultValue = (argument != null) ? argument.getDefaultValue() : null; + arguments.put(argName, StringUtils.isNotEmpty(defaultValue) + ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) + : new SingleValueArgumentEntry()); + }); } } return arguments; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index c9eaaef19a..9cb68bbc27 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -42,8 +42,10 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.thingsboard.common.util.ExpressionFunctionsUtil.userDefinedFunctions; @@ -57,8 +59,8 @@ public class CalculatedFieldCtx { private EntityId entityId; private CalculatedFieldType cfType; private final Map arguments; - private final Map mainEntityArguments; - private final Map> linkedEntityArguments; + private final Map> mainEntityArguments; + private final Map>> linkedEntityArguments; private final List argNames; private Output output; private String expression; @@ -88,9 +90,10 @@ public class CalculatedFieldCtx { var refId = entry.getValue().getRefEntityId(); var refKey = entry.getValue().getRefEntityKey(); if (refId == null || refId.equals(calculatedField.getEntityId())) { - mainEntityArguments.put(refKey, entry.getKey()); + mainEntityArguments.computeIfAbsent(refKey, key -> new HashSet<>()).add(entry.getKey()); } else { - linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey()); + linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()) + .computeIfAbsent(refKey, key -> new HashSet<>()).add(entry.getKey()); } } this.argNames = new ArrayList<>(arguments.keySet()); @@ -182,7 +185,7 @@ public class CalculatedFieldCtx { return map != null && matchesTimeSeries(map, values); } - private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + private boolean matchesAttributes(Map> argMap, List values, AttributeScope scope) { if (argMap.isEmpty() || values.isEmpty()) { return false; } @@ -196,7 +199,7 @@ public class CalculatedFieldCtx { return false; } - private boolean matchesTimeSeries(Map argMap, List values) { + private boolean matchesTimeSeries(Map> argMap, List values) { if (argMap.isEmpty() || values.isEmpty()) { return false; } @@ -225,7 +228,7 @@ public class CalculatedFieldCtx { return matchesTimeSeriesKeys(mainEntityArguments, keys); } - private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + private boolean matchesAttributesKeys(Map> argMap, List keys, AttributeScope scope) { if (argMap.isEmpty() || keys.isEmpty()) { return false; } @@ -240,7 +243,7 @@ public class CalculatedFieldCtx { return false; } - private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + private boolean matchesTimeSeriesKeys(Map> argMap, List keys) { if (argMap.isEmpty() || keys.isEmpty()) { return false; } diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index da4b5758cc..b500f95d45 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -659,6 +659,56 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes }); } + @Test + public void testCalculatedFieldWhenTheSameTelemetryKeysUsed() throws Exception { + Device testDevice = createDevice("Test device", "1234567890"); + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":5}")); + + CalculatedField calculatedField = new CalculatedField(); + calculatedField.setEntityId(testDevice.getId()); + calculatedField.setType(CalculatedFieldType.SIMPLE); + calculatedField.setName("a + b"); + calculatedField.setDebugSettings(DebugSettings.all()); + + SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration(); + + ReferencedEntityKey refEntityKey = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null); + Argument argumentA = new Argument(); + argumentA.setRefEntityKey(refEntityKey); + Argument argumentB = new Argument(); + argumentB.setRefEntityKey(refEntityKey); + config.setArguments(Map.of("a", argumentA, "b", argumentB)); + config.setExpression("a + b"); + + Output output = new Output(); + output.setName("c"); + output.setType(OutputType.TIME_SERIES); + output.setDecimalsByDefault(0); + config.setOutput(output); + + calculatedField.setConfiguration(config); + + doPost("/api/calculatedField", calculatedField, CalculatedField.class); + + await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode c = getLatestTelemetry(testDevice.getId(), "c"); + assertThat(c).isNotNull(); + assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("10"); + }); + + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":10}")); + + await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode c = getLatestTelemetry(testDevice.getId(), "c"); + assertThat(c).isNotNull(); + assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("20"); + }); + } + private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception { return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class); } From 77f2250e0af94d361837dba8467142a5a354d1d8 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 21 Oct 2025 14:09:33 +0300 Subject: [PATCH 2/2] added helper method to collectionsUtil --- .../service/cf/ctx/state/CalculatedFieldCtx.java | 6 +++--- .../server/common/data/util/CollectionsUtil.java | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 9cb68bbc27..9ef5a8c2a9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -42,7 +43,6 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -90,10 +90,10 @@ public class CalculatedFieldCtx { var refId = entry.getValue().getRefEntityId(); var refKey = entry.getValue().getRefEntityKey(); if (refId == null || refId.equals(calculatedField.getEntityId())) { - mainEntityArguments.computeIfAbsent(refKey, key -> new HashSet<>()).add(entry.getKey()); + mainEntityArguments.compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey())); } else { linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()) - .computeIfAbsent(refKey, key -> new HashSet<>()).add(entry.getKey()); + .compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey())); } } this.argNames = new ArrayList<>(arguments.keySet()); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java index 71c5256203..082be9b71f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java @@ -95,4 +95,17 @@ public class CollectionsUtil { return false; } + public static Set addToSet(Set existing, T value) { + if (existing == null || existing.isEmpty()) { + return Set.of(value); + } + if (existing.contains(value)) { + return existing; + } + Set newSet = new HashSet<>(existing.size() + 1); + newSet.addAll(existing); + newSet.add(value); + return (Set) Set.of(newSet.toArray()); + } + }