diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java index 23a4381ddc..ff50cd99b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; import static java.util.concurrent.TimeUnit.SECONDS; @@ -59,6 +60,8 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat private long deduplicationIntervalMs = -1; private Map metrics; + private ScheduledFuture reevaluationFuture; + public RelatedEntitiesAggregationCalculatedFieldState(EntityId entityId) { super(entityId); } @@ -71,8 +74,13 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat deduplicationIntervalMs = SECONDS.toMillis(configuration.getDeduplicationIntervalInSec()); } - public void scheduleReevaluation() { - ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); + @Override + public void close() { + super.close(); + if (reevaluationFuture != null) { + reevaluationFuture.cancel(true); + reevaluationFuture = null; + } } @Override @@ -142,6 +150,13 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat lastArgsRefreshTs = System.currentTimeMillis(); } + public void scheduleReevaluation() { + ScheduledFuture future = ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); + if (future != null) { + reevaluationFuture = future; + } + } + @Override public ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx) throws Exception { boolean cfUpdated = updatedArgs != null && updatedArgs.isEmpty(); @@ -149,7 +164,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat Output output = ctx.getOutput(); ObjectNode aggResult = aggregateMetrics(output); lastMetricsEvalTs = System.currentTimeMillis(); - ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); + scheduleReevaluation(); return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() .type(output.getType()) .scope(output.getScope())