|
|
|
@ -43,6 +43,7 @@ import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Map.Entry; |
|
|
|
import java.util.concurrent.ScheduledFuture; |
|
|
|
|
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS; |
|
|
|
|
|
|
|
@ -59,6 +60,8 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat |
|
|
|
private long deduplicationIntervalMs = -1; |
|
|
|
private Map<String, AggMetric> metrics; |
|
|
|
|
|
|
|
private ScheduledFuture<?> reevaluationFuture; |
|
|
|
|
|
|
|
public RelatedEntitiesAggregationCalculatedFieldState(EntityId entityId) { |
|
|
|
super(entityId); |
|
|
|
} |
|
|
|
@ -71,8 +74,13 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat |
|
|
|
deduplicationIntervalMs = SECONDS.toMillis(configuration.getDeduplicationIntervalInSec()); |
|
|
|
} |
|
|
|
|
|
|
|
public void scheduleReevaluation() { |
|
|
|
ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); |
|
|
|
@Override |
|
|
|
public void close() { |
|
|
|
super.close(); |
|
|
|
if (reevaluationFuture != null) { |
|
|
|
reevaluationFuture.cancel(true); |
|
|
|
reevaluationFuture = null; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -142,6 +150,13 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat |
|
|
|
lastArgsRefreshTs = System.currentTimeMillis(); |
|
|
|
} |
|
|
|
|
|
|
|
public void scheduleReevaluation() { |
|
|
|
ScheduledFuture<?> future = ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); |
|
|
|
if (future != null) { |
|
|
|
reevaluationFuture = future; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx) throws Exception { |
|
|
|
boolean cfUpdated = updatedArgs != null && updatedArgs.isEmpty(); |
|
|
|
@ -149,7 +164,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat |
|
|
|
Output output = ctx.getOutput(); |
|
|
|
ObjectNode aggResult = aggregateMetrics(output); |
|
|
|
lastMetricsEvalTs = System.currentTimeMillis(); |
|
|
|
ctx.scheduleReevaluation(deduplicationIntervalMs, actorCtx); |
|
|
|
scheduleReevaluation(); |
|
|
|
return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() |
|
|
|
.type(output.getType()) |
|
|
|
.scope(output.getScope()) |
|
|
|
|