Browse Source

Fix CFs restore and reevaluation; improve alarm rule duration check

pull/14387/head
Viacheslav Klimov 6 months ago
parent
commit
bc199931db
  1. 17
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 2
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  3. 2
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java
  4. 34
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  5. 1
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java
  6. 37
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java
  7. 5
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  8. 2
      common/proto/src/main/proto/queue.proto

17
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -342,17 +342,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
public void process(CalculatedFieldReevaluateMsg msg) throws CalculatedFieldException {
CalculatedFieldId cfId = msg.getCtx().getCfId();
CalculatedFieldCtx ctx = msg.getCtx();
CalculatedFieldId cfId = ctx.getCfId();
CalculatedFieldState state = states.get(cfId);
if (state == null) {
log.debug("[{}][{}] Failed to find CF state for entity to handle {}", entityId, cfId, msg);
log.warn("[{}][{}] Failed to find CF state (probably wasn't restored properly) for entity to handle {}", entityId, cfId, msg);
state = createState(ctx);
}
if (state.isSizeOk()) {
log.debug("[{}][{}] Reevaluating CF state", entityId, cfId);
processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, null, msg.getCallback());
} else {
if (state.isSizeOk()) {
log.debug("[{}][{}] Reevaluating CF state", entityId, cfId);
processStateIfReady(state, null, msg.getCtx(), Collections.singletonList(cfId), null, null, msg.getCallback());
} else {
throw new RuntimeException(msg.getCtx().getSizeExceedsLimitMessage());
}
throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
}
}

2
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -165,7 +165,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
if (ctx != null) {
msg.setCtx(ctx);
log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
getOrCreateActor(msg.getId().entityId()).tell(msg);
getOrCreateActor(msg.getId().entityId()).tellWithHighPriority(msg);
} else {
cfStateService.deleteState(msg.getId(), msg.getCallback());
}

2
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java

@ -84,7 +84,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition) {
partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME);
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state, partition));
actorSystemContext.tellWithHighPriority(new CalculatedFieldStateRestoreMsg(id, state, partition));
}
@Override

