Browse Source

fixes afrer review

pull/14560/head
dshvaika 6 months ago
parent
commit
3bb215f159
  1. 10
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  2. 1
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  3. 12
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java
  4. 32
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java
  5. 25
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java

10
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -63,7 +63,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState,
this.ctx = ctx;
this.actorCtx = actorCtx;
this.requiredArguments = ctx.getArgNames();
this.readinessStatus = checkReadiness(requiredArguments, arguments);
this.readinessStatus = checkReadiness();
}
@Override
@ -108,7 +108,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState,
if (updatedArguments == null) {
return Collections.emptyMap();
}
readinessStatus = checkReadiness(requiredArguments, arguments);
readinessStatus = checkReadiness();
return updatedArguments;
}
@ -183,13 +183,13 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState,
return latestTs;
}
protected ReadinessStatus checkReadiness(List<String> requiredArguments, Map<String, ArgumentEntry> currentArguments) {
if (currentArguments == null) {
protected ReadinessStatus checkReadiness() {
if (arguments == null) {
return ReadinessStatus.from(requiredArguments);
}
List<String> emptyArguments = null;
for (String requiredArgumentKey : requiredArguments) {
ArgumentEntry argumentEntry = currentArguments.get(requiredArgumentKey);
ArgumentEntry argumentEntry = arguments.get(requiredArgumentKey);
if (argumentEntry == null || argumentEntry.isEmpty()) {
if (emptyArguments == null) {
emptyArguments = new ArrayList<>();

1
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -636,7 +636,6 @@ public class CalculatedFieldCtx implements Closeable {
return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
}
// TODO: Consider to reevaluate if we need to refresh relationPathSource and scheduled update
public boolean hasRefreshContextOnlyChanges(CalculatedFieldCtx other) { // has changes that do not require state recalculation
if (output != null) {
var thisOutputStrategy = output.getStrategy();

12
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java

@ -34,24 +34,18 @@ public class PropagationArgumentEntry implements ArgumentEntry {
private Set<EntityId> entityIds;
private transient List<EntityId> added;
private transient EntityId removed;
private transient boolean partitionStateRestore;
private boolean forceResetPrevious;
private transient boolean forceResetPrevious;
private transient boolean ignoreRemovedEntities;
public PropagationArgumentEntry() {
this.entityIds = new HashSet<>();
this.added = null;
this.removed = null;
this.partitionStateRestore = false;
}
public PropagationArgumentEntry(List<EntityId> entityIds) {
this(entityIds, false);
}
public PropagationArgumentEntry(List<EntityId> entityIds, boolean partitionStateRestore) {
this.entityIds = new HashSet<>(entityIds);
this.partitionStateRestore = partitionStateRestore;
}
@Override
@ -75,7 +69,7 @@ public class PropagationArgumentEntry implements ArgumentEntry {
if (updated.getRemoved() != null) {
return entityIds.remove(updated.getRemoved());
}
if (updated.isPartitionStateRestore()) {
if (updated.isIgnoreRemovedEntities()) {
Set<EntityId> updatedIds = updated.getEntityIds();
if (updatedIds.isEmpty()) {
entityIds.clear();

32
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java

@ -38,14 +38,12 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState {
private CalculatedFieldProcessingService cfProcessingService;
private ScheduledFuture<?> reevaluationFuture;
public PropagationCalculatedFieldState(EntityId entityId) {
super(entityId);
@ -58,7 +56,7 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
this.cfProcessingService = ctx.getCfProcessingService();
this.requiredArguments = new ArrayList<>(ctx.getArgNames());
requiredArguments.add(PROPAGATION_CONFIG_ARGUMENT);
this.readinessStatus = checkReadiness(requiredArguments, arguments);
this.readinessStatus = checkReadiness();
if (ctx.isApplyExpressionForResolvedArguments()) {
this.tbelExpression = ctx.getTbelExpressions().get(ctx.getExpression());
}
@ -69,28 +67,16 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
super.init(restored);
if (restored) {
cfProcessingService.fetchPropagationArgumentFromDb(ctx, entityId).ifPresent(fromDb -> {
fromDb.setPartitionStateRestore(true);
var update = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx);
if (update.isEmpty()) {
fromDb.setIgnoreRemovedEntities(true);
var updatedArgs = update(Map.of(PROPAGATION_CONFIG_ARGUMENT, fromDb), ctx);
if (updatedArgs.isEmpty()) {
return;
}
ScheduledFuture<?> future = ctx.scheduleReevaluation(0L, actorCtx);
if (future != null) {
reevaluationFuture = future;
}
ctx.scheduleReevaluation(0L, actorCtx);
});
}
}
@Override
public void close() {
super.close();
if (reevaluationFuture != null) {
reevaluationFuture.cancel(true);
reevaluationFuture = null;
}
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.PROPAGATION;
@ -131,15 +117,15 @@ public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState
}
@Override
protected ReadinessStatus checkReadiness(List<String> requiredArguments, Map<String, ArgumentEntry> currentArguments) {
if (ctx.isApplyExpressionForResolvedArguments() || currentArguments == null) {
return super.checkReadiness(requiredArguments, currentArguments);
protected ReadinessStatus checkReadiness() {
if (ctx.isApplyExpressionForResolvedArguments() || arguments == null) {
return super.checkReadiness();
}
boolean propagationNotEmpty = false;
boolean hasOtherNonEmpty = false;
List<String> emptyArguments = null;
for (String requiredArgumentKey : requiredArguments) {
ArgumentEntry argumentEntry = currentArguments.get(requiredArgumentKey);
ArgumentEntry argumentEntry = arguments.get(requiredArgumentKey);
if (argumentEntry == null || argumentEntry.isEmpty()) {
if (emptyArguments == null) {
emptyArguments = new ArrayList<>();

25
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationArgumentEntryTest.java

@ -151,7 +151,8 @@ public class PropagationArgumentEntryTest {
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsMissingIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID), true);
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID));
restore.setIgnoreRemovedEntities(true);
boolean changed = entry.updateEntry(restore);
@ -159,12 +160,13 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreRemovesStaleIds() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID), true);
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID));
restore.setIgnoreRemovedEntities(true);
boolean changed = entry.updateEntry(restore);
@ -172,12 +174,13 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreAddsAndRemoves() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID), true);
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_3_ID));
restore.setIgnoreRemovedEntities(true);
boolean changed = entry.updateEntry(restore);
@ -185,13 +188,14 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_3_ID);
assertThat(entry.getAdded()).containsExactly(ENTITY_3_ID);
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreNoChanges() {
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID), true);
var restore = new PropagationArgumentEntry(List.of(ENTITY_1_ID, ENTITY_2_ID));
restore.setIgnoreRemovedEntities(true);
boolean changed = entry.updateEntry(restore);
@ -199,12 +203,13 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).containsExactlyInAnyOrder(ENTITY_1_ID, ENTITY_2_ID);
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
}
@Test
void testUpdateEntryWhenPartitionStateRestoreEmptySet() {
var restore = new PropagationArgumentEntry(List.of(), true);
var restore = new PropagationArgumentEntry(List.of());
restore.setIgnoreRemovedEntities(true);
boolean changed = entry.updateEntry(restore);
@ -212,7 +217,7 @@ public class PropagationArgumentEntryTest {
assertThat(entry.getEntityIds()).isEmpty();
assertThat(entry.getAdded()).isNull();
assertThat(entry.getRemoved()).isNull();
assertThat(entry.isPartitionStateRestore()).isFalse();
assertThat(entry.isIgnoreRemovedEntities()).isFalse();
}
@Test

Loading…
Cancel
Save