diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index dc6757f88f..4022526aab 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -84,6 +84,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Stream; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; @@ -562,15 +563,20 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware }; RelationPathLevel inverseRelation = new RelationPathLevel(inverseDirection, relation.relationType()); List byRelationPathQuery = relationService.findByRelationPathQuery(tenantId, new EntityRelationPathQuery(entityId, List.of(inverseRelation))); + EntityId cfEntityId = cf.getEntityId(); + Predicate matchesCfEntity = relatedEntity -> cfEntityId.equals(relatedEntity) || cfEntityId.equals(getProfileId(tenantId, relatedEntity)); if (byRelationPathQuery != null && !byRelationPathQuery.isEmpty()) { switch (relation.direction()) { case FROM -> { EntityRelation entityRelation = byRelationPathQuery.get(0); // only one supported - result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), entityRelation.getFrom())); + EntityId relatedId = entityRelation.getFrom(); + if (matchesCfEntity.test(relatedId)) { + result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), relatedId)); + } } case TO -> { byRelationPathQuery.stream() - .filter(entityRelation -> entityRelation.getTo().equals(cf.getEntityId())) + .filter(entityRelation -> matchesCfEntity.test(entityRelation.getTo())) .forEach(entityRelation -> result.add(new CalculatedFieldEntityCtxId(tenantId, cf.getCfId(), entityRelation.getTo()))); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 393851eb02..cadd34c420 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -651,8 +651,8 @@ public class CalculatedFieldCtx implements Closeable { } if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig && other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) { - boolean metricsChanged = thisConfig.getMetrics().equals(otherConfig.getMetrics()); - boolean watermarkChanged = thisConfig.getWatermark().equals(otherConfig.getWatermark()); + boolean metricsChanged = !Objects.equals(thisConfig.getMetrics(), otherConfig.getMetrics()); + boolean watermarkChanged = !Objects.equals(thisConfig.getWatermark(), otherConfig.getWatermark()); return metricsChanged || watermarkChanged; } return false; diff --git a/application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java b/application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java index 3739cc2618..eb1e6905ca 100644 --- a/application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/RelatedEntitiesAggregationCalculatedFieldTest.java @@ -673,6 +673,54 @@ public class RelatedEntitiesAggregationCalculatedFieldTest extends AbstractContr }); } + + @Test + public void testTheSameRelationTypeAndKeyButDifferentCfs_checkAggregation() throws Exception { + postTelemetry(device1.getId(), "{\"temperature\":24.2}"); + postTelemetry(device2.getId(), "{\"temperature\":19.6}"); + CalculatedField cf = createAvgTemperatureCF(asset.getId()); + + Asset asset2 = createAsset("Asset 2", assetProfile.getId()); + Device device3 = createDevice("Device 3", "1234567890333"); + createEntityRelation(asset2.getId(), device3.getId(), "Contains"); + postTelemetry(device3.getId(), "{\"temperature\":10.2}"); + + CalculatedField calculatedField = new CalculatedField(); + calculatedField.setName("Average temperature"); + calculatedField.setEntityId(asset2.getId()); + calculatedField.setType(CalculatedFieldType.RELATED_ENTITIES_AGGREGATION); + + var config = (RelatedEntitiesAggregationCalculatedFieldConfiguration) cf.getConfiguration(); + AggMetric avgMetric = new AggMetric(); + avgMetric.setFunction(AggFunction.AVG); + avgMetric.setInput(new AggKeyInput("temp")); + config.setMetrics(Map.of("avgTemperature_2", avgMetric)); + + calculatedField.setConfiguration(config); + calculatedField.setDebugSettings(DebugSettings.all()); + saveCalculatedField(calculatedField); + + await().alias("create avg temp cf and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode avgTemperature = getLatestTelemetry(asset.getId(), "avgTemperature", "avgTemperature_2"); + assertThat(avgTemperature).isNotNull(); + assertThat(avgTemperature.get("avgTemperature").get(0).get("value").asText()).isEqualTo("24"); + assertThat(avgTemperature.get("avgTemperature_2").get(0).get("value").isNull()).isTrue(); + }); + + postTelemetry(device1.getId(), "{\"temperature\":26.2}"); + + await().alias("create avg temp cf and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode avgTemperature = getLatestTelemetry(asset.getId(), "avgTemperature", "avgTemperature_2"); + assertThat(avgTemperature).isNotNull(); + assertThat(avgTemperature.get("avgTemperature").get(0).get("value").asText()).isEqualTo("26"); + assertThat(avgTemperature.get("avgTemperature_2").get(0).get("value").isNull()).isTrue(); + }); + } + private void checkInitialCalculation() { await().alias("create CF and perform initial aggregation").atMost(deduplicationInterval * 2, TimeUnit.SECONDS) .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)