Browse Source

Merge pull request #14414 from ShvaykaD/bugfixes/cfs

Propagation CF error handling & Geofencing evaluation logic improvements
pull/14454/head
Viacheslav Klimov 6 months ago
committed by GitHub
parent
commit
3ad349bf21
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 69
      application/src/main/data/upgrade/basic/schema_update.sql
  2. 4
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  3. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  4. 16
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java
  5. 31
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java
  6. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingZoneState.java
  7. 47
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java
  8. 33
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java
  9. 2
      application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java
  10. 8
      common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java

69
application/src/main/data/upgrade/basic/schema_update.sql

@ -18,56 +18,27 @@
UPDATE tenant_profile
SET profile_data = jsonb_set(
profile_data,
'{configuration}',
(profile_data -> 'configuration')
|| jsonb_strip_nulls(
jsonb_build_object(
'minAllowedScheduledUpdateIntervalInSecForCF',
CASE
WHEN (profile_data -> 'configuration') ? 'minAllowedScheduledUpdateIntervalInSecForCF'
THEN NULL
ELSE to_jsonb(60)
END,
'maxRelationLevelPerCfArgument',
CASE
WHEN (profile_data -> 'configuration') ? 'maxRelationLevelPerCfArgument'
THEN NULL
ELSE to_jsonb(10)
END,
'maxRelatedEntitiesToReturnPerCfArgument',
CASE
WHEN (profile_data -> 'configuration') ? 'maxRelatedEntitiesToReturnPerCfArgument'
THEN NULL
ELSE to_jsonb(100)
END,
'minAllowedDeduplicationIntervalInSecForCF',
CASE
WHEN (profile_data -> 'configuration') ? 'minAllowedDeduplicationIntervalInSecForCF'
THEN NULL
ELSE to_jsonb(60)
END,
'minAllowedAggregationIntervalInSecForCF',
CASE
WHEN (profile_data -> 'configuration') ? 'minAllowedAggregationIntervalInSecForCF'
THEN NULL
ELSE to_jsonb(60)
END
)
),
false
)
profile_data,
'{configuration}',
jsonb_build_object(
'minAllowedScheduledUpdateIntervalInSecForCF', 60,
'maxRelationLevelPerCfArgument', 10,
'maxRelatedEntitiesToReturnPerCfArgument', 100,
'minAllowedDeduplicationIntervalInSecForCF', 60,
'minAllowedAggregationIntervalInSecForCF', 60
)
||
jsonb_strip_nulls(profile_data -> 'configuration')
)
WHERE NOT (
(profile_data -> 'configuration') ? 'minAllowedScheduledUpdateIntervalInSecForCF'
AND
(profile_data -> 'configuration') ? 'maxRelationLevelPerCfArgument'
AND
(profile_data -> 'configuration') ? 'maxRelatedEntitiesToReturnPerCfArgument'
AND
(profile_data -> 'configuration') ? 'minAllowedDeduplicationIntervalInSecForCF'
AND
(profile_data -> 'configuration') ? 'minAllowedAggregationIntervalInSecForCF'
);
jsonb_strip_nulls(profile_data -> 'configuration') ?& ARRAY[
'minAllowedScheduledUpdateIntervalInSecForCF',
'maxRelationLevelPerCfArgument',
'maxRelatedEntitiesToReturnPerCfArgument',
'minAllowedDeduplicationIntervalInSecForCF',
'minAllowedAggregationIntervalInSecForCF'
]
);
-- UPDATE TENANT PROFILE CONFIGURATION END

4
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -174,9 +174,9 @@ public abstract class AbstractCalculatedFieldProcessingService {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new RuntimeException("Failed to fetch " + key + ": " + cause.getMessage(), cause);
throw new RuntimeException("Failed to fetch '" + key + "' argument: " + cause.getMessage(), cause);
} catch (InterruptedException e) {
throw new RuntimeException("Failed to fetch" + key, e);
throw new RuntimeException("Failed to fetch '" + key + "' argument!", e);
}
}

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -25,6 +25,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingZoneState;
import org.thingsboard.server.utils.CalculatedFieldUtils;
import java.io.Closeable;
@ -164,6 +166,9 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState,
.mapToLong(e -> (e instanceof SingleValueArgumentEntry s) ? s.getTs() : 0L)
.max()
.orElse(0L);
} else if (entry instanceof GeofencingArgumentEntry geofencingArgumentEntry) {
newTs = geofencingArgumentEntry.getZoneStates().values().stream()
.mapToLong(GeofencingZoneState::getTs).max().orElse(0L);
}
this.latestTimestamp = Math.max(this.latestTimestamp, newTs);
}

