Browse Source

handle missing entities on propagation state restore & refactoring

pull/14560/head
dshvaika 6 months ago
parent
commit
e80b921845
  1. 25
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 4
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java
  3. 22
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  4. 41
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  5. 24
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java
  7. 19
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java
  8. 26
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java
  9. 50
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java
  10. 34
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java
  11. 12
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  12. 2
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  13. 72
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java
  14. 40
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java
  15. 5
      application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java
  16. 1
      common/proto/src/main/proto/queue.proto

25
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -242,7 +242,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
if (state instanceof PropagationCalculatedFieldState propagationState) {
PropagationArgumentEntry entry = new PropagationArgumentEntry();
entry.setAdded(msg.getRelatedEntityId());
entry.setAdded(List.of(msg.getRelatedEntityId()));
updatedArgs = propagationState.update(Map.of(PROPAGATION_CONFIG_ARGUMENT, entry), ctx);
}
if (CollectionsUtil.isEmpty(updatedArgs)) {
@ -421,19 +421,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state == null) {
state = createState(ctx);
justRestored = true;
} else if (ctx.shouldFetchRelationQueryDynamicArgumentsFromDb(state)) {
log.debug("[{}][{}] Going to update dynamic arguments for CF.", entityId, ctx.getCfId());
try {
Map<String, ArgumentEntry> dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId);
dynamicArgsFromDb.forEach(newArgValues::putIfAbsent);
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING) {
var geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.updateLastDynamicArgumentsRefreshTs();
}
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
} else if (ctx.shouldFetchEntityRelations(state)) {
} else if (ctx.shouldFetchRelatedEntities(state)) {
log.debug("[{}][{}] Going to update related entities for CF.", entityId, ctx.getCfId());
try {
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesState) {
@ -447,6 +435,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
justRestored = true;
}
}
if (state instanceof GeofencingCalculatedFieldState geofencingCalculatedFieldState) {
Map<String, ArgumentEntry> dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId);
dynamicArgsFromDb.forEach(newArgValues::putIfAbsent);
geofencingCalculatedFieldState.updateScheduledRefreshTs();
}
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
@ -476,9 +469,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
state.setCtx(ctx, actorCtx);
state.init(false);
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationQueryDynamicArguments()) {
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationPathSource()) {
GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.updateLastDynamicArgumentsRefreshTs();
geofencingState.updateScheduledRefreshTs();
}
Map<String, ArgumentEntry> arguments = fetchArguments(ctx);

4
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java

@ -27,9 +27,11 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface CalculatedFieldProcessingService {
@ -37,6 +39,8 @@ public interface CalculatedFieldProcessingService {
Map<String, ArgumentEntry> fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId);
Optional<PropagationArgumentEntry> fetchPropagationArgumentFromDb(CalculatedFieldCtx ctx, EntityId entityId);
List<EntityId> fetchRelatedEntities(CalculatedFieldCtx ctx, EntityId entityId);
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);

22
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -23,6 +23,7 @@ import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.aggregation.AggMetric;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
@ -49,6 +50,7 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.AggIntervalEntry;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList;
@ -56,6 +58,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@ -93,11 +96,20 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
@Override
public Map<String, ArgumentEntry> fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
return switch (ctx.getCfType()) {
case GEOFENCING -> resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis()));
case PROPAGATION -> resolveArgumentFutures(Map.of(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId)));
default -> Collections.emptyMap();
};
return ctx.getCfType() == CalculatedFieldType.GEOFENCING ?
resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis())) :
Collections.emptyMap();
}
@Override
public Optional<PropagationArgumentEntry> fetchPropagationArgumentFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
if (ctx.getCfType() != CalculatedFieldType.PROPAGATION) {
return Optional.empty();
}
PropagationArgumentEntry argumentEntry = (PropagationArgumentEntry)
resolveArgumentValue(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId));
argumentEntry.setPartitionStateRestore(true);
return Optional.of(argumentEntry);
}
@Override

