Browse Source

updated init method to handle restore events

pull/14253/head
IrynaMatveieva 7 months ago
parent
commit
c03085bbe7
  1. 6
      application/src/main/data/upgrade/basic/schema_update.sql
  2. 10
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  3. 14
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  4. 2
      application/src/main/java/org/thingsboard/server/controller/SystemInfoController.java
  5. 5
      application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java
  6. 9
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  7. 6
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java
  8. 18
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java
  9. 11
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java
  10. 6
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  11. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  12. 24
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  13. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java
  14. 8
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java
  15. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java
  16. 7
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java
  17. 16
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java
  18. 7
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java
  19. 3
      application/src/main/resources/thingsboard.yml
  20. 2
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java
  21. 2
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java
  22. 2
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java
  23. 2
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java
  24. 2
      common/data/src/main/java/org/thingsboard/server/common/data/SystemParams.java
  25. 6
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java
  26. 4
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java
  27. 2
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java
  28. 2
      common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java
  29. 8
      dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java
  30. 2
      ui-ngx/src/app/core/auth/auth.models.ts
  31. 2
      ui-ngx/src/app/core/auth/auth.reducer.ts
  32. 4
      ui-ngx/src/app/modules/home/components/calculated-fields/components/entity-aggregation-configuration/entity-aggregation-component.component.html
  33. 6
      ui-ngx/src/app/modules/home/components/calculated-fields/components/entity-aggregation-configuration/entity-aggregation-component.component.ts
  34. 6
      ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html
  35. 2
      ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts
  36. 4
      ui-ngx/src/app/shared/models/tenant.model.ts

6
application/src/main/data/upgrade/basic/schema_update.sql

@ -47,9 +47,9 @@ SET profile_data = jsonb_set(
THEN NULL
ELSE to_jsonb(60)
END,
'minAggregationIntervalInSecForCF',
'minAllowedAggregationIntervalInSecForCF',
CASE
WHEN (profile_data -> 'configuration') ? 'minAggregationIntervalInSecForCF'
WHEN (profile_data -> 'configuration') ? 'minAllowedAggregationIntervalInSecForCF'
THEN NULL
ELSE to_jsonb(60)
END
@ -66,7 +66,7 @@ WHERE NOT (
AND
(profile_data -> 'configuration') ? 'minAllowedDeduplicationIntervalInSecForCF'
AND
(profile_data -> 'configuration') ? 'minAggregationIntervalInSecForCF'
(profile_data -> 'configuration') ? 'minAllowedAggregationIntervalInSecForCF'
);
-- UPDATE TENANT PROFILE CONFIGURATION END

10
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -97,8 +97,8 @@ import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.queue.QueueStatsService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.TbResourceDataCache;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.resource.TbResourceDataCache;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleNodeStateService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
@ -664,10 +664,14 @@ public class ActorSystemContext {
@Getter
private long cfCalculationResultTimeout;
@Value("${actors.calculated_fields.check_interval:120}")
@Value("${actors.calculated_fields.check_interval:60}")
@Getter
private long cfCheckInterval;
@Value("${actors.alarms.reevaluation_interval:120}")
@Getter
private long alarmRulesReevaluationInterval;
@Autowired
@Getter
private MqttClientSettings mqttClientSettings;
@ -851,7 +855,7 @@ public class ActorSystemContext {
if (arguments != null) {
eventBuilder.arguments(JacksonUtil.toString(
arguments.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toTbelCfArg()))
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().jsonValue()))
));
}
if (result != null) {

14
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -53,7 +53,6 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
@ -124,12 +123,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state != null) {
state.setCtx(msg.getCtx(), actorCtx);
state.setPartition(msg.getPartition());
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) {
relatedEntitiesAggState.scheduleReevaluation();
}
if (state instanceof EntityAggregationCalculatedFieldState entityAggState) {
entityAggState.fillMissingIntervals();
}
state.init(true);
states.put(cfId, state);
} else {
removeState(cfId);
@ -140,7 +134,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
log.debug("Processing CF state partition restore msg: {}", msg);
for (CalculatedFieldState state : states.values()) {
if (msg.getPartition().equals(state.getPartition())) {
state.init();
state.init(false);
}
}
}
@ -455,7 +449,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
private void initState(CalculatedFieldState state, CalculatedFieldCtx ctx) {
state.setCtx(ctx, actorCtx);
state.init();
state.init(false);
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationQueryDynamicArguments()) {
GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state;
@ -504,7 +498,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} else {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
String errorMsg = ctx.isInitialized() ? state.getReadinessStatus().errorMsg() : "Calculated field state is not initialized!";
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, errorMsg);
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, errorMsg);
}
callback.onSuccess();
}