16
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java

@ -38,6 +38,7 @@ import java.io.Closeable;
import java.util.List;
import java.util.Map;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@ -102,14 +103,25 @@ public interface CalculatedFieldState extends Closeable {
record ReadinessStatus(boolean ready, String errorMsg) {
private static final String ERROR_MESSAGE = "Required arguments are missing: ";
private static final String MISSING_REQUIRED_ARGUMENTS_ERROR = "Required arguments are missing: ";
private static final String MISSING_PROPAGATION_TARGETS_ERROR = "No entities found via 'Propagation path to related entities'. " +
"Verify the configured relation type and direction.";
private static final String MISSING_PROPAGATION_TARGETS_AND_ARGUMENTS_ERROR = MISSING_PROPAGATION_TARGETS_ERROR + " Missing arguments to propagate: ";
private static final ReadinessStatus READY = new ReadinessStatus(true, null);
public static ReadinessStatus from(List<String> emptyOrMissingArguments) {
if (CollectionsUtil.isEmpty(emptyOrMissingArguments)) {
return ReadinessStatus.READY;
}
return new ReadinessStatus(false, ERROR_MESSAGE + String.join(", ", emptyOrMissingArguments));
boolean propagationCtxIsEmpty = emptyOrMissingArguments.remove(PROPAGATION_CONFIG_ARGUMENT);
if (!propagationCtxIsEmpty) {
return new ReadinessStatus(false, MISSING_REQUIRED_ARGUMENTS_ERROR + String.join(", ", emptyOrMissingArguments));
}
if (emptyOrMissingArguments.isEmpty()) {
return new ReadinessStatus(false, MISSING_PROPAGATION_TARGETS_ERROR);
}
return new ReadinessStatus(false, MISSING_PROPAGATION_TARGETS_AND_ARGUMENTS_ERROR +
String.join(", ", emptyOrMissingArguments));
}
}

31
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java

@ -107,23 +107,26 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState {
boolean createRelationsWithMatchedZones = zoneGroupCfg.isCreateRelationsWithMatchedZones();
List<GeofencingEvalResult> zoneResults = new ArrayList<>(argumentEntry.getZoneStates().size());
argumentEntry.getZoneStates().forEach((zoneId, zoneState) -> {
boolean firstEval = zoneState.getLastPresence() == null;
GeofencingEvalResult eval = zoneState.evaluate(entityCoordinates);
zoneResults.add(eval);
if (createRelationsWithMatchedZones) {
GeofencingTransitionEvent transitionEvent = eval.transition();
if (transitionEvent == null) {
return;
}
EntityRelation relation = switch (zoneGroupCfg.getDirection()) {
case TO -> new EntityRelation(zoneId, entityId, zoneGroupCfg.getRelationType());
case FROM -> new EntityRelation(entityId, zoneId, zoneGroupCfg.getRelationType());
};
ListenableFuture<Boolean> f = switch (transitionEvent) {
case ENTERED -> ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), relation);
case LEFT -> ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation);
};
relationFutures.add(f);
if (!createRelationsWithMatchedZones) {
return;
}
GeofencingTransitionEvent transitionEvent = eval.transition();
if (transitionEvent == null && !firstEval) {
return;
}
transitionEvent = transitionEvent == null ? GeofencingTransitionEvent.LEFT : transitionEvent;
EntityRelation relation = switch (zoneGroupCfg.getDirection()) {
case TO -> new EntityRelation(zoneId, entityId, zoneGroupCfg.getRelationType());
case FROM -> new EntityRelation(entityId, zoneId, zoneGroupCfg.getRelationType());
};
ListenableFuture<Boolean> f = switch (transitionEvent) {
case ENTERED -> ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), relation);
case LEFT -> ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation);
};
relationFutures.add(f);
});
updateValuesNode(argumentKey, zoneResults, zoneGroupCfg.getReportStrategy(), valuesNode);
});

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingZoneState.java