41
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -63,8 +63,7 @@ import org.thingsboard.server.dao.util.TimeUtils;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.ScheduledRefreshSupported;
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
import java.io.Closeable;
@ -122,7 +121,7 @@ public class CalculatedFieldCtx implements Closeable {
private long maxSingleValueArgumentSize;
private long intermediateAggregationIntervalMillis;
private boolean relationQueryDynamicArguments;
private boolean relationPathSource;
private List<String> mainEntityGeofencingArgumentNames;
private List<String> linkedEntityAndCurrentOwnerGeofencingArgumentNames;
private List<String> relatedEntityArgumentNames;
@ -161,10 +160,11 @@ public class CalculatedFieldCtx implements Closeable {
if (refId == null) {
if (CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(cfType)) {
relatedEntityArguments.compute(refKey, (key, existingNames) -> CollectionsUtil.addToSet(existingNames, entry.getKey()));
relationPathSource = true;
continue;
}
if (entry.getValue().hasRelationQuerySource()) {
relationQueryDynamicArguments = true;
relationPathSource = true;
continue;
}
if (entry.getValue().hasOwnerSource()) {
@ -201,7 +201,7 @@ public class CalculatedFieldCtx implements Closeable {
if (calculatedField.getConfiguration() instanceof PropagationCalculatedFieldConfiguration propagationConfig) {
propagationArgument = propagationConfig.toPropagationArgument();
applyExpressionForResolvedArguments = propagationConfig.isApplyExpressionToResolvedArguments();
relationQueryDynamicArguments = true;
relationPathSource = true;
}
}
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
@ -636,6 +636,7 @@ public class CalculatedFieldCtx implements Closeable {
return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
}
// TODO: Consider to reevaluate if we need to refresh relationPathSource and scheduled update
public boolean hasRefreshContextOnlyChanges(CalculatedFieldCtx other) { // has changes that do not require state recalculation
if (output != null) {
var thisOutputStrategy = output.getStrategy();
@ -757,38 +758,20 @@ public class CalculatedFieldCtx implements Closeable {
return scheduledUpdateIntervalMillis == DISABLED_INTERVAL_VALUE;
}
public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) {
if (!relationQueryDynamicArguments) {
public boolean shouldFetchRelatedEntities(CalculatedFieldState state) {
if (!relationPathSource) {
return false;
}
return switch (cfType) {
case PROPAGATION -> true;
case GEOFENCING -> {
if (isScheduledUpdateDisabled()) {
yield false;
}
var geofencingState = (GeofencingCalculatedFieldState) state;
if (geofencingState.getLastDynamicArgumentsRefreshTs() == DEFAULT_LAST_UPDATE_TS) {
yield true;
}
yield geofencingState.getLastDynamicArgumentsRefreshTs() <
System.currentTimeMillis() - scheduledUpdateIntervalMillis;
}
default -> false;
};
}
public boolean shouldFetchEntityRelations(CalculatedFieldState state) {
if (!(state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState)) {
if (isScheduledUpdateDisabled()) {
return false;
}
if (isScheduledUpdateDisabled()) {
if (!(state instanceof ScheduledRefreshSupported scheduledRefreshSupported)) {
return false;
}
if (relatedEntitiesAggState.getLastRelatedEntitiesRefreshTs() == DEFAULT_LAST_UPDATE_TS) {
if (scheduledRefreshSupported.getLastScheduledRefreshTs() == DEFAULT_LAST_UPDATE_TS) {
return true;
}
return relatedEntitiesAggState.getLastRelatedEntitiesRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis;
return scheduledRefreshSupported.getLastScheduledRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis;
}
@Override

24
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java

@ -41,6 +41,7 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
import org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.aggregation.function.AggEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.ScheduledRefreshSupported;
import java.util.ArrayList;
import java.util.HashMap;
@ -54,14 +55,14 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx.DISABLED_INTERVAL_VALUE;
@Slf4j
@Getter
public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState {
public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculatedFieldState implements ScheduledRefreshSupported {
@Setter
@Getter
private long lastArgsRefreshTs = DEFAULT_LAST_UPDATE_TS;
@Setter
@Getter
private long lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS;
@Setter
private long lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS;
private long deduplicationIntervalMs = DISABLED_INTERVAL_VALUE;
private Map<String, AggMetric> metrics;
@ -103,13 +104,24 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat
@Override
public void reset() { // must reset everything dependent on arguments
super.reset();
resetScheduledRefreshTs();
lastArgsRefreshTs = DEFAULT_LAST_UPDATE_TS;
lastMetricsEvalTs = DEFAULT_LAST_UPDATE_TS;
lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS;
metrics = null;
}
public void updateLastRelatedEntitiesRefreshTs() {
@Override
public void resetScheduledRefreshTs() {
lastRelatedEntitiesRefreshTs = DEFAULT_LAST_UPDATE_TS;
}
@Override
public long getLastScheduledRefreshTs() {
return lastRelatedEntitiesRefreshTs;
}
@Override
public void updateScheduledRefreshTs() {
lastRelatedEntitiesRefreshTs = System.currentTimeMillis();
}
@ -127,7 +139,7 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat
public List<EntityId> checkRelatedEntities(List<EntityId> relatedEntities) {
Map<EntityId, Map<String, ArgumentEntry>> entityInputs = prepareInputs();
findOutdatedEntities(entityInputs, relatedEntities).forEach(this::cleanupEntityData);
updateLastRelatedEntitiesRefreshTs();
updateScheduledRefreshTs();
return findMissingEntities(entityInputs, relatedEntities);
}

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java

@ -94,8 +94,6 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
private Alarm currentAlarm;
private boolean initialFetchDone;
// TODO: deprecate device profile node, describe the differences and improvements
public AlarmCalculatedFieldState(EntityId entityId) {
super(entityId);
}

19
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java

@ -21,8 +21,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.geo.Coordinates;
@ -52,11 +50,9 @@ import static org.thingsboard.server.common.data.cf.configuration.geofencing.Ent
import static org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingPresenceStatus.INSIDE;
import static org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingPresenceStatus.OUTSIDE;
@Getter
@Setter
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState {
public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState implements ScheduledRefreshSupported {
private long lastDynamicArgumentsRefreshTs = DEFAULT_LAST_UPDATE_TS;
@ -147,10 +143,21 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState {
@Override
public void reset() {
super.reset();
resetScheduledRefreshTs();
}
@Override
public void resetScheduledRefreshTs() {
lastDynamicArgumentsRefreshTs = DEFAULT_LAST_UPDATE_TS;
}
public void updateLastDynamicArgumentsRefreshTs() {
@Override
public long getLastScheduledRefreshTs() {
return lastDynamicArgumentsRefreshTs;
}
@Override
public void updateScheduledRefreshTs() {
lastDynamicArgumentsRefreshTs = System.currentTimeMillis();
}

26
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/ScheduledRefreshSupported.java

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state.geofencing;
public interface ScheduledRefreshSupported {
void resetScheduledRefreshTs();
long getLastScheduledRefreshTs();
void updateScheduledRefreshTs();
}

50
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java

@ -22,6 +22,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -30,8 +32,9 @@ import java.util.Set;
public class PropagationArgumentEntry implements ArgumentEntry {
private Set<EntityId> entityIds;
private transient EntityId added;
private transient List<EntityId> added;
private transient EntityId removed;
private transient boolean partitionStateRestore;
private boolean forceResetPrevious;
@ -39,10 +42,16 @@ public class PropagationArgumentEntry implements ArgumentEntry {
this.entityIds = new HashSet<>();
this.added = null;
this.removed = null;
this.partitionStateRestore = false;
}
public PropagationArgumentEntry(List<EntityId> entityIds) {
this(entityIds, false);
}
public PropagationArgumentEntry(List<EntityId> entityIds, boolean partitionStateRestore) {
this.entityIds = new HashSet<>(entityIds);
this.partitionStateRestore = partitionStateRestore;
}
@Override
@ -57,27 +66,44 @@ public class PropagationArgumentEntry implements ArgumentEntry {
@Override
public boolean updateEntry(ArgumentEntry entry) {
if (!(entry instanceof PropagationArgumentEntry propagationArgumentEntry)) {
if (!(entry instanceof PropagationArgumentEntry updated)) {
throw new IllegalArgumentException("Unsupported argument entry type for propagation argument entry: " + entry.getType());
}
if (propagationArgumentEntry.getAdded() != null) {
boolean updated = entityIds.add(propagationArgumentEntry.getAdded());
if (updated) {
added = propagationArgumentEntry.getAdded();
}
return updated;
if (updated.getAdded() != null) {
return checkAdded(updated.getAdded());
}
if (propagationArgumentEntry.getRemoved() != null) {
return entityIds.remove(propagationArgumentEntry.getRemoved());
if (updated.getRemoved() != null) {
return entityIds.remove(updated.getRemoved());
}
if (updated.isPartitionStateRestore()) {
Set<EntityId> updatedIds = updated.getEntityIds();
if (updatedIds.isEmpty()) {
entityIds.clear();
return false;
}
entityIds.retainAll(updatedIds);
return checkAdded(updatedIds);
}
if (propagationArgumentEntry.isEmpty()) {
if (updated.isEmpty()) {
entityIds.clear();
return true;
}
entityIds = propagationArgumentEntry.getEntityIds();
entityIds = updated.getEntityIds();
return true;
}
private boolean checkAdded(Collection<EntityId> updatedIds) {
for (EntityId id : updatedIds) {
if (entityIds.add(id)) {
if (added == null) {
added = new ArrayList<>();
}
added.add(id);
}
}
return added != null && !added.isEmpty();
}
@Override
public boolean isEmpty() {
return entityIds.isEmpty();

34
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult;
import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult;
@ -37,11 +38,15 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState {
private CalculatedFieldProcessingService cfProcessingService;
private ScheduledFuture<?> reevaluationFuture;
public PropagationCalculatedFieldState(EntityId entityId) {
super(entityId);
}
@ -50,6 +55,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) {
this.ctx = ctx;
this.actorCtx = actorCtx;
this.cfProcessingService = ctx.getCfProcessingService();
this.requiredArguments = new ArrayList<>(ctx.getArgNames());
requiredArguments.add(PROPAGATION_CONFIG_ARGUMENT);
this.readinessStatus = checkReadiness(requiredArguments, arguments);
@ -58,6 +64,32 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
}
}
@Override
public void init(boolean restored) {
super.init(restored);
if (restored) {
cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> {
var update = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx);
if (update.isEmpty()) {
return;
}
ScheduledFuture<?> future = ctx.scheduleReevaluation(0L, actorCtx);
if (future != null) {
reevaluationFuture = future;
}
});
}
}
@Override
public void close() {
super.close();
if (reevaluationFuture != null) {
reevaluationFuture.cancel(true);
reevaluationFuture = null;
}
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.PROPAGATION;
@ -72,7 +104,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
boolean newEntityAdded = propagationArgumentEntry.getAdded() != null;
List<EntityId> entityIds;
if (newEntityAdded) {
entityIds = List.of(propagationArgumentEntry.getAdded());
entityIds = propagationArgumentEntry.getAdded();
propagationArgumentEntry.setAdded(null);
} else {
if (propagationArgumentEntry.getEntityIds().isEmpty()) {

12
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -33,6 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ArgumentIntervalProt
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.EntityIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto;
import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto;
@ -61,6 +62,7 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgume
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
@ -108,6 +110,7 @@ public class CalculatedFieldUtils {
case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry));
case TS_ROLLING -> builder.addRollingValueArguments(toRollingArgumentProto(argName, (TsRollingArgumentEntry) argEntry));
case GEOFENCING -> builder.addGeofencingArguments(toGeofencingArgumentProto(argName, (GeofencingArgumentEntry) argEntry));
case PROPAGATION -> builder.addAllPropagationEntityIds(toPropagationEntityIdsProto((PropagationArgumentEntry) argEntry));
case RELATED_ENTITIES -> {
RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = (RelatedEntitiesArgumentEntry) argEntry;
relatedEntitiesArgumentEntry.getEntityInputs()
@ -136,6 +139,10 @@ public class CalculatedFieldUtils {
return builder.build();
}
private static List<EntityIdProto> toPropagationEntityIdsProto(PropagationArgumentEntry argEntry) {
return argEntry.getEntityIds().stream().map(ProtoUtils::toProto).collect(Collectors.toList());
}
private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) {
return AlarmRuleStateProto.newBuilder()
.setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse(""))
@ -268,7 +275,10 @@ public class CalculatedFieldUtils {
state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto)));
case GEOFENCING -> proto.getGeofencingArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromGeofencingArgumentProto(argProto)));
case PROPAGATION -> state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry());
case PROPAGATION -> {
List<EntityId> propagationEntityIds = proto.getPropagationEntityIdsList().stream().map(ProtoUtils::fromProto).toList();
state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(propagationEntityIds));
}
case ALARM -> {
AlarmCalculatedFieldState alarmState = (AlarmCalculatedFieldState) state;
AlarmStateProto alarmStateProto = proto.getAlarmState();

2
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -1209,7 +1209,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
Map<CalculatedFieldId, CalculatedFieldState> statesMap = (Map<CalculatedFieldId, CalculatedFieldState>) ReflectionTestUtils.getField(processor, "states");
Awaitility.await("CF state for entity actor ready to refresh dynamic arguments").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> {
CalculatedFieldState calculatedFieldState = statesMap.get(cfId);
boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastDynamicArgumentsRefreshTs() <
boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastScheduledRefreshTs() <
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval);
log.warn("entityId {}, cfId {}, state ready to refresh == {}", entityId, cfId, isReady);
return isReady;

72
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java

@ -104,19 +104,19 @@ public class PropagationArgumentEntryTest {
@Test
void testUpdateEntryWhenAdded() {
var added = new PropagationArgumentEntry();
added.setAdded(ENTITY_3_ID);
added.setAdded(List.of(ENTITY_3_ID));
boolean changed = entry.updateEntry(added);
assertThat(changed).isTrue();
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).isEqualTo(ENTITY_3_ID);
assertThat(entry.getAdded()).isEqualTo(List.of(ENTITY_3_ID));
}
@Test
void testUpdateEntryWhenAddedExistingEntity() {
var added = new PropagationArgumentEntry();
added.setAdded(ENTITY_2_ID);
added.setAdded(List.of(ENTITY_2_ID));
boolean changed = entry.updateEntry(added);
@ -149,6 +149,72 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getRemoved()).isNull();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID), true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isTrue();
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID), true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID), true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isTrue();
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreNoChanges() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID), true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isFalse();
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreEmptySet() {
var restore = new PropagationArgumentEntry(List.of(), true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op
assertThat(entry.getEntityIds()).isEmpty();
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
}
@Test
@SuppressWarnings("unchecked")
void testToTbelCfArgWithValues() {

40
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java

@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationPathLevel;
import org.thingsboard.server.common.stats.DefaultStatsFactory;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult;
import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
@ -57,12 +58,17 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
@ -105,15 +111,19 @@ public class PropagationCalculatedFieldStateTest {
@MockitoBean
private ActorSystemContext actorSystemContext;
@MockitoBean
private CalculatedFieldProcessingService cfProcessingService;
@BeforeEach
void setUp() {
when(actorSystemContext.getTbelInvokeService()).thenReturn(tbelInvokeService);
when(actorSystemContext.getApiLimitService()).thenReturn(apiLimitService);
when(actorSystemContext.getCalculatedFieldProcessingService()).thenReturn(cfProcessingService);
when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
}
void initCtxAndState(boolean applyExpressionToResolvedArguments) {
ctx = new CalculatedFieldCtx(getCalculatedField(applyExpressionToResolvedArguments), actorSystemContext);
ctx = spy(new CalculatedFieldCtx(getCalculatedField(applyExpressionToResolvedArguments), actorSystemContext));
ctx.init();
state = new PropagationCalculatedFieldState(ctx.getEntityId());
@ -287,7 +297,7 @@ public class PropagationCalculatedFieldStateTest {
AssetId newEntityId = new AssetId(UUID.fromString("83e2c962-eeae-4708-984e-e6a24760f9c3"));
PropagationArgumentEntry propagationArgumentEntry = new PropagationArgumentEntry();
propagationArgumentEntry.setAdded(newEntityId);
propagationArgumentEntry.setAdded(List.of(newEntityId));
Map<String, ArgumentEntry> updated = state.update(Map.of(PROPAGATION_CONFIG_ARGUMENT, propagationArgumentEntry), ctx);
assertThat(updated).isNotNull().containsEntry(PROPAGATION_CONFIG_ARGUMENT, propagationArgumentEntry);
@ -299,6 +309,32 @@ public class PropagationCalculatedFieldStateTest {
assertThat(propagationCalculatedFieldResult.getResult().getResult()).isEqualTo(JacksonUtil.newObjectNode().put(TEMPERATURE_ARGUMENT_NAME, TEMPERATURE_VALUE));
}
@Test
void testPropapagationStateInitWithRestoredSetToFalse() {
initCtxAndState(false);
verify(cfProcessingService, never()).fetchPropagationArgumentFromDb(any(), any());
verify(ctx, never()).scheduleReevaluation(anyLong(), any());
}
@Test
void testPropapagationStateInitWithRestoredSetToTrue() {
initCtxAndState(false);
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(Collections.emptyList())
);
state.update(initArgs, ctx);
assertThat(state.isReady()).isFalse();
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(propagationArgEntry));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx).scheduleReevaluation(0L, state.getActorCtx());
}
private CalculatedField getCalculatedField(boolean applyExpressionToResolvedArguments) {
CalculatedField calculatedField = new CalculatedField();
calculatedField.setTenantId(TENANT_ID);

5
application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java

@ -119,7 +119,7 @@ class CalculatedFieldUtilsTest {
}
@Test
void toProtoAndFromProto_shouldCreatePropagationStateWithEmptyPropagationArgument() {
void toProtoAndFromProto_shouldCreatePropagationStateWithNotEmptyPropagationArgument() {
// given
CalculatedFieldEntityCtxId stateId = mock(CalculatedFieldEntityCtxId.class);
given(stateId.tenantId()).willReturn(TENANT_ID);
@ -158,8 +158,7 @@ class CalculatedFieldUtilsTest {
assertThat(propagationState.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(propagationState.getArguments()).isNotNull();
assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT)).isNotNull();
assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT).isEmpty()).isTrue();
assertThat(propagationState.getArguments().get(PROPAGATION_CONFIG_ARGUMENT)).isEqualTo(propagationArgumentEntry);
assertThat(propagationState.getArguments().get("state")).isNotNull().isEqualTo(singleValueArgumentEntry);
assertThat(propagationState.getRequiredArguments()).isNull();
assertThat(propagationState.getReadinessStatus()).isNull();

1
common/proto/src/main/proto/queue.proto

@ -935,6 +935,7 @@ message CalculatedFieldStateProto {
int64 lastArgsUpdateTs = 7;
int64 lastMetricsEvalTs = 8;
repeated ArgumentIntervalProto aggregationArguments = 9;
repeated EntityIdProto propagationEntityIds = 10;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.

Loading…
Cancel
Save