diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java index 8536c0f65f..c1ae259370 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java @@ -36,7 +36,7 @@ public class PropagationArgumentEntry implements ArgumentEntry { private transient EntityId removed; private transient boolean forceResetPrevious; - private transient boolean ignoreRemovedEntities; + private transient boolean syncWithDb; public PropagationArgumentEntry() { this.entityIds = new HashSet<>(); @@ -69,14 +69,18 @@ public class PropagationArgumentEntry implements ArgumentEntry { if (updated.getRemoved() != null) { return entityIds.remove(updated.getRemoved()); } - if (updated.isIgnoreRemovedEntities()) { - Set updatedIds = updated.getEntityIds(); - if (updatedIds.isEmpty()) { + if (updated.isSyncWithDb()) { + Set dbEntityIds = updated.getEntityIds(); + if (dbEntityIds.isEmpty()) { + if (entityIds.isEmpty()) { + return false; + } entityIds.clear(); - return false; + return true; } - entityIds.retainAll(updatedIds); - return checkAdded(updatedIds); + boolean retained = entityIds.retainAll(dbEntityIds); + boolean added = checkAdded(dbEntityIds); + return retained || added; } if (updated.isEmpty()) { entityIds.clear(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java index c4533aef37..8613e0c062 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.cf.CalculatedFieldType; @@ -41,6 +42,7 @@ import java.util.Map; import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; +@Slf4j public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState { private CalculatedFieldProcessingService cfProcessingService; @@ -67,12 +69,17 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState super.init(restored); if (restored) { cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> { - fromDb.setIgnoreRemovedEntities(true); + fromDb.setSyncWithDb(true); var updatedArgs = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx); if (updatedArgs.isEmpty()) { return; } - ctx.scheduleReevaluation(0L, actorCtx); + log.warn("[{}][{}] Propagation argument was out of sync during state restore and was reconciled with DB.", ctx.getCfId(), entityId); + var updatedPropagationArgument = (PropagationArgumentEntry) arguments.get(PROPAGATION_CONFIG_ARGUMENT); + if (updatedPropagationArgument.getAdded() != null) { + log.warn("[{}][{}] New propagation entities were detected during restore. Scheduling reevaluation for new entities...", ctx.getCfId(), entityId); + ctx.scheduleReevaluation(0L, actorCtx); + } }); } } diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java index 12f3e4298d..1433785dfa 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java @@ -152,7 +152,7 @@ public class PropagationArgumentEntryTest { @Test void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() { var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID)); - restore.setIgnoreRemovedEntities(true); + restore.setSyncWithDb(true); boolean changed = entry.updateEntry(restore); @@ -160,27 +160,27 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID); assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isIgnoreRemovedEntities()).isFalse(); + assertThat(entry.isSyncWithDb()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() { var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID)); - restore.setIgnoreRemovedEntities(true); + restore.setSyncWithDb(true); boolean changed = entry.updateEntry(restore); - assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op + assertThat(changed).isTrue(); // expected to be changed, so we re-check readiness for the state assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isIgnoreRemovedEntities()).isFalse(); + assertThat(entry.isSyncWithDb()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() { var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID)); - restore.setIgnoreRemovedEntities(true); + restore.setSyncWithDb(true); boolean changed = entry.updateEntry(restore); @@ -188,14 +188,14 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID); assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isIgnoreRemovedEntities()).isFalse(); + assertThat(entry.isSyncWithDb()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreNoChanges() { var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID)); - restore.setIgnoreRemovedEntities(true); + restore.setSyncWithDb(true); boolean changed = entry.updateEntry(restore); @@ -203,21 +203,21 @@ public class PropagationArgumentEntryTest { assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isIgnoreRemovedEntities()).isFalse(); + assertThat(entry.isSyncWithDb()).isFalse(); } @Test void testUpdateEntryWhenPartitionStateRestoreEmptySet() { var restore = new PropagationArgumentEntry(List.of()); - restore.setIgnoreRemovedEntities(true); + restore.setSyncWithDb(true); boolean changed = entry.updateEntry(restore); - assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op + assertThat(changed).isTrue(); // expected to be changed, so we re-check readiness for the state assertThat(entry.getEntityIds()).isEmpty(); assertThat(entry.getAdded()).isNull(); assertThat(entry.getRemoved()).isNull(); - assertThat(entry.isIgnoreRemovedEntities()).isFalse(); + assertThat(entry.isSyncWithDb()).isFalse(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java index a9bf6d54d1..2ceafd8c0a 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java @@ -317,7 +317,49 @@ public class PropagationCalculatedFieldStateTest { } @Test - void testPropapagationStateInitWithRestoredSetToTrue() { + void testPropagationStateInitRestoredTrueNoDbChanges_noReevaluation() { + initCtxAndState(false); + + // existing state already matches DB snapshot + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry + ); + state.update(initArgs, ctx); + + // DB returns the same set + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(propagationArgEntry)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx, never()).scheduleReevaluation(anyLong(), any()); + } + + @Test + void testPropagationStateInitRestoredTrueOnlyRemovals_noReevaluation() { + initCtxAndState(false); + + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry + ); + state.update(initArgs, ctx); + + // DB snapshot contains only ASSET_ID_1 (ASSET_ID_2 should be removed) + var fromDb = new PropagationArgumentEntry(List.of(ASSET_ID_1)); + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx, never()).scheduleReevaluation(anyLong(), any()); + } + + @Test + void testPropagationStateIsNotReadyInitRestoredTrueAddedEntities_scheduleReevaluation() { initCtxAndState(false); Map initArgs = Map.of( TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, @@ -335,6 +377,75 @@ public class PropagationCalculatedFieldStateTest { verify(ctx).scheduleReevaluation(0L, state.getActorCtx()); } + @Test + void testPropagationStateInitRestoredTrueAddedEntities_scheduleReevaluation() { + initCtxAndState(false); + + // existing missing ASSET_ID_2 + var existing = new PropagationArgumentEntry(List.of(ASSET_ID_1)); + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, existing + ); + state.update(initArgs, ctx); + assertThat(state.isReady()).isTrue(); + + // DB snapshot contains both (ASSET_ID_2 should be "added") + var fromDb = new PropagationArgumentEntry(List.of(ASSET_ID_1, ASSET_ID_2)); + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx).scheduleReevaluation(0L, state.getActorCtx()); + } + + @Test + void testPropagationStateInitRestoredTrueDbEmpty_clearsState_noReevaluation() { + initCtxAndState(false); + + // existing has something + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry + ); + state.update(initArgs, ctx); + + // DB snapshot empty -> clear + var fromDb = new PropagationArgumentEntry(Collections.emptyList()); + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx, never()).scheduleReevaluation(anyLong(), any()); + } + + @Test + void testPropagationStateInitRestoredTrueBothEmpty_noReevaluation() { + initCtxAndState(false); + + // Existing propagation argument is empty + Map initArgs = Map.of( + TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry, + HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry, + PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(Collections.emptyList()) + ); + state.update(initArgs, ctx); + assertThat(state.isReady()).isFalse(); + + // DB snapshot is also empty + var fromDb = new PropagationArgumentEntry(Collections.emptyList()); + when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb)); + + state.init(true); + + verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId()); + verify(ctx, never()).scheduleReevaluation(anyLong(), any()); + } + private CalculatedField getCalculatedField(boolean applyExpressionToResolvedArguments) { CalculatedField calculatedField = new CalculatedField(); calculatedField.setTenantId(TENANT_ID);