Browse Source

Updated calculated field processing logic && minor refactoring

pull/14501/head
dshvaika 6 months ago
parent
commit
071c254f67
  1. 59
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  2. 6
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  3. 3
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java
  4. 6
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java
  5. 29
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java

59
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -54,9 +54,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -75,7 +73,6 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
@ -84,7 +81,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@ -101,7 +97,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATE
import static org.thingsboard.server.dao.util.KvUtils.filterChangedAttr;
import static org.thingsboard.server.dao.util.KvUtils.toTsKvEntryList;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultKvEntry;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultTsKvEntry;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggMetricArgument;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformAggregationArgument;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.transformSingleValueArgument;
@ -210,14 +206,14 @@ public abstract class AbstractCalculatedFieldProcessingService {
default -> {
var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry);
argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds ->
fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), MoreExecutors.directExecutor()));
fetchGeofencingArgumentValue(ctx.getTenantId(), resolvedEntityIds, entry.getValue(), startTs), MoreExecutors.directExecutor()));
}
}
}
return argFutures;
}
protected Map<String, ListenableFuture<ArgumentEntry>> fetchRelatedEntitiesAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
private Map<String, ListenableFuture<ArgumentEntry>> fetchRelatedEntitiesAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
if (!(ctx.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration config)) {
return Collections.emptyMap();
}
@ -230,7 +226,7 @@ public abstract class AbstractCalculatedFieldProcessingService {
));
}
protected Map<String, ListenableFuture<ArgumentEntry>> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
private Map<String, ListenableFuture<ArgumentEntry>> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
if (!(ctx.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration config)) {
return Collections.emptyMap();
}
@ -291,31 +287,28 @@ public abstract class AbstractCalculatedFieldProcessingService {
return ownerService.getOwner(tenantId, entityId);
}
private ListenableFuture<ArgumentEntry> fetchGeofencingKvEntry(TenantId tenantId, List<EntityId> geofencingEntities, Argument argument) {
private ListenableFuture<ArgumentEntry> fetchGeofencingArgumentValue(TenantId tenantId, List<EntityId> geofencingEntities, Argument argument, long startTs) {
if (argument.getRefEntityKey().getType() != ArgumentType.ATTRIBUTE) {
throw new IllegalStateException("Unsupported argument key type: " + argument.getRefEntityKey().getType());
}
List<ListenableFuture<Map.Entry<EntityId, AttributeKvEntry>>> kvFutures = geofencingEntities.stream()
var geofencingEntityIdToKvEntryMapFutures = Futures.allAsList(fetchGeofencingEntityIdToKvEntriesFutures(tenantId, geofencingEntities, argument, startTs));
return Futures.transform(geofencingEntityIdToKvEntryMapFutures, entries -> ArgumentEntry.createGeofencingValueArgument(entries.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), MoreExecutors.directExecutor());
}
private List<ListenableFuture<Map.Entry<EntityId, AttributeKvEntry>>> fetchGeofencingEntityIdToKvEntriesFutures(TenantId tenantId, List<EntityId> geofencingEntities, Argument argument, long startTs) {
return geofencingEntities.stream()
.map(entityId -> {
var attributesFuture = attributesService.find(
tenantId,
entityId,
argument.getRefEntityKey().getScope(),
argument.getRefEntityKey().getKey()
);
AttributeScope scope = argument.getRefEntityKey().getScope();
String key = argument.getRefEntityKey().getKey();
var attributesFuture = attributesService.find(tenantId, entityId, scope, key);
return Futures.transform(attributesFuture, resultOpt ->
Map.entry(entityId, resultOpt.orElseGet(() -> createDefaultAttributeEntry(argument, System.currentTimeMillis()))),
calculatedFieldCallbackExecutor
);
Map.entry(entityId, resultOpt.orElseGet(() -> createDefaultAttributeEntry(argument, startTs))),
calculatedFieldCallbackExecutor);
}).collect(Collectors.toList());
ListenableFuture<List<Map.Entry<EntityId, AttributeKvEntry>>> allFutures = Futures.allAsList(kvFutures);
return Futures.transform(allFutures, entries -> ArgumentEntry.createGeofencingValueArgument(entries.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), MoreExecutors.directExecutor());
}
public ListenableFuture<ArgumentEntry> fetchRelatedEntitiesArgumentEntry(TenantId tenantId, List<EntityId> aggEntities, Argument argument, long startTs) {
private ListenableFuture<ArgumentEntry> fetchRelatedEntitiesArgumentEntry(TenantId tenantId, List<EntityId> aggEntities, Argument argument, long startTs) {
List<ListenableFuture<Map.Entry<EntityId, ArgumentEntry>>> futures = aggEntities.stream()
.map(entityId -> {
ListenableFuture<ArgumentEntry> argumentEntryFut = fetchArgumentValue(tenantId, entityId, argument, startTs);
@ -368,21 +361,17 @@ public abstract class AbstractCalculatedFieldProcessingService {
return Futures.transform(attributeOptFuture, attrOpt -> {
log.debug("[{}][{}] Fetched attribute for key {}: {}", tenantId, entityId, argument.getRefEntityKey(), attrOpt);
AttributeKvEntry attributeKvEntry = attrOpt.orElseGet(() -> new BaseAttributeKvEntry(createDefaultKvEntry(argument), defaultLastUpdateTs, SingleValueArgumentEntry.DEFAULT_VERSION));
return transformSingleValueArgument(Optional.of(attributeKvEntry));
return transformSingleValueArgument(attrOpt.orElseGet(() -> createDefaultAttributeEntry(argument, defaultLastUpdateTs)));
}, calculatedFieldCallbackExecutor);
}
protected ListenableFuture<ArgumentEntry> fetchTsLatest(TenantId tenantId, EntityId entityId, Argument argument, long defaultTs) {
private ListenableFuture<ArgumentEntry> fetchTsLatest(TenantId tenantId, EntityId entityId, Argument argument, long defaultTs) {
String timeseriesKey = argument.getRefEntityKey().getKey();
log.trace("[{}][{}] Fetching latest timeseries {}", tenantId, entityId, timeseriesKey);
return transformSingleValueArgument(
Futures.transform(
timeseriesService.findLatest(tenantId, entityId, timeseriesKey),
result -> {
log.debug("[{}][{}] Fetched latest timeseries {}: {}", tenantId, entityId, timeseriesKey, result);
return result.or(() -> Optional.of(new BasicTsKvEntry(defaultTs, createDefaultKvEntry(argument), SingleValueArgumentEntry.DEFAULT_VERSION)));
}, calculatedFieldCallbackExecutor));
return Futures.transform(timeseriesService.findLatest(tenantId, entityId, timeseriesKey), result -> {
log.debug("[{}][{}] Fetched latest timeseries {}: {}", tenantId, entityId, timeseriesKey, result);
return transformSingleValueArgument(result.orElseGet(() -> createDefaultTsKvEntry(argument, defaultTs)));
}, calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchTimeSeriesInternal(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, Function<List<TsKvEntry>, ArgumentEntry> transformArgument) {

6
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -85,7 +85,7 @@ import static org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldSta
@Slf4j
public class CalculatedFieldCtx implements Closeable {
private static final long SCHEDULED_UPDATE_DISABLED_VALUE = -1L;
public static final long DISABLED_INTERVAL_VALUE = -1L;
private CalculatedField calculatedField;
@ -201,7 +201,7 @@ public class CalculatedFieldCtx implements Closeable {
}
}
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : SCHEDULED_UPDATE_DISABLED_VALUE;
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : DISABLED_INTERVAL_VALUE;
}
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) {
this.useLatestTs = aggConfig.isUseLatestTs();
@ -720,7 +720,7 @@ public class CalculatedFieldCtx implements Closeable {
}
private boolean isScheduledUpdateDisabled() {
return scheduledUpdateIntervalMillis == SCHEDULED_UPDATE_DISABLED_VALUE;
return scheduledUpdateIntervalMillis == DISABLED_INTERVAL_VALUE;
}
public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) {

3
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java

@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx.DISABLED_INTERVAL_VALUE;
@Slf4j
@Getter
@ -62,7 +63,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat
private long lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS;
@Setter
private long lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS;
private long deduplicationIntervalMs = DEFAULT_LAST_UPDATE_TS;
private long deduplicationIntervalMs = DISABLED_INTERVAL_VALUE;
private Map<String, AggMetric> metrics;
private ScheduledFuture<?> reevaluationFuture;

6
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java

@ -218,7 +218,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
if (argEntryIntervalStatus.getLastArgsRefreshTs() > argEntryIntervalStatus.getLastMetricsEvalTs()) {
argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis());
processArgument(intervalEntry, argName, false, results);
} else if (argEntryIntervalStatus.getLastMetricsEvalTs() == -1) {
} else if (argEntryIntervalStatus.getLastMetricsEvalTs() == DEFAULT_LAST_UPDATE_TS) {
argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis());
processArgument(intervalEntry, argName, true, results);
}
@ -232,9 +232,9 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
if (argEntryIntervalStatus.intervalPassed(checkInterval)) {
if (argEntryIntervalStatus.argsUpdated()) {
argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis());
argEntryIntervalStatus.setLastArgsRefreshTs(-1);
argEntryIntervalStatus.setLastArgsRefreshTs(DEFAULT_LAST_UPDATE_TS);
processArgument(intervalEntry, argName, false, results);
} else if (argEntryIntervalStatus.getLastMetricsEvalTs() == -1) {
} else if (argEntryIntervalStatus.getLastMetricsEvalTs() == DEFAULT_LAST_UPDATE_TS) {
argEntryIntervalStatus.setLastMetricsEvalTs(System.currentTimeMillis());
processArgument(intervalEntry, argName, true, results);
}

29
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java

@ -15,9 +15,7 @@
*/
package org.thingsboard.server.utils;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.NonNull;
import org.apache.commons.lang3.math.NumberUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.configuration.Argument;
@ -25,6 +23,7 @@ import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -48,20 +47,13 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalcul
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry.DEFAULT_VERSION;
public class CalculatedFieldArgumentUtils {
public static ListenableFuture<ArgumentEntry> transformSingleValueArgument(ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture) {
return Futures.transform(kvEntryFuture, CalculatedFieldArgumentUtils::transformSingleValueArgument, MoreExecutors.directExecutor());
}
public static ArgumentEntry transformSingleValueArgument(Optional<? extends KvEntry> kvEntry) {
if (kvEntry.isPresent() && kvEntry.get().getValue() != null) {
return ArgumentEntry.createSingleValueArgument(kvEntry.get());
} else {
return new SingleValueArgumentEntry();
}
public static ArgumentEntry transformSingleValueArgument(@NonNull KvEntry kvEntry) {
return kvEntry.getValue() != null ? ArgumentEntry.createSingleValueArgument(kvEntry) : new SingleValueArgumentEntry();
}
public static ArgumentEntry transformTsRollingArgument(List<TsKvEntry> tsRolling, int limit, long argTimeWindow) {
@ -94,7 +86,7 @@ public class CalculatedFieldArgumentUtils {
return new EntityAggregationArgumentEntry(aggIntervals);
}
public static KvEntry createDefaultKvEntry(Argument argument) {
private static KvEntry createDefaultKvEntry(Argument argument) {
String key = argument.getRefEntityKey().getKey();
String defaultValue = argument.getDefaultValue();
if (StringUtils.isBlank(defaultValue)) {
@ -109,9 +101,12 @@ public class CalculatedFieldArgumentUtils {
return new StringDataEntry(key, defaultValue);
}
public static TsKvEntry createDefaultTsKvEntry(Argument argument, long ts) {
return new BasicTsKvEntry(ts, createDefaultKvEntry(argument), DEFAULT_VERSION);
}
public static AttributeKvEntry createDefaultAttributeEntry(Argument argument, long ts) {
KvEntry kvEntry = createDefaultKvEntry(argument);
return new BaseAttributeKvEntry(kvEntry, ts, 0L);
return new BaseAttributeKvEntry(createDefaultKvEntry(argument), ts, DEFAULT_VERSION);
}
public static CalculatedFieldState createStateByType(CalculatedFieldCtx ctx, EntityId entityId) {

Loading…
Cancel
Save