From e80b92184584bf4b381aef0c6d0b14ec00b1abd5 Mon Sep 17 00:00:00 2001 From: dshvaika Date: Fri, 12 Dec 2025 19:21:35 +0200 Subject: [PATCH] handle missing entities on propagation state restore & refactoring --- ...CalculatedFieldEntityMessageProcessor.java | 25 +++---- .../cf/CalculatedFieldProcessingService.java | 4 ++ ...faultCalculatedFieldProcessingService.java | 22 ++++-- .../cf/ctx/state/CalculatedFieldCtx.java | 41 ++++------- ...titiesAggregationCalculatedFieldState.java | 24 +++++-- .../alarm/AlarmCalculatedFieldState.java | 2 - .../GeofencingCalculatedFieldState.java | 19 +++-- .../geofencing/ScheduledRefreshSupported.java | 26 +++++++ .../propagation/PropagationArgumentEntry.java | 50 +++++++++---- .../PropagationCalculatedFieldState.java | 34 ++++++++- .../server/utils/CalculatedFieldUtils.java | 12 +++- .../server/controller/AbstractWebTest.java | 2 +- .../state/PropagationArgumentEntryTest.java | 72 ++++++++++++++++++- .../PropagationCalculatedFieldStateTest.java | 40 ++++++++++- .../utils/CalculatedFieldUtilsTest.java | 5 +- common/proto/src/main/proto/queue.proto | 1 + 16 files changed, 292 insertions(+), 87 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 5ac230096b..06cb2073eb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -242,7 +242,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } if (state instanceof PropagationCalculatedFieldState propagationState) { PropagationArgumentEntry entry = new PropagationArgumentEntry(); - entry.setAdded(msg.getRelatedEntityId()); + entry.setAdded(List.of(msg.getRelatedEntityId())); updatedArgs = propagationState.update(Map.of(PROPAGATION_CONFIG_ARGUMENT, entry), ctx); } if (CollectionsUtil.isEmpty(updatedArgs)) { @@ -421,19 +421,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state == null) { state = createState(ctx); justRestored = true; - } else if (ctx.shouldFetchRelationQueryDynamicArgumentsFromDb(state)) { - log.debug("[{}][{}] Going to update dynamic arguments for CF.", entityId, ctx.getCfId()); - try { - Map dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId); - dynamicArgsFromDb.forEach(newArgValues::putIfAbsent); - if (ctx.getCfType() == CalculatedFieldType.GEOFENCING) { - var geofencingState = (GeofencingCalculatedFieldState) state; - geofencingState.updateLastDynamicArgumentsRefreshTs(); - } - } catch (Exception e) { - throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); - } - } else if (ctx.shouldFetchEntityRelations(state)) { + } else if (ctx.shouldFetchRelatedEntities(state)) { log.debug("[{}][{}] Going to update related entities for CF.", entityId, ctx.getCfId()); try { if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesState) { @@ -447,6 +435,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM justRestored = true; } } + if (state instanceof GeofencingCalculatedFieldState geofencingCalculatedFieldState) { + Map dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId); + dynamicArgsFromDb.forEach(newArgValues::putIfAbsent); + geofencingCalculatedFieldState.updateScheduledRefreshTs(); + } } catch (Exception e) { throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } @@ -476,9 +469,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM state.setCtx(ctx, actorCtx); state.init(false); - if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationQueryDynamicArguments()) { + if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationPathSource()) { GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state; - geofencingState.updateLastDynamicArgumentsRefreshTs(); + geofencingState.updateScheduledRefreshTs(); } Map arguments = fetchArguments(ctx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 53d64e5b27..13fad720b3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -27,9 +27,11 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; import java.util.List; import java.util.Map; +import java.util.Optional; public interface CalculatedFieldProcessingService { @@ -37,6 +39,8 @@ public interface CalculatedFieldProcessingService { Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId); + Optional fetchPropagationArgumentFromDb(CalculatedFieldCtx ctx, EntityId entityId); + List fetchRelatedEntities(CalculatedFieldCtx ctx, EntityId entityId); Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 927ba7d1f1..9c9020686b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -23,6 +23,7 @@ import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric; import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration; @@ -49,6 +50,7 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; @@ -56,6 +58,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -93,11 +96,20 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF @Override public Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) { - return switch (ctx.getCfType()) { - case GEOFENCING -> resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis())); - case PROPAGATION -> resolveArgumentFutures(Map.of(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId))); - default -> Collections.emptyMap(); - }; + return ctx.getCfType() == CalculatedFieldType.GEOFENCING ? + resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis())) : + Collections.emptyMap(); + } + + @Override + public Optional fetchPropagationArgumentFromDb(CalculatedFieldCtx ctx, EntityId entityId) { + if (ctx.getCfType() != CalculatedFieldType.PROPAGATION) { + return Optional.empty(); + } + PropagationArgumentEntry argumentEntry = (PropagationArgumentEntry) + resolveArgumentValue(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId)); + argumentEntry.setPartitionStateRestore(true); + return Optional.of(argumentEntry); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index b97be4dcd8..8f395a10f3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -63,8 +63,7 @@ import org.thingsboard.server.dao.util.TimeUtils; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; -import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.geofencing.ScheduledRefreshSupported; import org.thingsboard.server.service.telemetry.AlarmSubscriptionService; import java.io.Closeable; @@ -122,7 +121,7 @@ public class CalculatedFieldCtx implements Closeable { private long maxSingleValueArgumentSize; private long intermediateAggregationIntervalMillis; - private boolean relationQueryDynamicArguments; + private boolean relationPathSource; private List mainEntityGeofencingArgumentNames; private List linkedEntityAndCurrentOwnerGeofencingArgumentNames; private List relatedEntityArgumentNames; @@ -161,10 +160,11 @@ public class CalculatedFieldCtx implements Closeable { if (refId == null) { if (CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(cfType)) { relatedEntityArguments.compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey())); + relationPathSource = true; continue; } if (entry.getValue().hasRelationQuerySource()) { - relationQueryDynamicArguments = true; + relationPathSource = true; continue; } if (entry.getValue().hasOwnerSource()) { @@ -201,7 +201,7 @@ public class CalculatedFieldCtx implements Closeable { if (calculatedField.getConfiguration() instanceof PropagationCalculatedFieldConfiguration propagationConfig) { propagationArgument = propagationConfig.toPropagationArgument(); applyExpressionForResolvedArguments = propagationConfig.isApplyExpressionToResolvedArguments(); - relationQueryDynamicArguments = true; + relationPathSource = true; } } if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { @@ -636,6 +636,7 @@ public class CalculatedFieldCtx implements Closeable { return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); } + // TODO: Consider to reevaluate if we need to refresh relationPathSource and scheduled update public boolean hasRefreshContextOnlyChanges(CalculatedFieldCtx other) { // has changes that do not require state recalculation if (output != null) { var thisOutputStrategy = output.getStrategy(); @@ -757,38 +758,20 @@ public class CalculatedFieldCtx implements Closeable { return scheduledUpdateIntervalMillis == DISABLED_INTERVAL_VALUE; } - public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) { - if (!relationQueryDynamicArguments) { + public boolean shouldFetchRelatedEntities(CalculatedFieldState state) { + if (!relationPathSource) { return false; } - return switch (cfType) { - case PROPAGATION -> true; - case GEOFENCING -> { - if (isScheduledUpdateDisabled()) { - yield false; - } - var geofencingState = (GeofencingCalculatedFieldState) state; - if (geofencingState.getLastDynamicArgumentsRefreshTs() == DEFAULT_LAST_UPDATE_TS) { - yield true; - } - yield geofencingState.getLastDynamicArgumentsRefreshTs() < - System.currentTimeMillis() - scheduledUpdateIntervalMillis; - } - default -> false; - }; - } - - public boolean shouldFetchEntityRelations(CalculatedFieldState state) { - if (!(state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState)) { + if (isScheduledUpdateDisabled()) { return false; } - if (isScheduledUpdateDisabled()) { + if (!(state instanceof ScheduledRefreshSupported scheduledRefreshSupported)) { return false; } - if (relatedEntitiesAggState.getLastRelatedEntitiesRefreshTs() == DEFAULT_LAST_UPDATE_TS) { + if (scheduledRefreshSupported.getLastScheduledRefreshTs() == DEFAULT_LAST_UPDATE_TS) { return true; } - return relatedEntitiesAggState.getLastRelatedEntitiesRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis; + return scheduledRefreshSupported.getLastScheduledRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis; } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java index 7036e4bd77..c518b6f726 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java @@ -41,6 +41,7 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; import org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.aggregation.function.AggEntry; +import org.thingsboard.server.service.cf.ctx.state.geofencing.ScheduledRefreshSupported; import java.util.ArrayList; import java.util.HashMap; @@ -54,14 +55,14 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx.DISABLED_INTERVAL_VALUE; @Slf4j -@Getter -public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState { +public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState implements ScheduledRefreshSupported { @Setter + @Getter private long lastArgsRefreshTs = DEFAULT_LAST_UPDATE_TS; @Setter + @Getter private long lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS; - @Setter private long lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS; private long deduplicationIntervalMs = DISABLED_INTERVAL_VALUE; private Map metrics; @@ -103,13 +104,24 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat @Override public void reset() { // must reset everything dependent on arguments super.reset(); + resetScheduledRefreshTs(); lastArgsRefreshTs = DEFAULT_LAST_UPDATE_TS; lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS; - lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS; metrics = null; } - public void updateLastRelatedEntitiesRefreshTs() { + @Override + public void resetScheduledRefreshTs() { + lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS; + } + + @Override + public long getLastScheduledRefreshTs() { + return lastRelatedEntitiesRefreshTs; + } + + @Override + public void updateScheduledRefreshTs() { lastRelatedEntitiesRefreshTs = System.currentTimeMillis(); } @@ -127,7 +139,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat public List checkRelatedEntities(List relatedEntities) { Map> entityInputs = prepareInputs(); findOutdatedEntities(entityInputs, relatedEntities).forEach(this::cleanupEntityData); - updateLastRelatedEntitiesRefreshTs(); + updateScheduledRefreshTs(); return findMissingEntities(entityInputs, relatedEntities); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java index 18629bd370..9dd944c2e4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java @@ -94,8 +94,6 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { private Alarm currentAlarm; private boolean initialFetchDone; - // TODO: deprecate device profile node, describe the differences and improvements - public AlarmCalculatedFieldState(EntityId entityId) { super(entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java index ea47dafa59..e336672877 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java @@ -21,8 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.geo.Coordinates; @@ -52,11 +50,9 @@ import static org.thingsboard.server.common.data.cf.configuration.geofencing.Ent import static org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingPresenceStatus.INSIDE; import static org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingPresenceStatus.OUTSIDE; -@Getter -@Setter @Slf4j @EqualsAndHashCode(callSuper = true) -public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState { +public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState implements ScheduledRefreshSupported { private long lastDynamicArgumentsRefreshTs = DEFAULT_LAST_UPDATE_TS; @@ -147,10 +143,21 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState { @Override public void reset() { super.reset(); + resetScheduledRefreshTs(); + } + + @Override + public void resetScheduledRefreshTs() { lastDynamicArgumentsRefreshTs = DEFAULT_LAST_UPDATE_TS; } - public void updateLastDynamicArgumentsRefreshTs() { + @Override + public long getLastScheduledRefreshTs() { + return lastDynamicArgumentsRefreshTs; + } + + @Override + public void updateScheduledRefreshTs() { lastDynamicArgumentsRefreshTs = System.currentTimeMillis(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java new file mode 100644 index 0000000000..f43959443a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state.geofencing; + +public interface ScheduledRefreshSupported { + + void resetScheduledRefreshTs(); + + long getLastScheduledRefreshTs(); + + void updateScheduledRefreshTs(); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java index 0450a0599a..6b1846c18f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java @@ -22,6 +22,8 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,8 +32,9 @@ import java.util.Set; public class PropagationArgumentEntry implements ArgumentEntry { private Set entityIds; - private transient EntityId added; + private transient List added; private transient EntityId removed; + private transient boolean partitionStateRestore; private boolean forceResetPrevious; @@ -39,10 +42,16 @@ public class PropagationArgumentEntry implements ArgumentEntry { this.entityIds = new HashSet<>(); this.added = null; this.removed = null; + this.partitionStateRestore = false; } public PropagationArgumentEntry(List entityIds) { + this(entityIds, false); + } + + public PropagationArgumentEntry(List entityIds, boolean partitionStateRestore) { this.entityIds = new HashSet<>(entityIds); + this.partitionStateRestore = partitionStateRestore; } @Override @@ -57,27 +66,44 @@ public class PropagationArgumentEntry implements ArgumentEntry { @Override public boolean updateEntry(ArgumentEntry entry) { - if (!(entry instanceof PropagationArgumentEntry propagationArgumentEntry)) { + if (!(entry instanceof PropagationArgumentEntry updated)) { throw new IllegalArgumentException("Unsupported argument entry type for propagation argument entry: " + entry.getType()); } - if (propagationArgumentEntry.getAdded() != null) { - boolean updated = entityIds.add(propagationArgumentEntry.getAdded()); - if (updated) { - added = propagationArgumentEntry.getAdded(); - } - return updated; + if (updated.getAdded() != null) { + return checkAdded(updated.getAdded()); } - if (propagationArgumentEntry.getRemoved() != null) { - return entityIds.remove(propagationArgumentEntry.getRemoved()); + if (updated.getRemoved() != null) { + return entityIds.remove(updated.getRemoved()); + } + if (updated.isPartitionStateRestore()) { + Set updatedIds = updated.getEntityIds(); + if (updatedIds.isEmpty()) { + entityIds.clear(); + return false; + } + entityIds.retainAll(updatedIds); + return checkAdded(updatedIds); } - if (propagationArgumentEntry.isEmpty()) { + if (updated.isEmpty()) { entityIds.clear(); return true; } - entityIds = propagationArgumentEntry.getEntityIds(); + entityIds = updated.getEntityIds(); return true; } + private boolean checkAdded(Collection updatedIds) { + for (EntityId id : updatedIds) { + if (entityIds.add(id)) { + if (added == null) { + added = new ArrayList<>(); + } + added.add(id); + } + } + return added != null && !added.isEmpty(); + } + @Override public boolean isEmpty() { return entityIds.isEmpty(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java index c55a227ea0..3fe06ea4c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.util.CollectionsUtil; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult; import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult; @@ -37,11 +38,15 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState { + private CalculatedFieldProcessingService cfProcessingService; + private ScheduledFuture reevaluationFuture; + public PropagationCalculatedFieldState(EntityId entityId) { super(entityId); } @@ -50,6 +55,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) { this.ctx = ctx; this.actorCtx = actorCtx; + this.cfProcessingService = ctx.getCfProcessingService(); this.requiredArguments = new ArrayList<>(ctx.getArgNames()); requiredArguments.add(PROPAGATION_CONFIG_ARGUMENT); this.readinessStatus = checkReadiness(requiredArguments, arguments); @@ -58,6 +64,32 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState } } + @Override + public void init(boolean restored) { + super.init(restored); + if (restored) { + cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> { + var update = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx); + if (update.isEmpty()) { + return; + } + ScheduledFuture future = ctx.scheduleReevaluation(0L, actorCtx); + if (future != null) { + reevaluationFuture = future; + } + }); + } + } + + @Override + public void close() { + super.close(); + if (reevaluationFuture != null) { + reevaluationFuture.cancel(true); + reevaluationFuture = null; + } + } + @Override public CalculatedFieldType getType() { return CalculatedFieldType.PROPAGATION; @@ -72,7 +104,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState boolean newEntityAdded = propagationArgumentEntry.getAdded() != null; List entityIds; if (newEntityAdded) { - entityIds = List.of(propagationArgumentEntry.getAdded()); + entityIds = propagationArgumentEntry.getAdded(); propagationArgumentEntry.setAdded(null); } else { if (propagationArgumentEntry.getEntityIds().isEmpty()) { diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 7046af3d9c..c644af190f 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -33,6 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ArgumentIntervalProt import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.EntityIdProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto; import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; @@ -61,6 +62,7 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgume import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; @@ -108,6 +110,7 @@ public class CalculatedFieldUtils { case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry)); case TS_ROLLING -> builder.addRollingValueArguments(toRollingArgumentProto(argName, (TsRollingArgumentEntry) argEntry)); case GEOFENCING -> builder.addGeofencingArguments(toGeofencingArgumentProto(argName, (GeofencingArgumentEntry) argEntry)); + case PROPAGATION -> builder.addAllPropagationEntityIds(toPropagationEntityIdsProto((PropagationArgumentEntry) argEntry)); case RELATED_ENTITIES -> { RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = (RelatedEntitiesArgumentEntry) argEntry; relatedEntitiesArgumentEntry.getEntityInputs() @@ -136,6 +139,10 @@ public class CalculatedFieldUtils { return builder.build(); } + private static List toPropagationEntityIdsProto(PropagationArgumentEntry argEntry) { + return argEntry.getEntityIds().stream().map(ProtoUtils::toProto).collect(Collectors.toList()); + } + private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) { return AlarmRuleStateProto.newBuilder() .setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse("")) @@ -268,7 +275,10 @@ public class CalculatedFieldUtils { state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto))); case GEOFENCING -> proto.getGeofencingArgumentsList().forEach(argProto -> state.getArguments().put(argProto.getArgName(), fromGeofencingArgumentProto(argProto))); - case PROPAGATION -> state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry()); + case PROPAGATION -> { + List propagationEntityIds = proto.getPropagationEntityIdsList().stream().map(ProtoUtils::fromProto).toList(); + state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(propagationEntityIds)); + } case ALARM -> { AlarmCalculatedFieldState alarmState = (AlarmCalculatedFieldState) state; AlarmStateProto alarmStateProto = proto.getAlarmState(); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 80e79280df..3de93f4811 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -1209,7 +1209,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { Map statesMap = (Map) ReflectionTestUtils.getField(processor, "states"); Awaitility.await("CF state for entity actor ready to refresh dynamic arguments").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> { CalculatedFieldState calculatedFieldState = statesMap.get(cfId); - boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastDynamicArgumentsRefreshTs() < + boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastScheduledRefreshTs() < System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval); log.warn("entityId {}, cfId {}, state ready to refresh == {}", entityId, cfId, isReady); return isReady; diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java index bf6a112e72..dd5dfda79a 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java @@ -104,19 +104,19 @@ public class PropagationArgumentEntryTest { @Test void testUpdateEntryWhenAdded() { var added = new PropagationArgumentEntry(); - added.setAdded(ENTITY_3_ID); + added.setAdded(List.of(ENTITY_3_ID)); boolean changed = entry.updateEntry(added); assertThat(changed).isTrue(); assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID); - assertThat(entry.getAdded()).isEqualTo(ENTITY_3_ID); + assertThat(entry.getAdded()).isEqualTo(List.of(ENTITY_3_ID)); } @Test void testUpdateEntryWhenAddedExistingEntity() { var added = new PropagationArgumentEntry(); - added.setAdded(ENTITY_2_ID); + added.setAdded(List.of(ENTITY_2_ID)); boolean changed = entry.updateEntry(added); @@ -149,6 +149,72 @@ public class PropagationArgumentEntryTest { assertThat(entry.getRemoved()).isNull(); } + @Test + void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() { + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID), true); + + boolean changed = entry.updateEntry(restore); + + assertThat(changed).isTrue(); + assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID); + assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); + assertThat(entry.getRemoved()).isNull(); + assertThat(entry.isPartitionStateRestore()).isFalse(); + } + + @Test + void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() { + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID), true); + + boolean changed = entry.updateEntry(restore); + + assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op + assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID); + assertThat(entry.getAdded()).isNull(); + assertThat(entry.getRemoved()).isNull(); + assertThat(entry.isPartitionStateRestore()).isFalse(); + } + + @Test + void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() { + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID), true); + + boolean changed = entry.updateEntry(restore); + + assertThat(changed).isTrue(); + assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID); + assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); + assertThat(entry.getRemoved()).isNull(); + assertThat(entry.isPartitionStateRestore()).isFalse(); + } + + + @Test + void testUpdateEntryWhenPartitionStateRestoreNoChanges() { + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID), true); + + boolean changed = entry.updateEntry(restore); + + assertThat(changed).isFalse(); + assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID); + assertThat(entry.getAdded()).isNull(); + assertThat(entry.getRemoved()).isNull(); + assertThat(entry.isPartitionStateRestore()).isFalse(); + } + + @Test + void testUpdateEntryWhenPartitionStateRestoreEmptySet() { + var restore = new PropagationArgumentEntry(List.of(), true); + + boolean changed = entry.updateEntry(restore); + + assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op + assertThat(entry.getEntityIds()).isEmpty(); + assertThat(entry.getAdded()).isNull(); + assertThat(entry.getRemoved()).isNull(); + assertThat(entry.isPartitionStateRestore()).isFalse(); + } + @Test @SuppressWarnings("unchecked") void testToTbelCfArgWithValues() { diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java index 346dbbcb7c..a9bf6d54d1 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java @@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.RelationPathLevel; import org.thingsboard.server.common.stats.DefaultStatsFactory; import org.thingsboard.server.dao.usagerecord.ApiLimitService; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult; import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult; import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; @@ -57,12 +58,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; @@ -105,15 +111,19 @@ public class PropagationCalculatedFieldStateTest { @MockitoBean private ActorSystemContext actorSystemContext; + @MockitoBean + private CalculatedFieldProcessingService cfProcessingService; + @BeforeEach void setUp() { when(actorSystemContext.getTbelInvokeService()).thenReturn(tbelInvokeService); when(actorSystemContext.getApiLimitService()).thenReturn(apiLimitService); + when(actorSystemContext.getCalculatedFieldProcessingService()).thenReturn(cfProcessingService); when(apiLimitService.getLimit(any(), any())).thenReturn(1000L); } void initCtxAndState(boolean applyExpressionToResolvedArguments) { - ctx = new CalculatedFieldCtx(getCalculatedField(applyExpressionToResolvedArguments), actorSystemContext); + ctx = spy(new CalculatedFieldCtx(getCalculatedField(applyExpressionToResolvedArguments), actorSystemContext)); ctx.init(); state = new PropagationCalculatedFieldState(ctx.getEntityId()); @@ -287,7 +297,7 @@ public class PropagationCalculatedFieldStateTest { AssetId newEntityId = new AssetId(UUID.fromString("83e2c962-eeae-4708-984e-e6a24760f9c3")); PropagationArgumentEntry propagationArgumentEntry = new PropagationArgumentEntry(); - propagationArgumentEntry.setAdded(newEntityId); + propagationArgumentEntry.setAdded(List.of(newEntityId)); Map updated = state.update(Map.of(PROPAGATION_CONFIG_ARGUMENT, propagationArgumentEntry), ctx); assertThat(updated).isNotNull().containsEntry(PROPAGATION_CONFIG_ARGUMENT, propagationArgumentEntry); @@ -299,6 +309,32 @@ public class PropagationCalculatedFieldStateTest { assertThat(propagationCalculatedFieldResult.getResult().getResult()).isEqualTo(JacksonUtil.newObjectNode().put(TEMPERATURE_ARGUMENT_NAME, TEMPERATURE_VALUE)); } + @Test + void testPropapagationStateInitWithRestoredSetToFalse() { + initCtxAndState(false); + verify(cfProcessingService, never()).fetchPropagationArgumentFromDb(any(), any()); + verify(ctx, never()).scheduleReevaluation(anyLong(), any()); + } + + @Test + void testPropapagationStateInitWithRestoredSetToTrue() { + initCtxAndState(false); + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(Collections.emptyList()) + ); + state.update(initArgs, ctx); + assertThat(state.isReady()).isFalse(); + + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(propagationArgEntry)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx).scheduleReevaluation(0L, state.getActorCtx()); + } + private CalculatedField getCalculatedField(boolean applyExpressionToResolvedArguments) { CalculatedField calculatedField = new CalculatedField(); calculatedField.setTenantId(TENANT_ID); diff --git a/application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java b/application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java index 83538fe07c..f4bb9bc4c9 100644 --- a/application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java +++ b/application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java @@ -119,7 +119,7 @@ class CalculatedFieldUtilsTest { } @Test - void toProtoAndFromProto_shouldCreatePropagationStateWithEmptyPropagationArgument() { + void toProtoAndFromProto_shouldCreatePropagationStateWithNotEmptyPropagationArgument() { // given CalculatedFieldEntityCtxId stateId = mock(CalculatedFieldEntityCtxId.class); given(stateId.tenantId()).willReturn(TENANT_ID); @@ -158,8 +158,7 @@ class CalculatedFieldUtilsTest { assertThat(propagationState.getEntityId()).isEqualTo(DEVICE_ID); assertThat(propagationState.getArguments()).isNotNull(); - assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT)).isNotNull(); - assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT).isEmpty()).isTrue(); + assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT)).isEqualTo(propagationArgumentEntry); assertThat(propagationState.getArguments().get("state")).isNotNull().isEqualTo(singleValueArgumentEntry); assertThat(propagationState.getRequiredArguments()).isNull(); assertThat(propagationState.getReadinessStatus()).isNull(); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 9fb8528bce..3cbc84ba1a 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -935,6 +935,7 @@ message CalculatedFieldStateProto { int64 lastArgsUpdateTs = 7; int64 lastMetricsEvalTs = 8; repeated ArgumentIntervalProto aggregationArguments = 9; + repeated EntityIdProto propagationEntityIds = 10; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.