From 3bb215f15906a69e85cd089a60c8a467fda04c36 Mon Sep 17 00:00:00 2001 From: dshvaika Date: Tue, 16 Dec 2025 13:37:19 +0200 Subject: [PATCH] fixes afrer review --- .../ctx/state/BaseCalculatedFieldState.java | 10 +++--- .../cf/ctx/state/CalculatedFieldCtx.java | 1 - .../propagation/PropagationArgumentEntry.java | 12 ++----- .../PropagationCalculatedFieldState.java | 32 ++++++------------- .../state/PropagationArgumentEntryTest.java | 25 +++++++++------ 5 files changed, 32 insertions(+), 48 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 5173c48892..754fc2f6ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -63,7 +63,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, this.ctx = ctx; this.actorCtx = actorCtx; this.requiredArguments = ctx.getArgNames(); - this.readinessStatus = checkReadiness(requiredArguments, arguments); + this.readinessStatus = checkReadiness(); } @Override @@ -108,7 +108,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, if (updatedArguments == null) { return Collections.emptyMap(); } - readinessStatus = checkReadiness(requiredArguments, arguments); + readinessStatus = checkReadiness(); return updatedArguments; } @@ -183,13 +183,13 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, return latestTs; } - protected ReadinessStatus checkReadiness(List requiredArguments, Map currentArguments) { - if (currentArguments == null) { + protected ReadinessStatus checkReadiness() { + if (arguments == null) { return ReadinessStatus.from(requiredArguments); } List emptyArguments = null; for (String requiredArgumentKey : requiredArguments) { - ArgumentEntry argumentEntry = currentArguments.get(requiredArgumentKey); + ArgumentEntry argumentEntry = arguments.get(requiredArgumentKey); if (argumentEntry == null || argumentEntry.isEmpty()) { if (emptyArguments == null) { emptyArguments = new ArrayList<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 0ab11ff742..72e31ebd50 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -636,7 +636,6 @@ public class CalculatedFieldCtx implements Closeable { return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); } - // TODO: Consider to reevaluate if we need to refresh relationPathSource and scheduled update public boolean hasRefreshContextOnlyChanges(CalculatedFieldCtx other) { // has changes that do not require state recalculation if (output != null) { var thisOutputStrategy = output.getStrategy(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java index 6b1846c18f..8536c0f65f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java @@ -34,24 +34,18 @@ public class PropagationArgumentEntry implements ArgumentEntry { private Set entityIds; private transient List added; private transient EntityId removed; - private transient boolean partitionStateRestore; - private boolean forceResetPrevious; + private transient boolean forceResetPrevious; + private transient boolean ignoreRemovedEntities; public PropagationArgumentEntry() { this.entityIds = new HashSet<>(); this.added = null; this.removed = null; - this.partitionStateRestore = false; } public PropagationArgumentEntry(List entityIds) { - this(entityIds, false); - } - - public PropagationArgumentEntry(List entityIds, boolean partitionStateRestore) { this.entityIds = new HashSet<>(entityIds); - this.partitionStateRestore = partitionStateRestore; } @Override @@ -75,7 +69,7 @@ public class PropagationArgumentEntry implements ArgumentEntry { if (updated.getRemoved() != null) { return entityIds.remove(updated.getRemoved()); } - if (updated.isPartitionStateRestore()) { + if (updated.isIgnoreRemovedEntities()) { Set updatedIds = updated.getEntityIds(); if (updatedIds.isEmpty()) { entityIds.clear(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java index 0f1c44d0e2..c4533aef37 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java @@ -38,14 +38,12 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState { private CalculatedFieldProcessingService cfProcessingService; - private ScheduledFuture reevaluationFuture; public PropagationCalculatedFieldState(EntityId entityId) { super(entityId); @@ -58,7 +56,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState this.cfProcessingService = ctx.getCfProcessingService(); this.requiredArguments = new ArrayList<>(ctx.getArgNames()); requiredArguments.add(PROPAGATION_CONFIG_ARGUMENT); - this.readinessStatus = checkReadiness(requiredArguments, arguments); + this.readinessStatus = checkReadiness(); if (ctx.isApplyExpressionForResolvedArguments()) { this.tbelExpression = ctx.getTbelExpressions().get(ctx.getExpression()); } @@ -69,28 +67,16 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState super.init(restored); if (restored) { cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> { - fromDb.setPartitionStateRestore(true); - var update = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx); - if (update.isEmpty()) { + fromDb.setIgnoreRemovedEntities(true); + var updatedArgs = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx); + if (updatedArgs.isEmpty()) { return; } - ScheduledFuture future = ctx.scheduleReevaluation(0L, actorCtx); - if (future != null) { - reevaluationFuture = future; - } + ctx.scheduleReevaluation(0L, actorCtx); }); } } - @Override - public void close() { - super.close(); - if (reevaluationFuture != null) { - reevaluationFuture.cancel(true); - reevaluationFuture = null; - } - } - @Override public CalculatedFieldType getType() { return CalculatedFieldType.PROPAGATION; @@ -131,15 +117,15 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState } @Override - protected ReadinessStatus checkReadiness(List requiredArguments, Map currentArguments) { - if (ctx.isApplyExpressionForResolvedArguments() || currentArguments == null) { - return super.checkReadiness(requiredArguments, currentArguments); + protected ReadinessStatus checkReadiness() { + if (ctx.isApplyExpressionForResolvedArguments() || arguments == null) { + return super.checkReadiness(); } boolean propagationNotEmpty = false; boolean hasOtherNonEmpty = false; List emptyArguments = null; for (String requiredArgumentKey : requiredArguments) { - ArgumentEntry argumentEntry = currentArguments.get(requiredArgumentKey); + ArgumentEntry argumentEntry = arguments.get(requiredArgumentKey); if (argumentEntry == null || argumentEntry.isEmpty()) { if (emptyArguments == null) { emptyArguments = new ArrayList<>(); diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java index dd5dfda79a..12f3e4298d 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java @@ -151,7 +151,8 @@ public class PropagationArgumentEntryTest { @Test void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() { - var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID), true); + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID)); + restore.setIgnoreRemovedEntities(true); boolean changed = entry.updateEntry(restore); @@ -159,12 +160,13 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID); assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isPartitionStateRestore()).isFalse(); + assertThat(entry.isIgnoreRemovedEntities()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() { - var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID), true); + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID)); + restore.setIgnoreRemovedEntities(true); boolean changed = entry.updateEntry(restore); @@ -172,12 +174,13 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isPartitionStateRestore()).isFalse(); + assertThat(entry.isIgnoreRemovedEntities()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() { - var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID), true); + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID)); + restore.setIgnoreRemovedEntities(true); boolean changed = entry.updateEntry(restore); @@ -185,13 +188,14 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID); assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isPartitionStateRestore()).isFalse(); + assertThat(entry.isIgnoreRemovedEntities()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreNoChanges() { - var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID), true); + var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID)); + restore.setIgnoreRemovedEntities(true); boolean changed = entry.updateEntry(restore); @@ -199,12 +203,13 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isPartitionStateRestore()).isFalse(); + assertThat(entry.isIgnoreRemovedEntities()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreEmptySet() { - var restore = new PropagationArgumentEntry(List.of(), true); + var restore = new PropagationArgumentEntry(List.of()); + restore.setIgnoreRemovedEntities(true); boolean changed = entry.updateEntry(restore); @@ -212,7 +217,7 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).isEmpty(); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isPartitionStateRestore()).isFalse(); + assertThat(entry.isIgnoreRemovedEntities()).isFalse(); } @Test