|
|
|
@ -15,12 +15,15 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.service.cf.ctx.state.aggregation.single; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import com.fasterxml.jackson.databind.node.ArrayNode; |
|
|
|
import com.fasterxml.jackson.databind.node.ObjectNode; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import org.thingsboard.common.util.DebugModeUtil; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.script.api.tbel.TbUtils; |
|
|
|
import org.thingsboard.script.api.tbel.TbelCfArg; |
|
|
|
import org.thingsboard.server.actors.TbActorRef; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.Output; |
|
|
|
@ -36,6 +39,7 @@ import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; |
|
|
|
|
|
|
|
import java.time.Instant; |
|
|
|
import java.time.ZoneId; |
|
|
|
@ -58,6 +62,8 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt |
|
|
|
private long checkInterval; |
|
|
|
private Map<String, AggMetric> metrics; |
|
|
|
|
|
|
|
private final EntityAggregationDebugArgumentsTracker debugTracker = new EntityAggregationDebugArgumentsTracker(new HashMap<>()); |
|
|
|
|
|
|
|
private CalculatedFieldProcessingService cfProcessingService; |
|
|
|
|
|
|
|
public EntityAggregationCalculatedFieldState(EntityId entityId) { |
|
|
|
@ -91,9 +97,14 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt |
|
|
|
|
|
|
|
@Override |
|
|
|
public ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx) throws Exception { |
|
|
|
debugTracker.reset(); |
|
|
|
createIntervalIfNotExist(); |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
|
|
|
|
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { |
|
|
|
debugTracker.recordUpdatedArgs(updatedArgs, arguments); |
|
|
|
} |
|
|
|
|
|
|
|
Map<AggIntervalEntry, Map<String, ArgumentEntry>> results = new HashMap<>(); |
|
|
|
List<AggIntervalEntry> expiredIntervals = new ArrayList<>(); |
|
|
|
getIntervals().forEach((intervalEntry, argIntervalStatuses) -> { |
|
|
|
@ -114,6 +125,12 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt |
|
|
|
.build()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public Map<String, ArgumentEntry> update(Map<String, ArgumentEntry> argumentValues, CalculatedFieldCtx ctx) { |
|
|
|
createIntervalIfNotExist(); |
|
|
|
return super.update(argumentValues, ctx); |
|
|
|
} |
|
|
|
|
|
|
|
private void removeExpiredIntervals(List<AggIntervalEntry> expiredIntervals) { |
|
|
|
expiredIntervals.forEach(expiredInterval -> { |
|
|
|
arguments.values().stream() |
|
|
|
@ -183,6 +200,9 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt |
|
|
|
expiredIntervals.add(intervalEntry); |
|
|
|
} else if (now - startTs >= intervalEntry.getIntervalDuration()) { |
|
|
|
handleActiveInterval(intervalEntry, args, results); |
|
|
|
if (watermarkDuration == 0) { |
|
|
|
expiredIntervals.add(intervalEntry); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -262,14 +282,83 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt |
|
|
|
resultNode.put("ts", interval.getEndTs() - 1); |
|
|
|
resultNode.set("values", metricsNode); |
|
|
|
result.add(resultNode); |
|
|
|
|
|
|
|
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { |
|
|
|
debugTracker.addInterval(interval); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public JsonNode getArgumentsJson() { |
|
|
|
EntityAggregationDebugArguments debugArguments = debugTracker.toDebugArguments(); |
|
|
|
return debugArguments == null ? null : JacksonUtil.valueToTree(debugArguments); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean isReady() { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
record EntityAggregationDebugArgumentsTracker(Map<AggIntervalEntry, Map<String, TbelCfArg>> processedIntervals) { |
|
|
|
|
|
|
|
public void reset() { |
|
|
|
processedIntervals.clear(); |
|
|
|
} |
|
|
|
|
|
|
|
public void addInterval(AggIntervalEntry interval) { |
|
|
|
processedIntervals.computeIfAbsent(interval, k -> new HashMap<>()); |
|
|
|
} |
|
|
|
|
|
|
|
public void recordUpdatedArgs(Map<String, ArgumentEntry> updatedArgs, Map<String, ArgumentEntry> arguments) { |
|
|
|
if (updatedArgs != null && !updatedArgs.isEmpty()) { |
|
|
|
updatedArgs.forEach((argName, argEntry) -> { |
|
|
|
ArgumentEntry argumentEntry = arguments.get(argName); |
|
|
|
if (argumentEntry instanceof EntityAggregationArgumentEntry entityAggEntry && argEntry instanceof SingleValueArgumentEntry singleEntry) { |
|
|
|
entityAggEntry.getAggIntervals().forEach((aggIntervalEntry, aggIntervalEntryStatus) -> { |
|
|
|
boolean match = singleEntry.isForceResetPrevious() || aggIntervalEntry.belongsToInterval(singleEntry.getTs()); |
|
|
|
if (match) { |
|
|
|
recordArg(aggIntervalEntry, argName, singleEntry.toTbelCfArg()); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void recordArg(AggIntervalEntry interval, String argName, TbelCfArg value) { |
|
|
|
processedIntervals.computeIfAbsent(interval, k -> new HashMap<>()).put(argName, value); |
|
|
|
} |
|
|
|
|
|
|
|
public EntityAggregationDebugArguments toDebugArguments() { |
|
|
|
if (processedIntervals.isEmpty()) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
return EntityAggregationDebugArguments.toDebugArguments(processedIntervals); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
record EntityAggregationDebugArguments(List<IntervalDebugArgument> processedIntervals) { |
|
|
|
|
|
|
|
public static EntityAggregationDebugArguments toDebugArguments(Map<AggIntervalEntry, Map<String, TbelCfArg>> processedIntervals) { |
|
|
|
List<IntervalDebugArgument> result = new ArrayList<>(); |
|
|
|
processedIntervals.forEach((interval, args) -> { |
|
|
|
result.add(new IntervalDebugArgument(interval.getStartTs(), interval.getEndTs(), args)); |
|
|
|
}); |
|
|
|
return new EntityAggregationDebugArguments(result); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
record IntervalDebugArgument(Long intervalStartTs, Long intervalEndTs, JsonNode updatedArguments) { |
|
|
|
|
|
|
|
public IntervalDebugArgument(Long intervalStartTs, Long intervalEndTs, Map<String, TbelCfArg> updatedArguments) { |
|
|
|
this(intervalStartTs, intervalEndTs, updatedArguments == null || updatedArguments.isEmpty() ? null : JacksonUtil.valueToTree(updatedArguments)); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|