Browse Source

Warn on stale propagation state during restore; fix sync update to recalculate readiness

pull/14706/head
dshvaika 5 months ago
parent
commit
a54e04c06f
  1. 18
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java
  2. 11
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java
  3. 24
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java
  4. 113
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java

18
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java

@ -36,7 +36,7 @@ public class PropagationArgumentEntry implements ArgumentEntry {
private transient EntityId removed;
private transient boolean forceResetPrevious;
private transient boolean ignoreRemovedEntities;
private transient boolean syncWithDb;
public PropagationArgumentEntry() {
this.entityIds = new HashSet<>();
@ -69,14 +69,18 @@ public class PropagationArgumentEntry implements ArgumentEntry {
if (updated.getRemoved() != null) {
return entityIds.remove(updated.getRemoved());
}
if (updated.isIgnoreRemovedEntities()) {
Set<EntityId> updatedIds = updated.getEntityIds();
if (updatedIds.isEmpty()) {
if (updated.isSyncWithDb()) {
Set<EntityId> dbEntityIds = updated.getEntityIds();
if (dbEntityIds.isEmpty()) {
if (entityIds.isEmpty()) {
return false;
}
entityIds.clear();
return false;
return true;
}
entityIds.retainAll(updatedIds);
return checkAdded(updatedIds);
boolean retained = entityIds.retainAll(dbEntityIds);
boolean added = checkAdded(dbEntityIds);
return retained || added;
}
if (updated.isEmpty()) {
entityIds.clear();

11
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
@ -41,6 +42,7 @@ import java.util.Map;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
@Slf4j
public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState {
private CalculatedFieldProcessingService cfProcessingService;
@ -67,12 +69,17 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
super.init(restored);
if (restored) {
cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> {
fromDb.setIgnoreRemovedEntities(true);
fromDb.setSyncWithDb(true);
var updatedArgs = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx);
if (updatedArgs.isEmpty()) {
return;
}
ctx.scheduleReevaluation(0L, actorCtx);
log.warn("[{}][{}] Propagation argument was out of sync during state restore and was reconciled with DB.", ctx.getCfId(), entityId);
var updatedPropagationArgument = (PropagationArgumentEntry) arguments.get(PROPAGATION_CONFIG_ARGUMENT);
if (updatedPropagationArgument.getAdded() != null) {
log.warn("[{}][{}] New propagation entities were detected during restore. Scheduling reevaluation for new entities...", ctx.getCfId(), entityId);
ctx.scheduleReevaluation(0L, actorCtx);
}
});
}
}

24
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java

@ -152,7 +152,7 @@ public class PropagationArgumentEntryTest {
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID));
restore.setIgnoreRemovedEntities(true);
restore.setSyncWithDb(true);
boolean changed = entry.updateEntry(restore);
@ -160,27 +160,27 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
assertThat(entry.isSyncWithDb()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID));
restore.setIgnoreRemovedEntities(true);
restore.setSyncWithDb(true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op
assertThat(changed).isTrue(); // expected to be changed, so we re-check readiness for the state
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
assertThat(entry.isSyncWithDb()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID));
restore.setIgnoreRemovedEntities(true);
restore.setSyncWithDb(true);
boolean changed = entry.updateEntry(restore);
@ -188,14 +188,14 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
assertThat(entry.isSyncWithDb()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreNoChanges() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID));
restore.setIgnoreRemovedEntities(true);
restore.setSyncWithDb(true);
boolean changed = entry.updateEntry(restore);
@ -203,21 +203,21 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
assertThat(entry.isSyncWithDb()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreEmptySet() {
var restore = new PropagationArgumentEntry(List.of());
restore.setIgnoreRemovedEntities(true);
restore.setSyncWithDb(true);
boolean changed = entry.updateEntry(restore);
assertThat(changed).isFalse(); // expected no change, since we consider the removal of stale ids as no-op
assertThat(changed).isTrue(); // expected to be changed, so we re-check readiness for the state
assertThat(entry.getEntityIds()).isEmpty();
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
assertThat(entry.isSyncWithDb()).isFalse();
}
@Test

113
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java

@ -317,7 +317,49 @@ public class PropagationCalculatedFieldStateTest {
}
@Test
void testPropapagationStateInitWithRestoredSetToTrue() {
void testPropagationStateInitRestoredTrueNoDbChanges_noReevaluation() {
initCtxAndState(false);
// existing state already matches DB snapshot
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry
);
state.update(initArgs, ctx);
// DB returns the same set
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(propagationArgEntry));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx, never()).scheduleReevaluation(anyLong(), any());
}
@Test
void testPropagationStateInitRestoredTrueOnlyRemovals_noReevaluation() {
initCtxAndState(false);
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry
);
state.update(initArgs, ctx);
// DB snapshot contains only ASSET_ID_1 (ASSET_ID_2 should be removed)
var fromDb = new PropagationArgumentEntry(List.of(ASSET_ID_1));
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx, never()).scheduleReevaluation(anyLong(), any());
}
@Test
void testPropagationStateIsNotReadyInitRestoredTrueAddedEntities_scheduleReevaluation() {
initCtxAndState(false);
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
@ -335,6 +377,75 @@ public class PropagationCalculatedFieldStateTest {
verify(ctx).scheduleReevaluation(0L, state.getActorCtx());
}
@Test
void testPropagationStateInitRestoredTrueAddedEntities_scheduleReevaluation() {
initCtxAndState(false);
// existing missing ASSET_ID_2
var existing = new PropagationArgumentEntry(List.of(ASSET_ID_1));
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, existing
);
state.update(initArgs, ctx);
assertThat(state.isReady()).isTrue();
// DB snapshot contains both (ASSET_ID_2 should be "added")
var fromDb = new PropagationArgumentEntry(List.of(ASSET_ID_1, ASSET_ID_2));
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx).scheduleReevaluation(0L, state.getActorCtx());
}
@Test
void testPropagationStateInitRestoredTrueDbEmpty_clearsState_noReevaluation() {
initCtxAndState(false);
// existing has something
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, propagationArgEntry
);
state.update(initArgs, ctx);
// DB snapshot empty -> clear
var fromDb = new PropagationArgumentEntry(Collections.emptyList());
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx, never()).scheduleReevaluation(anyLong(), any());
}
@Test
void testPropagationStateInitRestoredTrueBothEmpty_noReevaluation() {
initCtxAndState(false);
// Existing propagation argument is empty
Map<String, ArgumentEntry> initArgs = Map.of(
TEMPERATURE_ARGUMENT_NAME, temperatureArgumentEntry,
HUMIDITY_ARGUMENT_NAME, humidityArgumentEntry,
PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(Collections.emptyList())
);
state.update(initArgs, ctx);
assertThat(state.isReady()).isFalse();
// DB snapshot is also empty
var fromDb = new PropagationArgumentEntry(Collections.emptyList());
when(cfProcessingService.fetchPropagationArgumentFromDb(any(), any())).thenReturn(Optional.of(fromDb));
state.init(true);
verify(cfProcessingService).fetchPropagationArgumentFromDb(ctx, state.getEntityId());
verify(ctx, never()).scheduleReevaluation(anyLong(), any());
}
private CalculatedField getCalculatedField(boolean applyExpressionToResolvedArguments) {
CalculatedField calculatedField = new CalculatedField();
calculatedField.setTenantId(TENANT_ID);

Loading…
Cancel
Save