2
application/src/main/java/org/thingsboard/server/controller/SystemInfoController.java

@ -165,7 +165,7 @@ public class SystemInfoController extends BaseController {
systemParams.setMinAllowedScheduledUpdateIntervalInSecForCF(tenantProfileConfiguration.getMinAllowedScheduledUpdateIntervalInSecForCF());
systemParams.setMaxRelationLevelPerCfArgument(tenantProfileConfiguration.getMaxRelationLevelPerCfArgument());
systemParams.setMinAllowedDeduplicationIntervalInSecForCF(tenantProfileConfiguration.getMinAllowedDeduplicationIntervalInSecForCF());
systemParams.setMinAggregationIntervalInSecForCF(tenantProfileConfiguration.getMinAggregationIntervalInSecForCF());
systemParams.setMinAllowedAggregationIntervalInSecForCF(tenantProfileConfiguration.getMinAllowedAggregationIntervalInSecForCF());
systemParams.setTrendzSettings(trendzSettingsService.findTrendzSettings(currentUser.getTenantId()));
}
systemParams.setMobileQrEnabled(Optional.ofNullable(qrCodeSettingService.findQrCodeSettings(TenantId.SYS_TENANT_ID))

5
application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java

@ -164,9 +164,14 @@ public class TenantProfileController extends BaseController {
" \"warnThreshold\": 0,\n" +
" \"maxCalculatedFieldsPerEntity\": 5,\n" +
" \"maxArgumentsPerCF\": 10,\n" +
" \"minAllowedScheduledUpdateIntervalInSecForCF\": 60,\n" +
" \"maxRelationLevelPerCfArgument\": 10,\n" +
" \"maxRelatedEntitiesToReturnPerCfArgument\": 100,\n" +
" \"maxDataPointsPerRollingArg\": 1000,\n" +
" \"maxStateSizeInKBytes\": 32,\n" +
" \"maxSingleValueArgumentSizeInKBytes\": 2" +
" \"minAllowedDeduplicationIntervalInSecForCF\": 60" +
" \"minAllowedAggregationIntervalInSecForCF\": 60" +
" }\n" +
" },\n" +
" \"default\": false\n" +

9
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -199,12 +199,13 @@ public abstract class AbstractCalculatedFieldProcessingService {
}
protected Map<String, ListenableFuture<ArgumentEntry>> fetchEntityAggArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
EntityAggregationCalculatedFieldConfiguration aggConfig = (EntityAggregationCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
return aggConfig.getArguments().entrySet().stream()
if (!(ctx.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration config)) {
return Collections.emptyMap();
}
return config.getArguments().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), aggConfig.getInterval(), ts)
entry -> fetchTimeSeries(ctx.getTenantId(), entityId, entry.getValue(), config.getInterval(), ts)
));
}

