|
|
|
@ -23,6 +23,7 @@ import jakarta.annotation.PostConstruct; |
|
|
|
import jakarta.annotation.PreDestroy; |
|
|
|
import lombok.Data; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.thingsboard.common.util.ThingsBoardExecutors; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.Argument; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; |
|
|
|
@ -53,7 +54,6 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry; |
|
|
|
import org.thingsboard.server.utils.CalculatedFieldArgumentUtils; |
|
|
|
|
|
|
|
import java.util.Collections; |
|
|
|
import java.util.HashMap; |
|
|
|
@ -71,6 +71,7 @@ import static org.thingsboard.server.common.data.cf.configuration.geofencing.Ent |
|
|
|
import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LONGITUDE_ARGUMENT_KEY; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultKvEntry; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggMetricArgument; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggregationArgument; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformSingleValueArgument; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformTsRollingArgument; |
|
|
|
@ -87,6 +88,9 @@ public abstract class AbstractCalculatedFieldProcessingService { |
|
|
|
|
|
|
|
protected ListeningExecutorService calculatedFieldCallbackExecutor; |
|
|
|
|
|
|
|
@Value("${actors.calculated_fields.max_datapoints_limit}") |
|
|
|
private int aggArgumentMaxDatapointsLimit; |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( |
|
|
|
@ -302,21 +306,21 @@ public abstract class AbstractCalculatedFieldProcessingService { |
|
|
|
AggFunction function = metric.getFunction(); |
|
|
|
long intervalMs = interval.getEndTs() - interval.getStartTs(); |
|
|
|
BaseReadTsKvQuery query = new BaseReadTsKvQuery(argKey, interval.getStartTs(), interval.getEndTs(), intervalMs, 1, Aggregation.valueOf(function.name())); |
|
|
|
ListenableFuture<ArgumentEntry> argumentEntryFut = fetchTimeSeriesInternal(tenantId, entityId, query, CalculatedFieldArgumentUtils::transformAggMetricArgument); |
|
|
|
ListenableFuture<ArgumentEntry> argumentEntryFut = fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggMetricArgument(timeSeries, argKey, metric)); |
|
|
|
return resolveArgumentValue(argKey, argumentEntryFut); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchTimeSeries(TenantId tenantId, EntityId entityId, Argument argument, AggInterval interval, long queryEndTs) { |
|
|
|
long startInterval = interval.getCurrentIntervalStartTs(); |
|
|
|
long intervalEndTs = interval.getCurrentIntervalEndTs(); |
|
|
|
ReadTsKvQuery query = buildTimeSeriesQuery(tenantId, argument, startInterval, queryEndTs); |
|
|
|
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startInterval, queryEndTs, 0, aggArgumentMaxDatapointsLimit, Aggregation.NONE); |
|
|
|
return fetchTimeSeriesInternal(tenantId, entityId, query, timeSeries -> transformAggregationArgument(timeSeries, startInterval, intervalEndTs)); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument, long queryEndTs) { |
|
|
|
long argTimeWindow = argument.getTimeWindow() == 0 ? queryEndTs : argument.getTimeWindow(); |
|
|
|
long startInterval = queryEndTs - argTimeWindow; |
|
|
|
ReadTsKvQuery query = buildTimeSeriesQuery(tenantId, argument, startInterval, queryEndTs); |
|
|
|
ReadTsKvQuery query = buildTsRollingQuery(tenantId, argument, startInterval, queryEndTs); |
|
|
|
return fetchTimeSeriesInternal(tenantId, entityId, query, tsRolling -> transformTsRollingArgument(tsRolling, query.getLimit(), argTimeWindow)); |
|
|
|
} |
|
|
|
|
|
|
|
@ -352,10 +356,10 @@ public abstract class AbstractCalculatedFieldProcessingService { |
|
|
|
}, calculatedFieldCallbackExecutor); |
|
|
|
} |
|
|
|
|
|
|
|
private ReadTsKvQuery buildTimeSeriesQuery(TenantId tenantId, Argument argument, long startTs, long endTs) { |
|
|
|
private ReadTsKvQuery buildTsRollingQuery(TenantId tenantId, Argument argument, long startTs, long endTs) { |
|
|
|
long maxDataPoints = apiLimitService.getLimit( |
|
|
|
tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg); |
|
|
|
int argumentLimit = argument.getLimit() == null ? 500000 : argument.getLimit(); |
|
|
|
int argumentLimit = argument.getLimit(); |
|
|
|
int limit = argumentLimit == 0 || argumentLimit > maxDataPoints ? (int) maxDataPoints : argumentLimit; |
|
|
|
return new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startTs, endTs, 0, limit, Aggregation.NONE); |
|
|
|
} |
|
|
|
|