Browse Source

Merge pull request #14413 from irynamatveieva/fix/related-entities-agg

Added filter to process only related entities calculated fields
pull/14495/head
Viacheslav Klimov 7 months ago
committed by GitHub
parent
commit
de4ff25088
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 10
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  2. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  3. 48
      application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java

10
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -84,6 +84,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
@ -562,15 +563,20 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
};
RelationPathLevel inverseRelation = new RelationPathLevel(inverseDirection, relation.relationType());
List<EntityRelation> byRelationPathQuery = relationService.findByRelationPathQuery(tenantId, new EntityRelationPathQuery(entityId, List.of(inverseRelation)));
EntityId cfEntityId = cf.getEntityId();
Predicate<EntityId> matchesCfEntity = relatedEntity -> cfEntityId.equals(relatedEntity) || cfEntityId.equals(getProfileId(tenantId, relatedEntity));
if (byRelationPathQuery != null && !byRelationPathQuery.isEmpty()) {
switch (relation.direction()) {
case FROM -> {
EntityRelation entityRelation = byRelationPathQuery.get(0); // only one supported
result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), entityRelation.getFrom()));
EntityId relatedId = entityRelation.getFrom();
if (matchesCfEntity.test(relatedId)) {
result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), relatedId));
}
}
case TO -> {
byRelationPathQuery.stream()
.filter(entityRelation -> entityRelation.getTo().equals(cf.getEntityId()))
.filter(entityRelation -> matchesCfEntity.test(entityRelation.getTo()))
.forEach(entityRelation -> result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), entityRelation.getTo())));
}
}

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -651,8 +651,8 @@ public class CalculatedFieldCtx implements Closeable {
}
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig
&& other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) {
boolean metricsChanged = thisConfig.getMetrics().equals(otherConfig.getMetrics());
boolean watermarkChanged = thisConfig.getWatermark().equals(otherConfig.getWatermark());
boolean metricsChanged = !Objects.equals(thisConfig.getMetrics(), otherConfig.getMetrics());
boolean watermarkChanged = !Objects.equals(thisConfig.getWatermark(), otherConfig.getWatermark());
return metricsChanged || watermarkChanged;
}
return false;

48
application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java

@ -673,6 +673,54 @@ public class RelatedEntitiesAggregationCalculatedFieldTest extends AbstractContr
});
}
@Test
public void testTheSameRelationTypeAndKeyButDifferentCfs_checkAggregation() throws Exception {
postTelemetry(device1.getId(), "{\"temperature\":24.2}");
postTelemetry(device2.getId(), "{\"temperature\":19.6}");
CalculatedField cf = createAvgTemperatureCF(asset.getId());
Asset asset2 = createAsset("Asset 2", assetProfile.getId());
Device device3 = createDevice("Device 3", "1234567890333");
createEntityRelation(asset2.getId(), device3.getId(), "Contains");
postTelemetry(device3.getId(), "{\"temperature\":10.2}");
CalculatedField calculatedField = new CalculatedField();
calculatedField.setName("Average temperature");
calculatedField.setEntityId(asset2.getId());
calculatedField.setType(CalculatedFieldType.RELATED_ENTITIES_AGGREGATION);
var config = (RelatedEntitiesAggregationCalculatedFieldConfiguration) cf.getConfiguration();
AggMetric avgMetric = new AggMetric();
avgMetric.setFunction(AggFunction.AVG);
avgMetric.setInput(new AggKeyInput("temp"));
config.setMetrics(Map.of("avgTemperature_2", avgMetric));
calculatedField.setConfiguration(config);
calculatedField.setDebugSettings(DebugSettings.all());
saveCalculatedField(calculatedField);
await().alias("create avg temp cf and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode avgTemperature = getLatestTelemetry(asset.getId(), "avgTemperature", "avgTemperature_2");
assertThat(avgTemperature).isNotNull();
assertThat(avgTemperature.get("avgTemperature").get(0).get("value").asText()).isEqualTo("24");
assertThat(avgTemperature.get("avgTemperature_2").get(0).get("value").isNull()).isTrue();
});
postTelemetry(device1.getId(), "{\"temperature\":26.2}");
await().alias("create avg temp cf and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode avgTemperature = getLatestTelemetry(asset.getId(), "avgTemperature", "avgTemperature_2");
assertThat(avgTemperature).isNotNull();
assertThat(avgTemperature.get("avgTemperature").get(0).get("value").asText()).isEqualTo("26");
assertThat(avgTemperature.get("avgTemperature_2").get(0).get("value").isNull()).isTrue();
});
}
private void checkInitialCalculation() {
await().alias("create CF and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)

Loading…
Cancel
Save