From bc199931db720359cb70b467bcde989d9e4dc940 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Thu, 20 Nov 2025 15:31:16 +0200 Subject: [PATCH] Fix CFs restore and reevaluation; improve alarm rule duration check --- ...CalculatedFieldEntityMessageProcessor.java | 17 +++++---- ...alculatedFieldManagerMessageProcessor.java | 2 +- .../AbstractCalculatedFieldStateService.java | 2 +- .../cf/ctx/state/CalculatedFieldCtx.java | 34 ++++++++--------- .../cf/ctx/state/alarm/AlarmEvalResult.java | 1 + .../cf/ctx/state/alarm/AlarmRuleState.java | 37 ++++++++++--------- .../server/utils/CalculatedFieldUtils.java | 5 ++- common/proto/src/main/proto/queue.proto | 2 +- 8 files changed, 52 insertions(+), 48 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 8a6b2cc899..6cc1c50325 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -342,17 +342,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldReevaluateMsg msg) throws CalculatedFieldException { - CalculatedFieldId cfId = msg.getCtx().getCfId(); + CalculatedFieldCtx ctx = msg.getCtx(); + CalculatedFieldId cfId = ctx.getCfId(); CalculatedFieldState state = states.get(cfId); if (state == null) { - log.debug("[{}][{}] Failed to find CF state for entity to handle {}", entityId, cfId, msg); + log.warn("[{}][{}] Failed to find CF state (probably wasn't restored properly) for entity to handle {}", entityId, cfId, msg); + state = createState(ctx); + } + if (state.isSizeOk()) { + log.debug("[{}][{}] Reevaluating CF state", entityId, cfId); + processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, null, msg.getCallback()); } else { - if (state.isSizeOk()) { - log.debug("[{}][{}] Reevaluating CF state", entityId, cfId); - processStateIfReady(state, null, msg.getCtx(), Collections.singletonList(cfId), null, null, msg.getCallback()); - } else { - throw new RuntimeException(msg.getCtx().getSizeExceedsLimitMessage()); - } + throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 9dc1342b23..dc6757f88f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -165,7 +165,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware if (ctx != null) { msg.setCtx(ctx); log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); - getOrCreateActor(msg.getId().entityId()).tell(msg); + getOrCreateActor(msg.getId().entityId()).tellWithHighPriority(msg); } else { cfStateService.deleteState(msg.getId(), msg.getCallback()); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index e673577742..dd0bf45eb9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -84,7 +84,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition) { partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME); - actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state, partition)); + actorSystemContext.tellWithHighPriority(new CalculatedFieldStateRestoreMsg(id, state, partition)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 1d2e2f505a..4ed1b3ea01 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -210,14 +210,14 @@ public class CalculatedFieldCtx implements Closeable { public boolean isRequiresScheduledReevaluation() { long now = System.currentTimeMillis(); - long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval()); if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) { Watermark watermark = entityAggregationConfig.getWatermark(); if (watermark != null && watermark.getDuration() > 0) { return true; } - long intervalEndTs = entityAggregationConfig.getInterval().getCurrentIntervalEndTs(); - if (now + cfCheckIntervalMillis >= intervalEndTs) { + long intervalDurationMillis = entityAggregationConfig.getInterval().getCurrentIntervalDurationMillis(); + if (now - lastReevaluationTs >= intervalDurationMillis) { + lastReevaluationTs = now; return true; } } @@ -225,7 +225,7 @@ public class CalculatedFieldCtx implements Closeable { if (calculatedField.getConfiguration() instanceof AlarmCalculatedFieldConfiguration) { long reevaluationIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getAlarmRulesReevaluationInterval()); if (requiresScheduledReevaluation) { - if (now + cfCheckIntervalMillis >= lastReevaluationTs + reevaluationIntervalMillis) { + if (now - lastReevaluationTs >= reevaluationIntervalMillis) { lastReevaluationTs = now; return true; } @@ -618,8 +618,8 @@ public class CalculatedFieldCtx implements Closeable { return true; } if (calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration thisConfig - && other.calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration otherConfig - && thisConfig.isUseLatestTs() != otherConfig.isUseLatestTs()) { + && other.calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration otherConfig + && thisConfig.isUseLatestTs() != otherConfig.isUseLatestTs()) { return true; } if (cfType == CalculatedFieldType.ALARM) { @@ -638,12 +638,12 @@ public class CalculatedFieldCtx implements Closeable { return true; } if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig - && other.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig - && (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() || !thisConfig.getMetrics().equals(otherConfig.getMetrics()))) { + && other.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig + && (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() || !thisConfig.getMetrics().equals(otherConfig.getMetrics()))) { return true; } if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig - && other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) { + && other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) { boolean metricsChanged = thisConfig.getMetrics().equals(otherConfig.getMetrics()); boolean watermarkChanged = thisConfig.getWatermark().equals(otherConfig.getWatermark()); return metricsChanged || watermarkChanged; @@ -679,7 +679,7 @@ public class CalculatedFieldCtx implements Closeable { private boolean hasGeofencingZoneGroupConfigurationChanges(CalculatedFieldCtx other) { if (calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration thisConfig - && other.calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration otherConfig) { + && other.calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration otherConfig) { return !thisConfig.getZoneGroups().equals(otherConfig.getZoneGroups()); } return false; @@ -687,7 +687,7 @@ public class CalculatedFieldCtx implements Closeable { private boolean hasRelatedEntitiesAggregationConfigurationChanges(CalculatedFieldCtx other) { if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig - && other.calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig) { + && other.calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig) { return !thisConfig.getRelation().equals(otherConfig.getRelation()); } return false; @@ -695,7 +695,7 @@ public class CalculatedFieldCtx implements Closeable { private boolean hasEntityAggregationConfigurationChanges(CalculatedFieldCtx other) { if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig - && other.calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) { + && other.calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) { return !thisConfig.getInterval().equals(otherConfig.getInterval()); } return false; @@ -720,7 +720,7 @@ public class CalculatedFieldCtx implements Closeable { yield true; } yield geofencingState.getLastDynamicArgumentsRefreshTs() < - System.currentTimeMillis() - scheduledUpdateIntervalMillis; + System.currentTimeMillis() - scheduledUpdateIntervalMillis; } default -> false; }; @@ -764,10 +764,10 @@ public class CalculatedFieldCtx implements Closeable { @Override public String toString() { return "CalculatedFieldCtx{" + - "cfId=" + cfId + - ", cfType=" + cfType + - ", entityId=" + entityId + - '}'; + "cfId=" + cfId + + ", cfType=" + cfType + + ", entityId=" + entityId + + '}'; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java index 424a977c75..4f1a8638ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java @@ -25,6 +25,7 @@ public class AlarmEvalResult { public static final AlarmEvalResult TRUE = new AlarmEvalResult(Status.TRUE); public static final AlarmEvalResult FALSE = new AlarmEvalResult(Status.FALSE); public static final AlarmEvalResult NOT_YET_TRUE = new AlarmEvalResult(Status.NOT_YET_TRUE); + public static final AlarmEvalResult EMPTY = new AlarmEvalResult(null); private final Status status; private final long leftDuration; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java index 8612607dfb..4c189887b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java @@ -55,7 +55,7 @@ public class AlarmRuleState { private long eventCount; private long firstEventTs; // when duration condition started - private long lastEventTs; + private long lastCheckTs; private transient long duration; private ScheduledFuture durationCheckFuture; private Boolean active; @@ -98,10 +98,17 @@ public class AlarmRuleState { return AlarmEvalResult.FALSE; } long requiredDuration = getRequiredDurationInMs(); - if (requiredDuration > 0 && lastEventTs > 0 && ts > lastEventTs) { + if (requiredDuration > 0 && firstEventTs > 0 && ts > firstEventTs) { duration = ts - firstEventTs; + long prevDuration = lastCheckTs - firstEventTs; + lastCheckTs = ts; + long leftDuration = requiredDuration - duration; if (leftDuration <= 0) { + if (prevDuration >= requiredDuration) { + // already evaluated as true on previous check, no need to update alarm + return AlarmEvalResult.EMPTY; + } return AlarmEvalResult.TRUE; } else { return AlarmEvalResult.notYetTrue(0, leftDuration); @@ -143,21 +150,15 @@ public class AlarmRuleState { private AlarmEvalResult evalDuration(CalculatedFieldCtx ctx) { if (eval(condition.getExpression(), ctx)) { - long eventTs = state.getLatestTimestamp(); - if (lastEventTs > 0) { - if (eventTs > lastEventTs) { - if (firstEventTs == 0) { - firstEventTs = lastEventTs; - } - lastEventTs = eventTs; - } - } else { - firstEventTs = eventTs; - lastEventTs = eventTs; + long ts = System.currentTimeMillis(); + if (firstEventTs == 0) { + firstEventTs = state.getLatestTimestamp(); } - duration = lastEventTs - firstEventTs; + lastCheckTs = ts; + long requiredDuration = getRequiredDurationInMs(); - if (requiredDuration > 0) { + if (requiredDuration > 0 && firstEventTs > 0 && ts > firstEventTs) { + duration = ts - firstEventTs; long leftDuration = requiredDuration - duration; if (leftDuration <= 0) { return AlarmEvalResult.TRUE; @@ -247,7 +248,7 @@ public class AlarmRuleState { private void clearDurationConditionState() { firstEventTs = 0L; - lastEventTs = 0L; + lastCheckTs = 0L; duration = 0L; if (durationCheckFuture != null) { durationCheckFuture.cancel(true); @@ -256,7 +257,7 @@ public class AlarmRuleState { } public boolean isEmpty() { - return eventCount == 0L && firstEventTs == 0L && lastEventTs == 0L && durationCheckFuture == null; + return eventCount == 0L && firstEventTs == 0L && lastCheckTs == 0L && durationCheckFuture == null; } private AlarmSchedule parseSchedule(String str) { @@ -330,7 +331,7 @@ public class AlarmRuleState { ", condition=" + condition + ", eventCount=" + eventCount + ", firstEventTs=" + firstEventTs + - ", lastEventTs=" + lastEventTs + + ", lastCheckTs=" + lastCheckTs + ", duration=" + duration + ", durationCheckFuture=" + durationCheckFuture + '}'; diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 710ce48ef6..09ba7917e9 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -124,6 +124,7 @@ public class CalculatedFieldUtils { if (alarmState.getClearRuleState() != null) { alarmStateProto.setClearRuleState(toAlarmRuleStateProto(alarmState.getClearRuleState())); } + builder.setAlarmState(alarmStateProto); } if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) { builder.setLastArgsUpdateTs(aggState.getLastArgsRefreshTs()); @@ -137,7 +138,7 @@ public class CalculatedFieldUtils { .setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse("")) .setEventCount(ruleState.getEventCount()) .setFirstEventTs(ruleState.getFirstEventTs()) - .setLastEventTs(ruleState.getLastEventTs()) + .setLastCheckTs(ruleState.getLastCheckTs()) .build(); } @@ -146,7 +147,7 @@ public class CalculatedFieldUtils { AlarmRuleState ruleState = new AlarmRuleState(severity, null, state); ruleState.setEventCount(proto.getEventCount()); ruleState.setFirstEventTs(proto.getFirstEventTs()); - ruleState.setLastEventTs(proto.getLastEventTs()); + ruleState.setLastCheckTs(proto.getLastCheckTs()); return ruleState; } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index a2f7ccf80a..9fb8528bce 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1929,5 +1929,5 @@ message AlarmRuleStateProto { string severity = 1; int64 eventCount = 2; int64 firstEventTs = 3; - int64 lastEventTs = 4; + int64 lastCheckTs = 4; }