Browse Source

Merge pull request #14195 from irynamatveieva/fix/cf-arguments

Fixed incorrect CF calculation when same key is used across multiple arguments
pull/14198/head
Viacheslav Klimov 8 months ago
committed by GitHub
parent
commit
f68259d141
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 44
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 19
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  3. 50
      application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java
  4. 13
      common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java

44
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -346,21 +346,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return mapToArguments(argNames, data);
}
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) {
if (argNames.isEmpty()) {
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, Set<String>> args, List<TsKvProto> data) {
if (args.isEmpty()) {
return Collections.emptyMap();
}
Map<String, ArgumentEntry> arguments = new HashMap<>();
for (TsKvProto item : data) {
ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null);
String argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
Set<String> argNames = args.get(key);
if (argNames != null) {
argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item)));
}
key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null);
argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
argNames = args.get(key);
if (argNames != null) {
argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item)));
}
}
return arguments;
@ -378,13 +379,13 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return mapToArguments(argNames, scope, attrDataList);
}
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, Set<String>> args, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
Map<String, ArgumentEntry> arguments = new HashMap<>();
for (AttributeValueProto item : attrDataList) {
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
String argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
Set<String> argNames = args.get(key);
if (argNames != null) {
argNames.forEach(argName -> arguments.put(argName, new SingleValueArgumentEntry(item)));
}
}
return arguments;
@ -402,18 +403,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys);
}
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(Map<ReferencedEntityKey, String> argNames, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) {
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(Map<ReferencedEntityKey, Set<String>> args, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) {
Map<String, ArgumentEntry> arguments = new HashMap<>();
for (String removedKey : removedAttrKeys) {
ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
String argName = argNames.get(key);
if (argName != null) {
Argument argument = configArguments.get(argName);
String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
arguments.put(argName, StringUtils.isNotEmpty(defaultValue)
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
: new SingleValueArgumentEntry());
Set<String> argNames = args.get(key);
if (argNames != null) {
argNames.forEach(argName -> {
Argument argument = configArguments.get(argName);
String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
arguments.put(argName, StringUtils.isNotEmpty(defaultValue)
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
: new SingleValueArgumentEntry());
});
}
}
return arguments;

19
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@ -44,6 +45,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.thingsboard.common.util.ExpressionFunctionsUtil.userDefinedFunctions;
@ -57,8 +59,8 @@ public class CalculatedFieldCtx {
private EntityId entityId;
private CalculatedFieldType cfType;
private final Map<String, Argument> arguments;
private final Map<ReferencedEntityKey, String> mainEntityArguments;
private final Map<EntityId, Map<ReferencedEntityKey, String>> linkedEntityArguments;
private final Map<ReferencedEntityKey, Set<String>> mainEntityArguments;
private final Map<EntityId, Map<ReferencedEntityKey, Set<String>>> linkedEntityArguments;
private final List<String> argNames;
private Output output;
private String expression;
@ -88,9 +90,10 @@ public class CalculatedFieldCtx {
var refId = entry.getValue().getRefEntityId();
var refKey = entry.getValue().getRefEntityKey();
if (refId == null || refId.equals(calculatedField.getEntityId())) {
mainEntityArguments.put(refKey, entry.getKey());
mainEntityArguments.compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey()));
} else {
linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey());
linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>())
.compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey()));
}
}
this.argNames = new ArrayList<>(arguments.keySet());
@ -182,7 +185,7 @@ public class CalculatedFieldCtx {
return map != null && matchesTimeSeries(map, values);
}
private boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
private boolean matchesAttributes(Map<ReferencedEntityKey, Set<String>> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
if (argMap.isEmpty() || values.isEmpty()) {
return false;
}
@ -196,7 +199,7 @@ public class CalculatedFieldCtx {
return false;
}
private boolean matchesTimeSeries(Map<ReferencedEntityKey, String> argMap, List<TsKvEntry> values) {
private boolean matchesTimeSeries(Map<ReferencedEntityKey, Set<String>> argMap, List<TsKvEntry> values) {
if (argMap.isEmpty() || values.isEmpty()) {
return false;
}
@ -225,7 +228,7 @@ public class CalculatedFieldCtx {
return matchesTimeSeriesKeys(mainEntityArguments, keys);
}
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, Set<String>> argMap, List<String> keys, AttributeScope scope) {
if (argMap.isEmpty() || keys.isEmpty()) {
return false;
}
@ -240,7 +243,7 @@ public class CalculatedFieldCtx {
return false;
}
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, Set<String>> argMap, List<String> keys) {
if (argMap.isEmpty() || keys.isEmpty()) {
return false;
}

50
application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

@ -659,6 +659,56 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
});
}
@Test
public void testCalculatedFieldWhenTheSameTelemetryKeysUsed() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":5}"));
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SIMPLE);
calculatedField.setName("a + b");
calculatedField.setDebugSettings(DebugSettings.all());
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null);
Argument argumentA = new Argument();
argumentA.setRefEntityKey(refEntityKey);
Argument argumentB = new Argument();
argumentB.setRefEntityKey(refEntityKey);
config.setArguments(Map.of("a", argumentA, "b", argumentB));
config.setExpression("a + b");
Output output = new Output();
output.setName("c");
output.setType(OutputType.TIME_SERIES);
output.setDecimalsByDefault(0);
config.setOutput(output);
calculatedField.setConfiguration(config);
doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode c = getLatestTelemetry(testDevice.getId(), "c");
assertThat(c).isNotNull();
assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("10");
});
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"a\":10}"));
await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode c = getLatestTelemetry(testDevice.getId(), "c");
assertThat(c).isNotNull();
assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("20");
});
}
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
}

13
common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java

@ -95,4 +95,17 @@ public class CollectionsUtil {
return false;
}
public static <T> Set<T> addToSet(Set<T> existing, T value) {
if (existing == null || existing.isEmpty()) {
return Set.of(value);
}
if (existing.contains(value)) {
return existing;
}
Set<T> newSet = new HashSet<>(existing.size() + 1);
newSet.addAll(existing);
newSet.add(value);
return (Set<T>) Set.of(newSet.toArray());
}
}

Loading…
Cancel
Save