34
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -210,14 +210,14 @@ public class CalculatedFieldCtx implements Closeable {
public boolean isRequiresScheduledReevaluation() {
long now = System.currentTimeMillis();
long cfCheckIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getCfCheckInterval());
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration entityAggregationConfig) {
Watermark watermark = entityAggregationConfig.getWatermark();
if (watermark != null && watermark.getDuration() > 0) {
return true;
}
long intervalEndTs = entityAggregationConfig.getInterval().getCurrentIntervalEndTs();
if (now + cfCheckIntervalMillis >= intervalEndTs) {
long intervalDurationMillis = entityAggregationConfig.getInterval().getCurrentIntervalDurationMillis();
if (now - lastReevaluationTs >= intervalDurationMillis) {
lastReevaluationTs = now;
return true;
}
}
@ -225,7 +225,7 @@ public class CalculatedFieldCtx implements Closeable {
if (calculatedField.getConfiguration() instanceof AlarmCalculatedFieldConfiguration) {
long reevaluationIntervalMillis = TimeUnit.SECONDS.toMillis(systemContext.getAlarmRulesReevaluationInterval());
if (requiresScheduledReevaluation) {
if (now + cfCheckIntervalMillis >= lastReevaluationTs + reevaluationIntervalMillis) {
if (now - lastReevaluationTs >= reevaluationIntervalMillis) {
lastReevaluationTs = now;
return true;
}
@ -618,8 +618,8 @@ public class CalculatedFieldCtx implements Closeable {
return true;
}
if (calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration thisConfig
&& other.calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration otherConfig
&& thisConfig.isUseLatestTs() != otherConfig.isUseLatestTs()) {
&& other.calculatedField.getConfiguration() instanceof SimpleCalculatedFieldConfiguration otherConfig
&& thisConfig.isUseLatestTs() != otherConfig.isUseLatestTs()) {
return true;
}
if (cfType == CalculatedFieldType.ALARM) {
@ -638,12 +638,12 @@ public class CalculatedFieldCtx implements Closeable {
return true;
}
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig
&& other.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig
&& (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() || !thisConfig.getMetrics().equals(otherConfig.getMetrics()))) {
&& other.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig
&& (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() || !thisConfig.getMetrics().equals(otherConfig.getMetrics()))) {
return true;
}
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig
&& other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) {
&& other.getCalculatedField().getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) {
boolean metricsChanged = thisConfig.getMetrics().equals(otherConfig.getMetrics());
boolean watermarkChanged = thisConfig.getWatermark().equals(otherConfig.getWatermark());
return metricsChanged || watermarkChanged;
@ -679,7 +679,7 @@ public class CalculatedFieldCtx implements Closeable {
private boolean hasGeofencingZoneGroupConfigurationChanges(CalculatedFieldCtx other) {
if (calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration thisConfig
&& other.calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration otherConfig) {
&& other.calculatedField.getConfiguration() instanceof GeofencingCalculatedFieldConfiguration otherConfig) {
return !thisConfig.getZoneGroups().equals(otherConfig.getZoneGroups());
}
return false;
@ -687,7 +687,7 @@ public class CalculatedFieldCtx implements Closeable {
private boolean hasRelatedEntitiesAggregationConfigurationChanges(CalculatedFieldCtx other) {
if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig
&& other.calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig) {
&& other.calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig) {
return !thisConfig.getRelation().equals(otherConfig.getRelation());
}
return false;
@ -695,7 +695,7 @@ public class CalculatedFieldCtx implements Closeable {
private boolean hasEntityAggregationConfigurationChanges(CalculatedFieldCtx other) {
if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig
&& other.calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) {
&& other.calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration otherConfig) {
return !thisConfig.getInterval().equals(otherConfig.getInterval());
}
return false;
@ -720,7 +720,7 @@ public class CalculatedFieldCtx implements Closeable {
yield true;
}
yield geofencingState.getLastDynamicArgumentsRefreshTs() <
System.currentTimeMillis() - scheduledUpdateIntervalMillis;
System.currentTimeMillis() - scheduledUpdateIntervalMillis;
}
default -> false;
};
@ -764,10 +764,10 @@ public class CalculatedFieldCtx implements Closeable {
@Override
public String toString() {
return "CalculatedFieldCtx{" +
"cfId=" + cfId +
", cfType=" + cfType +
", entityId=" + entityId +
'}';
"cfId=" + cfId +
", cfType=" + cfType +
", entityId=" + entityId +
'}';
}
}

1
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java

@ -25,6 +25,7 @@ public class AlarmEvalResult {
public static final AlarmEvalResult TRUE = new AlarmEvalResult(Status.TRUE);
public static final AlarmEvalResult FALSE = new AlarmEvalResult(Status.FALSE);
public static final AlarmEvalResult NOT_YET_TRUE = new AlarmEvalResult(Status.NOT_YET_TRUE);
public static final AlarmEvalResult EMPTY = new AlarmEvalResult(null);
private final Status status;
private final long leftDuration;

37
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java

@ -55,7 +55,7 @@ public class AlarmRuleState {
private long eventCount;
private long firstEventTs; // when duration condition started
private long lastEventTs;
private long lastCheckTs;
private transient long duration;
private ScheduledFuture<?> durationCheckFuture;
private Boolean active;
@ -98,10 +98,17 @@ public class AlarmRuleState {
return AlarmEvalResult.FALSE;
}
long requiredDuration = getRequiredDurationInMs();
if (requiredDuration > 0 && lastEventTs > 0 && ts > lastEventTs) {
if (requiredDuration > 0 && firstEventTs > 0 && ts > firstEventTs) {
duration = ts - firstEventTs;
long prevDuration = lastCheckTs - firstEventTs;
lastCheckTs = ts;
long leftDuration = requiredDuration - duration;
if (leftDuration <= 0) {
if (prevDuration >= requiredDuration) {
// already evaluated as true on previous check, no need to update alarm
return AlarmEvalResult.EMPTY;
}
return AlarmEvalResult.TRUE;
} else {
return AlarmEvalResult.notYetTrue(0, leftDuration);
@ -143,21 +150,15 @@ public class AlarmRuleState {
private AlarmEvalResult evalDuration(CalculatedFieldCtx ctx) {
if (eval(condition.getExpression(), ctx)) {
long eventTs = state.getLatestTimestamp();
if (lastEventTs > 0) {
if (eventTs > lastEventTs) {
if (firstEventTs == 0) {
firstEventTs = lastEventTs;
}
lastEventTs = eventTs;
}
} else {
firstEventTs = eventTs;
lastEventTs = eventTs;
long ts = System.currentTimeMillis();
if (firstEventTs == 0) {
firstEventTs = state.getLatestTimestamp();
}
duration = lastEventTs - firstEventTs;
lastCheckTs = ts;
long requiredDuration = getRequiredDurationInMs();
if (requiredDuration > 0) {
if (requiredDuration > 0 && firstEventTs > 0 && ts > firstEventTs) {
duration = ts - firstEventTs;
long leftDuration = requiredDuration - duration;
if (leftDuration <= 0) {
return AlarmEvalResult.TRUE;
@ -247,7 +248,7 @@ public class AlarmRuleState {
private void clearDurationConditionState() {
firstEventTs = 0L;
lastEventTs = 0L;
lastCheckTs = 0L;
duration = 0L;
if (durationCheckFuture != null) {
durationCheckFuture.cancel(true);
@ -256,7 +257,7 @@ public class AlarmRuleState {
}
public boolean isEmpty() {
return eventCount == 0L && firstEventTs == 0L && lastEventTs == 0L && durationCheckFuture == null;
return eventCount == 0L && firstEventTs == 0L && lastCheckTs == 0L && durationCheckFuture == null;
}
private AlarmSchedule parseSchedule(String str) {
@ -330,7 +331,7 @@ public class AlarmRuleState {
", condition=" + condition +
", eventCount=" + eventCount +
", firstEventTs=" + firstEventTs +
", lastEventTs=" + lastEventTs +
", lastCheckTs=" + lastCheckTs +
", duration=" + duration +
", durationCheckFuture=" + durationCheckFuture +
'}';

5
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -124,6 +124,7 @@ public class CalculatedFieldUtils {
if (alarmState.getClearRuleState() != null) {
alarmStateProto.setClearRuleState(toAlarmRuleStateProto(alarmState.getClearRuleState()));
}
builder.setAlarmState(alarmStateProto);
}
if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) {
builder.setLastArgsUpdateTs(aggState.getLastArgsRefreshTs());
@ -137,7 +138,7 @@ public class CalculatedFieldUtils {
.setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse(""))
.setEventCount(ruleState.getEventCount())
.setFirstEventTs(ruleState.getFirstEventTs())
.setLastEventTs(ruleState.getLastEventTs())
.setLastCheckTs(ruleState.getLastCheckTs())
.build();
}
@ -146,7 +147,7 @@ public class CalculatedFieldUtils {
AlarmRuleState ruleState = new AlarmRuleState(severity, null, state);
ruleState.setEventCount(proto.getEventCount());
ruleState.setFirstEventTs(proto.getFirstEventTs());
ruleState.setLastEventTs(proto.getLastEventTs());
ruleState.setLastCheckTs(proto.getLastCheckTs());
return ruleState;
}

2
common/proto/src/main/proto/queue.proto

@ -1929,5 +1929,5 @@ message AlarmRuleStateProto {
string severity = 1;
int64 eventCount = 2;
int64 firstEventTs = 3;
int64 lastEventTs = 4;
int64 lastCheckTs = 4;
}

Loading…
Cancel
Save