6
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -25,6 +26,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;
public interface CalculatedFieldCache {
@ -38,9 +40,7 @@ public interface CalculatedFieldCache {
List<CalculatedFieldCtx> getCalculatedFieldCtxsByEntityId(EntityId entityId);
List<CalculatedFieldCtx> getRelatedEntitiesAggCalculatedFieldCtxsByFilter(Predicate<CalculatedFieldCtx> relatedEntityFilter);
List<CalculatedFieldCtx> getEntityAggCalculatedFieldCtxsByFilter(Predicate<CalculatedFieldCtx> entityAggCfFilter);
Stream<CalculatedFieldCtx> getCalculatedFieldCtxsByType(CalculatedFieldType cfType);
boolean hasCalculatedFields(TenantId tenantId, EntityId entityId, Predicate<CalculatedFieldCtx> filter);

18
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java

@ -49,6 +49,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Stream;
@Service
@Slf4j
@ -148,21 +149,10 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
}
@Override
public List<CalculatedFieldCtx> getRelatedEntitiesAggCalculatedFieldCtxsByFilter(Predicate<CalculatedFieldCtx> relatedEntityFilter) {
public Stream<CalculatedFieldCtx> getCalculatedFieldCtxsByType(CalculatedFieldType cfType) {
return calculatedFields.values().stream()
.filter(cf -> CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(cf.getType()))
.map(cf -> getCalculatedFieldCtx(cf.getId()))
.filter(relatedEntityFilter)
.toList();
}
@Override
public List<CalculatedFieldCtx> getEntityAggCalculatedFieldCtxsByFilter(Predicate<CalculatedFieldCtx> entityAggCfFilter) {
return calculatedFields.values().stream()
.filter(cf -> CalculatedFieldType.ENTITY_AGGREGATION.equals(cf.getType()))
.map(cf -> getCalculatedFieldCtx(cf.getId()))
.filter(entityAggCfFilter)
.toList();
.filter(cf -> cfType.equals(cf.getType()))
.map(cf -> getCalculatedFieldCtx(cf.getId()));
}
@Override

11
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java

@ -27,6 +27,7 @@ import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
@ -188,13 +189,15 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
}
}
List<CalculatedFieldCtx> entityAggCfCtxs = calculatedFieldCache.getEntityAggCalculatedFieldCtxsByFilter(filter);
if (!entityAggCfCtxs.isEmpty()) {
boolean hasMatchesEntityAggCfs = calculatedFieldCache.getCalculatedFieldCtxsByType(CalculatedFieldType.ENTITY_AGGREGATION).anyMatch(filter);
if (hasMatchesEntityAggCfs) {
return true;
}
List<CalculatedFieldCtx> relatedEntityAggCfCtxs = calculatedFieldCache.getRelatedEntitiesAggCalculatedFieldCtxsByFilter(relatedEntityFilter);
for (CalculatedFieldCtx cfCtx : relatedEntityAggCfCtxs) {
List<CalculatedFieldCtx> relatedEntitiesAggregationCfs = calculatedFieldCache.getCalculatedFieldCtxsByType(CalculatedFieldType.RELATED_ENTITIES_AGGREGATION)
.filter(relatedEntityFilter)
.toList();
for (CalculatedFieldCtx cfCtx : relatedEntitiesAggregationCfs) {
if (cfCtx.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) {
RelationPathLevel relation = aggConfig.getRelation();
EntitySearchDirection inverseDirection = switch (relation.direction()) {

6
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -18,6 +18,8 @@ package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -54,6 +56,10 @@ public interface ArgumentEntry {
boolean isEmpty();
default JsonNode jsonValue() {
return JacksonUtil.valueToTree(toTbelCfArg());
}
TbelCfArg toTbelCfArg();
boolean isForceResetPrevious();

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -62,7 +62,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState,
}
@Override
public void init() {
public void init(boolean restored) {
}
@Override

24
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -96,9 +96,11 @@ public class CalculatedFieldCtx implements Closeable {
private Output output;
private String expression;
private boolean useLatestTs;
private boolean requiresScheduledReevaluation;
private long aggCheckInterval;
private long cfCheckInterval;
private long alarmReevaluationInterval;
private long lastReevaluationTs;
private ActorSystemContext systemContext;
private TbelInvokeService tbelInvokeService;
@ -200,7 +202,8 @@ public class CalculatedFieldCtx implements Closeable {
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfig) {
this.useLatestTs = aggConfig.isUseLatestTs();
}
this.aggCheckInterval = systemContext.getCfCheckInterval();
this.cfCheckInterval = systemContext.getCfCheckInterval();
this.alarmReevaluationInterval = systemContext.getAlarmRulesReevaluationInterval();
this.systemContext = systemContext;
this.tbelInvokeService = systemContext.getTbelInvokeService();
this.relationService = systemContext.getRelationService();
@ -213,19 +216,28 @@ public class CalculatedFieldCtx implements Closeable {
}
public boolean isRequiresScheduledReevaluation() {
long now = System.currentTimeMillis();
long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval());
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) {
long now = System.currentTimeMillis();
Watermark watermark = entityAggregationConfig.getWatermark();
if (watermark != null && watermark.getDuration() > 0) {
return true;
}
long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval());
long intervalEndTs = entityAggregationConfig.getInterval().getCurrentIntervalEndTs();
if (now + cfCheckIntervalMillis >= intervalEndTs) {
return true;
}
}
return calculatedField.getConfiguration().requiresScheduledReevaluation();
long reevaluationIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getAlarmRulesReevaluationInterval());
boolean requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation();
if (requiresScheduledReevaluation) {
if (now + cfCheckIntervalMillis >= lastReevaluationTs + reevaluationIntervalMillis) {
lastReevaluationTs = now;
return true;
}
return false;
}
return false;
}
public void init() {

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java

@ -27,8 +27,8 @@ import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.aggregation.single.EntityAggregationCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
@ -63,7 +63,7 @@ public interface CalculatedFieldState extends Closeable {
void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx);
void init();
void init(boolean restored);
Map<String, ArgumentEntry> update(Map<String, ArgumentEntry> arguments, CalculatedFieldCtx ctx);

8
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java

@ -74,6 +74,14 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat
deduplicationIntervalMs = SECONDS.toMillis(configuration.getDeduplicationIntervalInSec());
}
@Override
public void init(boolean restored) {
super.init(restored);
if (restored) {
scheduleReevaluation();
}
}
@Override
public void close() {
super.close();

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/AggIntervalEntry.java

@ -26,7 +26,7 @@ public class AggIntervalEntry {
private Long endTs;
public boolean belongsToInterval(long ts) {
return ts >= startTs && ts <= endTs;
return ts >= startTs && ts < endTs;
}
}

7
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationArgumentEntry.java

@ -15,7 +15,9 @@
*/
package org.thingsboard.server.service.cf.ctx.state.aggregation.single;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
@ -66,6 +68,11 @@ public class EntityAggregationArgumentEntry implements ArgumentEntry {
return aggIntervals.isEmpty();
}
@Override
public JsonNode jsonValue() {
return JacksonUtil.valueToTree(aggIntervals);
}
@Override
public TbelCfArg toTbelCfArg() {
return null;

16
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java

@ -68,11 +68,19 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
intervalDuration = configuration.getInterval().getIntervalDurationMillis();
Watermark watermark = configuration.getWatermark();
watermarkDuration = watermark == null ? 0 : TimeUnit.SECONDS.toMillis(watermark.getDuration());
checkInterval = TimeUnit.SECONDS.toMillis(ctx.getAggCheckInterval());
checkInterval = TimeUnit.SECONDS.toMillis(ctx.getCfCheckInterval());
interval = configuration.getInterval();
metrics = configuration.getMetrics();
}
@Override
public void init(boolean restored) {
super.init(restored);
if (restored) {
fillMissingIntervals();
}
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.ENTITY_AGGREGATION;
@ -114,9 +122,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
AggIntervalEntry currentInterval = new AggIntervalEntry(interval.getCurrentIntervalStartTs(), interval.getCurrentIntervalEndTs());
arguments.forEach((argName, argumentEntry) -> {
var entityAggEntry = (EntityAggregationArgumentEntry) argumentEntry;
if (!entityAggEntry.getAggIntervals().containsKey(currentInterval)) {
entityAggEntry.getAggIntervals().computeIfAbsent(currentInterval, current -> new AggIntervalEntryStatus());
}
entityAggEntry.getAggIntervals().computeIfAbsent(currentInterval, current -> new AggIntervalEntryStatus());
});
}
@ -244,7 +250,7 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt
}
ObjectNode resultNode = JacksonUtil.newObjectNode();
if (!metricsNode.isEmpty()) {
resultNode.put("ts", interval.getEndTs());
resultNode.put("ts", interval.getEndTs() - 1);
resultNode.set("values", metricsNode);
}
result.add(resultNode);

7
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java

@ -122,8 +122,11 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public void init() {
super.init();
public void init(boolean restored) {
super.init(restored);
if (restored) {
return;
}
AtomicBoolean reevalNeeded = new AtomicBoolean(false);
Map<AlarmSeverity, AlarmRule> createRules = configuration.getCreateRules();
for (AlarmSeverity severity : AlarmSeverity.values()) {

3
application/src/main/resources/thingsboard.yml

@ -531,6 +531,9 @@ actors:
calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}"
# Interval in seconds to re-evaluate calculated fields that have a time schedule. 1 minute by default.
check_interval: "${ACTORS_CALCULATED_FIELDS_CHECK_INTERVAL_SEC:60}"
alarms:
# Interval in seconds to re-evaluate Alarm rules that have a time schedule. 2 minutes by default.
reevaluation_interval: "${ACTORS_ALARMS_REEVALUATION_INTERVAL_SEC:120}"
debug:
settings:

2
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java

@ -105,7 +105,7 @@ public class GeofencingCalculatedFieldStateTest {
ctx.init();
state = new GeofencingCalculatedFieldState(ctx.getEntityId());
state.setCtx(ctx, null);
state.init();
state.init(false);
}
@Test

2
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/PropagationCalculatedFieldStateTest.java

@ -105,7 +105,7 @@ public class PropagationCalculatedFieldStateTest {
state = new PropagationCalculatedFieldState(ctx.getEntityId());
state.setCtx(ctx, null);
state.init();
state.init(false);
}
@Test

2
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java

@ -88,7 +88,7 @@ public class ScriptCalculatedFieldStateTest {
ctx.init();
state = new ScriptCalculatedFieldState(ctx.getEntityId());
state.setCtx(ctx, null);
state.init();
state.init(false);
}
@Test

2
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java

@ -80,7 +80,7 @@ public class SimpleCalculatedFieldStateTest {
ctx.init();
state = new SimpleCalculatedFieldState(ctx.getEntityId());
state.setCtx(ctx, null);
state.init();
state.init(false);
}
@Test

2
common/data/src/main/java/org/thingsboard/server/common/data/SystemParams.java

@ -41,6 +41,6 @@ public class SystemParams {
int minAllowedScheduledUpdateIntervalInSecForCF;
int maxRelationLevelPerCfArgument;
long minAllowedDeduplicationIntervalInSecForCF;
long minAggregationIntervalInSecForCF;
long minAllowedAggregationIntervalInSecForCF;
TrendzSettings trendzSettings;
}

6
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/EntityAggregationCalculatedFieldConfiguration.java

@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.cf.configuration.aggregation.single;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
@ -36,8 +37,13 @@ public class EntityAggregationCalculatedFieldConfiguration implements ArgumentsB
@Valid
@NotEmpty
private Map<String, AggMetric> metrics;
@Valid
@NotNull
private AggInterval interval;
@Valid
private Watermark watermark;
@Valid
@NotNull
private Output output;
@Override

4
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/BaseAggInterval.java

@ -17,6 +17,9 @@ package org.thingsboard.server.common.data.cf.configuration.aggregation.single.i
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.time.DayOfWeek;
@ -32,6 +35,7 @@ import java.time.temporal.TemporalAdjusters;
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class BaseAggInterval implements AggInterval {
@NotBlank
protected String tz;
protected Long offsetSec; // delay seconds since start of interval

2
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/single/interval/Watermark.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.data.cf.configuration.aggregation.single.interval;
import jakarta.validation.constraints.Min;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -24,6 +25,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class Watermark {
@Min(0)
private long duration;
}

2
common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java

@ -189,7 +189,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
@Schema(example = "60")
private long minAllowedDeduplicationIntervalInSecForCF = 60;
@Schema(example = "60")
private long minAggregationIntervalInSecForCF = 60;
private long minAllowedAggregationIntervalInSecForCF = 60;
@Override
public long getProfileThreshold(ApiUsageRecordKey key) {

8
dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java

@ -123,10 +123,10 @@ public class CalculatedFieldDataValidator extends DataValidator<CalculatedField>
if (!(calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration aggConfiguration)) {
return;
}
long minAllowedDeduplicationInterval = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMinAllowedDeduplicationIntervalInSecForCF);
if (aggConfiguration.getDeduplicationIntervalInSec() < minAllowedDeduplicationInterval) {
long minDeduplicationInterval = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMinAllowedDeduplicationIntervalInSecForCF);
if (aggConfiguration.getDeduplicationIntervalInSec() < minDeduplicationInterval) {
throw new IllegalArgumentException("Deduplication interval is less than configured " +
"minimum allowed interval in tenant profile: " + minAllowedDeduplicationInterval);
"minimum allowed interval in tenant profile: " + minDeduplicationInterval);
}
}
@ -134,7 +134,7 @@ public class CalculatedFieldDataValidator extends DataValidator<CalculatedField>
if (!(calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration aggConfiguration)) {
return;
}
long minAggregationIntervalInSec = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMinAggregationIntervalInSecForCF);
long minAggregationIntervalInSec = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMinAllowedAggregationIntervalInSecForCF);
if (minAggregationIntervalInSec <= 0) {
return;
}

2
ui-ngx/src/app/core/auth/auth.models.ts

@ -32,7 +32,7 @@ export interface SysParamsState {
maxDataPointsPerRollingArg: number;
maxArgumentsPerCF: number;
minAllowedDeduplicationIntervalInSecForCF: number;
minAggregationIntervalInSecForCF: number;
minAllowedAggregationIntervalInSecForCF: number;
minAllowedScheduledUpdateIntervalInSecForCF: number;
maxRelationLevelPerCfArgument: number;
ruleChainDebugPerTenantLimitsConfiguration?: string;

2
ui-ngx/src/app/core/auth/auth.reducer.ts

@ -34,7 +34,7 @@ const emptyUserAuthState: AuthPayload = {
maxResourceSize: 0,
maxArgumentsPerCF: 0,
minAllowedDeduplicationIntervalInSecForCF: 0,
minAggregationIntervalInSecForCF: 0,
minAllowedAggregationIntervalInSecForCF: 0,
minAllowedScheduledUpdateIntervalInSecForCF: 0,
maxRelationLevelPerCfArgument: 0,
maxDataPointsPerRollingArg: 0,

4
ui-ngx/src/app/modules/home/components/calculated-fields/components/entity-aggregation-configuration/entity-aggregation-component.component.html

@ -62,14 +62,14 @@
</div>
@if (entityAggregationConfiguration.get('interval.type').value === AggIntervalType.CUSTOM) {
<tb-time-unit-input required
[minTime]="minAggregationIntervalInSecForCF"
[minTime]="minAllowedAggregationIntervalInSecForCF"
[stepMultipleOf]="DayInSec"
sameWidthInputs
appearance="outline"
subscriptSizing="dynamic"
containerClass="flex gap-3"
labelText="{{ 'calculated-fields.aggregate-interval-value' | translate }}"
minErrorText="{{ 'calculated-fields.aggregate-interval-value-min' | translate : {sec: minAggregationIntervalInSecForCF} }}"
minErrorText="{{ 'calculated-fields.aggregate-interval-value-min' | translate : {sec: minAllowedAggregationIntervalInSecForCF} }}"
requiredText="{{ 'calculated-fields.aggregate-interval-value-required' | translate }}"
stepMultipleOfErrorText="{{ 'calculated-fields.aggregate-interval-value-step-multiple-of' | translate }}"
formControlName="durationSec">

6
ui-ngx/src/app/modules/home/components/calculated-fields/components/entity-aggregation-configuration/entity-aggregation-component.component.ts

@ -85,7 +85,7 @@ export class EntityAggregationComponentComponent implements ControlValueAccessor
@Input({required: true})
entityName: string;
readonly minAggregationIntervalInSecForCF = getCurrentAuthState(this.store).minAggregationIntervalInSecForCF;
readonly minAllowedAggregationIntervalInSecForCF = getCurrentAuthState(this.store).minAllowedAggregationIntervalInSecForCF;
readonly DayInSec = DAY / SECOND;
entityAggregationConfiguration = this.fb.group({
@ -94,9 +94,9 @@ export class EntityAggregationComponentComponent implements ControlValueAccessor
interval: this.fb.group({
type: [AggIntervalType.HOUR],
tz: ['', Validators.required],
durationSec: [this.minAggregationIntervalInSecForCF, Validators.required],
durationSec: [this.minAllowedAggregationIntervalInSecForCF, Validators.required],
allowOffsetSec: [false],
offsetSec: [this.minAggregationIntervalInSecForCF > 60 ? MINUTE / SECOND : 1, Validators.required],
offsetSec: [this.minAllowedAggregationIntervalInSecForCF > 60 ? MINUTE / SECOND : 1, Validators.required],
}),
allowWatermark: [false],
watermark: this.fb.group({

6
ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html

@ -344,12 +344,12 @@
<mat-form-field class="mat-block flex-1" appearance="fill" subscriptSizing="dynamic">
<mat-label translate>tenant-profile.min-allowed-aggregation-interval</mat-label>
<input matInput required min="0" step="1"
formControlName="minAggregationIntervalInSecForCF"
formControlName="minAllowedAggregationIntervalInSecForCF"
type="number">
<mat-error *ngIf="tenantProfileConfigurationForm.get('minAggregationIntervalInSecForCF').hasError('required')">
<mat-error *ngIf="tenantProfileConfigurationForm.get('minAllowedAggregationIntervalInSecForCF').hasError('required')">
{{ 'tenant-profile.min-allowed-aggregation-interval-required' | translate}}
</mat-error>
<mat-error *ngIf="tenantProfileConfigurationForm.get('minAggregationIntervalInSecForCF').hasError('min')">
<mat-error *ngIf="tenantProfileConfigurationForm.get('minAllowedAggregationIntervalInSecForCF').hasError('min')">
{{ 'tenant-profile.min-allowed-aggregation-interval-range' | translate}}
</mat-error>
<mat-hint></mat-hint>

2
ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts

@ -117,7 +117,7 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
maxArgumentsPerCF: [0, [Validators.required, Validators.min(0)]],
maxRelationLevelPerCfArgument: [1, [Validators.required, Validators.min(1)]],
minAllowedDeduplicationIntervalInSecForCF: [0, [Validators.required, Validators.min(0)]],
minAggregationIntervalInSecForCF: [0, [Validators.required, Validators.min(0)]],
minAllowedAggregationIntervalInSecForCF: [0, [Validators.required, Validators.min(0)]],
maxRelatedEntitiesToReturnPerCfArgument: [1, [Validators.required, Validators.min(1)]],
minAllowedScheduledUpdateIntervalInSecForCF: [0, [Validators.required, Validators.min(0)]],
maxDataPointsPerRollingArg: [0, [Validators.required, Validators.min(0)]],

4
ui-ngx/src/app/shared/models/tenant.model.ts

@ -108,7 +108,7 @@ export interface DefaultTenantProfileConfiguration {
maxArgumentsPerCF: number;
maxRelationLevelPerCfArgument: number;
minAllowedDeduplicationIntervalInSecForCF: number;
minAggregationIntervalInSecForCF: number;
minAllowedAggregationIntervalInSecForCF: number;
maxRelatedEntitiesToReturnPerCfArgument: number;
minAllowedScheduledUpdateIntervalInSecForCF: number;
maxDataPointsPerRollingArg: number;
@ -177,7 +177,7 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan
maxDataPointsPerRollingArg: 1000,
maxRelationLevelPerCfArgument: 10,
minAllowedDeduplicationIntervalInSecForCF: 60,
minAggregationIntervalInSecForCF: 60,
minAllowedAggregationIntervalInSecForCF: 60,
maxRelatedEntitiesToReturnPerCfArgument: 100,
minAllowedScheduledUpdateIntervalInSecForCF: 0,
maxStateSizeInKBytes: 32,

Loading…
Cancel
Save