@ -46,10 +46,13 @@ public class GeofencingZoneState {
public GeofencingZoneState(EntityId zoneId, KvEntry entry) {
this.zoneId = zoneId;
if (!(entry instanceof AttributeKvEntry attributeKvEntry)) {
throw new IllegalArgumentException("Unsupported KvEntry type for geofencing zone state: " + entry.getClass().getSimpleName());
throw new IllegalArgumentException("Invalid perimeter data source for zone with id: " + zoneId + ". Perimeter definition must be stored as attribute!");
}
this.ts = attributeKvEntry.getLastUpdateTs();
this.version = attributeKvEntry.getVersion();
if (entry.getValueAsString() == null) {
throw new IllegalArgumentException("Perimeter attribute key '" + entry.getKey() + "' not found for Zone with id: " + zoneId);
}
this.perimeterDefinition = JacksonUtil.fromString(entry.getValueAsString(), PerimeterDefinition.class);
}

47
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java

@ -221,7 +221,7 @@ public class GeofencingCalculatedFieldStateTest {
ENTITY_ID_LATITUDE_ARGUMENT_KEY, latitudeArgEntry,
ENTITY_ID_LONGITUDE_ARGUMENT_KEY, longitudeArgEntry,
"allowedZones", geofencingAllowedZoneArgEntry,
"restrictedZones", new GeofencingArgumentEntry()
"restrictedZones", new GeofencingArgumentEntry(Collections.emptyMap())
), ctx);
assertThat(state.isReady()).isFalse();
assertThat(state.getReadinessStatus().errorMsg()).contains("restrictedZones");
@ -290,10 +290,17 @@ public class GeofencingCalculatedFieldStateTest {
assertThat(relationFromSecondIteration.getType()).isEqualTo("CurrentZone");
ArgumentCaptor<EntityRelation> deleteCaptor = ArgumentCaptor.forClass(EntityRelation.class);
verify(relationService).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
EntityRelation leftRelation = deleteCaptor.getValue();
assertThat(leftRelation.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(leftRelation.getTo()).isEqualTo(ctx.getEntityId());
verify(relationService, times(2)).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
List<EntityRelation> deleteValues = deleteCaptor.getAllValues();
assertThat(deleteValues).hasSize(2);
EntityRelation deleteRelationFromFirstIteration = deleteValues.get(0);
assertThat(deleteRelationFromFirstIteration.getFrom()).isEqualTo(ZONE_2_ID);
assertThat(deleteRelationFromFirstIteration.getTo()).isEqualTo(ctx.getEntityId());
EntityRelation deleteRelationFromSecondIteration = deleteValues.get(1);
assertThat(deleteRelationFromSecondIteration.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(deleteRelationFromSecondIteration.getTo()).isEqualTo(ctx.getEntityId());
}
@Test
@ -360,10 +367,17 @@ public class GeofencingCalculatedFieldStateTest {
assertThat(relationFromSecondIteration.getType()).isEqualTo("CurrentZone");
ArgumentCaptor<EntityRelation> deleteCaptor = ArgumentCaptor.forClass(EntityRelation.class);
verify(relationService).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
EntityRelation leftRelation = deleteCaptor.getValue();
assertThat(leftRelation.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(leftRelation.getTo()).isEqualTo(ctx.getEntityId());
verify(relationService, times(2)).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
List<EntityRelation> deleteValues = deleteCaptor.getAllValues();
assertThat(deleteValues).hasSize(2);
EntityRelation deleteRelationFromFirstIteration = deleteValues.get(0);
assertThat(deleteRelationFromFirstIteration.getFrom()).isEqualTo(ZONE_2_ID);
assertThat(deleteRelationFromFirstIteration.getTo()).isEqualTo(ctx.getEntityId());
EntityRelation deleteRelationFromSecondIteration = deleteValues.get(1);
assertThat(deleteRelationFromSecondIteration.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(deleteRelationFromSecondIteration.getTo()).isEqualTo(ctx.getEntityId());
}
@Test
@ -432,10 +446,17 @@ public class GeofencingCalculatedFieldStateTest {
assertThat(relationFromSecondIteration.getType()).isEqualTo("CurrentZone");
ArgumentCaptor<EntityRelation> deleteCaptor = ArgumentCaptor.forClass(EntityRelation.class);
verify(relationService).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
EntityRelation leftRelation = deleteCaptor.getValue();
assertThat(leftRelation.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(leftRelation.getTo()).isEqualTo(ctx.getEntityId());
verify(relationService, times(2)).deleteRelationAsync(eq(ctx.getTenantId()), deleteCaptor.capture());
List<EntityRelation> deleteValues = deleteCaptor.getAllValues();
assertThat(deleteValues).hasSize(2);
EntityRelation deleteRelationFromFirstIteration = deleteValues.get(0);
assertThat(deleteRelationFromFirstIteration.getFrom()).isEqualTo(ZONE_2_ID);
assertThat(deleteRelationFromFirstIteration.getTo()).isEqualTo(ctx.getEntityId());
EntityRelation deleteRelationFromSecondIteration = deleteValues.get(1);
assertThat(deleteRelationFromSecondIteration.getFrom()).isEqualTo(ZONE_1_ID);
assertThat(deleteRelationFromSecondIteration.getTo()).isEqualTo(ctx.getEntityId());
}
private CalculatedField getCalculatedField() {

33
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java

@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
@ -52,10 +54,12 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalcul
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@ -126,21 +130,28 @@ public class PropagationCalculatedFieldStateTest {
assertThat(state.isReady()).isFalse();
}
@Test
void testIsReadyWhenPropagationArgIsNull() {
initCtxAndState(false);
state.update(Map.of(TEMPERATURE_ARGUMENT_NAME, singleValueArgEntry), ctx);
assertThat(state.isReady()).isFalse();
assertThat(state.getReadinessStatus().errorMsg()).contains(PROPAGATION_CONFIG_ARGUMENT);
private static Stream<ArgumentEntry> provideInvalidPropagationArgs() {
return Stream.of(
null,
new PropagationArgumentEntry(Collections.emptyList())
);
}
@Test
void testIsReadyWhenPropagationArgIsEmpty() {
@ParameterizedTest
@MethodSource("provideInvalidPropagationArgs")
void testIsReadyWhenPropagationArgIsNullOrEmpty(ArgumentEntry propagationEntry) {
initCtxAndState(false);
state.update(Map.of(TEMPERATURE_ARGUMENT_NAME, singleValueArgEntry,
PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(Collections.emptyList())), ctx);
Map<String, ArgumentEntry> args = new HashMap<>();
args.put(TEMPERATURE_ARGUMENT_NAME, singleValueArgEntry); // Valid user arg
if (propagationEntry != null) {
args.put(PROPAGATION_CONFIG_ARGUMENT, propagationEntry);
}
state.update(args, ctx);
assertThat(state.isReady()).isFalse();
assertThat(state.getReadinessStatus().errorMsg()).contains(PROPAGATION_CONFIG_ARGUMENT);
assertThat(state.getReadinessStatus().errorMsg())
.isEqualTo("No entities found via 'Propagation path to related entities'. Verify the configured relation type and direction.");
}
@Test

2
application/src/test/java/org/thingsboard/server/utils/CalculatedFieldUtilsTest.java

@ -107,7 +107,7 @@ class CalculatedFieldUtilsTest {
assertThat(fromProto)
.usingRecursiveComparison()
.ignoringFields("ctx", "requiredArguments", "readinessStatus")
.ignoringFields("ctx", "requiredArguments", "readinessStatus", "latestTimestamp")
.isEqualTo(state);
ArgumentEntry fromProtoArgument = fromProto.getArguments().get("geofencingArgumentTest");

8
common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java

@ -16,7 +16,7 @@
package org.thingsboard.server.common.data.tenant.profile;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Positive;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -174,12 +174,16 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private long maxArgumentsPerCF = 10;
@Schema(example = "60")
private int minAllowedScheduledUpdateIntervalInSecForCF = 60;
@Builder.Default
@Schema(example = "10")
@Positive
private int maxRelationLevelPerCfArgument = 10;
@Builder.Default
@Schema(example = "100")
@Positive
private int maxRelatedEntitiesToReturnPerCfArgument = 100;
@Builder.Default
@Min(value = 1, message = "must be at least 1")
@Positive
@Schema(example = "1000")
private long maxDataPointsPerRollingArg = 1000;
@Schema(example = "32")

Loading…
Cancel
Save