From 0b305e6f2a37b2e1f280ab95aaa6837196a7d3e4 Mon Sep 17 00:00:00 2001 From: VIacheslavKlimov Date: Mon, 6 Oct 2025 11:16:46 +0300 Subject: [PATCH] Alarm rules CF: real-time duration condition checks --- .../server/actors/ActorSystemContext.java | 9 +- .../server/actors/app/AppActor.java | 1 + .../CalculatedFieldEntityActor.java | 4 + ...CalculatedFieldEntityMessageProcessor.java | 43 ++++-- .../CalculatedFieldManagerActor.java | 4 + ...alculatedFieldManagerMessageProcessor.java | 30 +--- .../CalculatedFieldReevaluateMsg.java | 2 +- .../CalculatedFieldStateRestoreMsg.java | 4 + .../server/actors/tenant/TenantActor.java | 2 + .../AbstractCalculatedFieldStateService.java | 35 ++++- .../ctx/state/BaseCalculatedFieldState.java | 18 ++- .../cf/ctx/state/CalculatedFieldCtx.java | 15 +- .../cf/ctx/state/CalculatedFieldState.java | 12 +- .../KafkaCalculatedFieldStateService.java | 10 +- .../RocksDBCalculatedFieldStateService.java | 5 +- .../alarm/AlarmCalculatedFieldState.java | 133 ++++++++++++------ .../cf/ctx/state/alarm/AlarmEvalResult.java | 27 +++- .../cf/ctx/state/alarm/AlarmRuleState.java | 63 ++++++--- .../GeofencingCalculatedFieldState.java | 4 +- .../TbRuleEngineQueueConsumerManager.java | 3 +- .../server/utils/CalculatedFieldUtils.java | 24 ++-- .../src/main/resources/thingsboard.yml | 3 - .../thingsboard/server/cf/AlarmRulesTest.java | 7 +- .../GeofencingCalculatedFieldStateTest.java | 3 +- .../state/ScriptCalculatedFieldStateTest.java | 3 +- .../state/SimpleCalculatedFieldStateTest.java | 3 +- .../TbRuleEngineQueueConsumerManagerTest.java | 19 ++- .../ruleengine/TbRuleEngineStrategyTest.java | 4 +- .../server/queue/TbQueueConsumer.java | 2 + .../AlarmCalculatedFieldConfiguration.java | 7 - .../CalculatedFieldConfiguration.java | 4 - ...lculatedFieldStatePartitionRestoreMsg.java | 37 +++++ .../server/common/msg/MsgType.java | 1 + common/proto/src/main/proto/queue.proto | 6 +- .../AbstractTbQueueConsumerTemplate.java | 5 + .../consumer/MainQueueConsumerManager.java | 17 ++- .../common/consumer/TbQueueConsumerTask.java | 18 ++- .../state/DefaultQueueStateService.java | 19 +++ .../common/state/KafkaQueueStateService.java | 9 +- .../queue/common/state/QueueStateService.java | 26 ++-- .../queue/memory/InMemoryTbQueueConsumer.java | 5 + 41 files changed, 449 insertions(+), 197 deletions(-) create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 38c409036d..f3e636296a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -664,10 +664,6 @@ public class ActorSystemContext { @Getter private long cfCalculationResultTimeout; - @Value("${actors.alarms.reevaluation_interval:60}") - @Getter - private long alarmsReevaluationInterval; - @Autowired @Getter private MqttClientSettings mqttClientSettings; @@ -895,12 +891,13 @@ public class ActorSystemContext { return getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); } - public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) { + public ScheduledFuture scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) { log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); if (delayInMs > 0) { - getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS); + return getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS); } else { ctx.tell(msg); + return null; } } diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index e515d58695..4715ea64d4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -88,6 +88,7 @@ public class AppActor extends ContextAwareActor { break; case PARTITION_CHANGE_MSG: case CF_PARTITIONS_CHANGE_MSG: + case CF_STATE_PARTITION_RESTORE_MSG: ctx.broadcastToChildren(msg, true); break; case COMPONENT_LIFE_CYCLE_MSG: diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index e2a2b93436..cababd4b6d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -21,6 +21,7 @@ import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg; import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; @@ -63,6 +64,9 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { case CF_STATE_RESTORE_MSG: processor.process((CalculatedFieldStateRestoreMsg) msg); break; + case CF_STATE_PARTITION_RESTORE_MSG: + processor.process((CalculatedFieldStatePartitionRestoreMsg) msg); + break; case CF_ENTITY_INIT_CF_MSG: processor.process((EntityInitCalculatedFieldMsg) msg); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index b90b20fb4b..0a175b4899 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -83,7 +84,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM final CalculatedFieldProcessingService cfService; final CalculatedFieldStateService cfStateService; - TbActorCtx ctx; + TbActorCtx actorCtx; Map states = new HashMap<>(); CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { @@ -95,7 +96,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } void init(TbActorCtx ctx) { - this.ctx = ctx; + this.actorCtx = ctx; } public void stop(boolean partitionChanged) { @@ -104,7 +105,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM "[{}][{}] Stopping entity actor.", tenantId, entityId); states.clear(); - ctx.stop(ctx.getSelf()); + actorCtx.stop(actorCtx.getSelf()); } public void process(CalculatedFieldPartitionChangeMsg msg) { @@ -116,13 +117,25 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM public void process(CalculatedFieldStateRestoreMsg msg) { CalculatedFieldId cfId = msg.getId().cfId(); log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); - if (msg.getState() != null) { - states.put(cfId, msg.getState()); + CalculatedFieldState state = msg.getState(); + if (state != null) { + state.setCtx(msg.getCtx(), actorCtx); + state.setPartition(msg.getPartition()); + states.put(cfId, state); } else { states.remove(cfId); } } + public void process(CalculatedFieldStatePartitionRestoreMsg msg) { + log.debug("Processing CF state partition restore msg: {}", msg); + for (CalculatedFieldState state : states.values()) { + if (msg.getPartition().equals(state.getPartition())) { + state.init(); + } + } + } + public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { log.debug("[{}] Processing entity init CF msg: {}", msg.getCtx().getCfId(), msg); var ctx = msg.getCtx(); @@ -138,10 +151,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM state = createState(ctx); } else if (msg.getStateAction() == StateAction.REINIT) { log.debug("Force reinitialization of CF: [{}].", ctx.getCfId()); - state.reset(ctx); + state.reset(); initState(state, ctx); } else { - state.init(ctx); + state.setCtx(ctx, actorCtx); + state.init(); } if (state.isSizeOk()) { processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); @@ -183,7 +197,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback()); states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback)); - ctx.stop(ctx.getSelf()); + actorCtx.stop(actorCtx.getSelf()); } } else { var cfId = new CalculatedFieldId(msg.getEntityId().getId()); @@ -266,30 +280,30 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldReevaluateMsg msg) throws CalculatedFieldException { - CalculatedFieldId cfId = msg.getCfCtx().getCfId(); + CalculatedFieldId cfId = msg.getCtx().getCfId(); CalculatedFieldState state = states.get(cfId); if (state == null) { log.debug("[{}][{}] Failed to find CF state for entity to handle {}", entityId, cfId, msg); } else { if (state.isSizeOk()) { log.debug("[{}][{}] Reevaluating CF state", entityId, cfId); - processStateIfReady(state, null, msg.getCfCtx(), Collections.singletonList(cfId), null, null, msg.getCallback()); + processStateIfReady(state, null, msg.getCtx(), Collections.singletonList(cfId), null, null, msg.getCallback()); } else { - throw new RuntimeException(msg.getCfCtx().getSizeExceedsLimitMessage()); + throw new RuntimeException(msg.getCtx().getSizeExceedsLimitMessage()); } } } public void process(CalculatedFieldAlarmActionMsg msg) { log.debug("[{}] Processing alarm action event msg: {}", entityId, msg); - states.values().forEach(state -> { + for (CalculatedFieldState state : states.values()) { if (state instanceof AlarmCalculatedFieldState alarmCfState) { Alarm stateAlarm = alarmCfState.getCurrentAlarm(); if (stateAlarm != null && stateAlarm.getId().equals(msg.getAlarm().getId())) { alarmCfState.processAlarmAction(msg.getAlarm(), msg.getAction()); } } - }); + } msg.getCallback().onSuccess(); } @@ -352,7 +366,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } private void initState(CalculatedFieldState state, CalculatedFieldCtx ctx) { - state.init(ctx); + state.setCtx(ctx, actorCtx); + state.init(); if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.hasRelationQueryDynamicArguments()) { GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state; geofencingState.setLastDynamicArgumentsRefreshTs(System.currentTimeMillis()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index f1e15866eb..80daff07ef 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -20,6 +20,7 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg; import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg; @@ -70,6 +71,9 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { case CF_STATE_RESTORE_MSG: processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg); break; + case CF_STATE_PARTITION_RESTORE_MSG: + processor.onStatePartitionRestoreMsg((CalculatedFieldStatePartitionRestoreMsg) msg); + break; case CF_ENTITY_LIFECYCLE_MSG: processor.onEntityLifecycleMsg((CalculatedFieldEntityLifecycleMsg) msg); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index d43a685c89..5e7437d12d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; @@ -63,8 +64,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; @@ -79,7 +78,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final Map> entityIdCalculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFieldLinks = new HashMap<>(); private final Map> ownerEntities = new HashMap<>(); - private ScheduledFuture cfsReevaluationTask; private final CalculatedFieldProcessingService cfExecService; private final CalculatedFieldStateService cfStateService; @@ -120,10 +118,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware calculatedFields.clear(); entityIdCalculatedFields.clear(); entityIdCalculatedFieldLinks.clear(); - if (cfsReevaluationTask != null) { - cfsReevaluationTask.cancel(true); - cfsReevaluationTask = null; - } ctx.stop(ctx.getSelf()); } @@ -131,7 +125,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.debug("[{}] Processing CF actor init message.", msg.getTenantId().getId()); initEntitiesCache(); initCalculatedFields(); - scheduleCfsReevaluation(); msg.getCallback().onSuccess(); } @@ -140,9 +133,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var ctx = calculatedFields.get(cfId); if (ctx != null) { - if (msg.getState() != null) { - msg.getState().init(ctx); - } + msg.setCtx(ctx); log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); getOrCreateActor(msg.getId().entityId()).tell(msg); } else { @@ -150,21 +141,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } } - private void scheduleCfsReevaluation() { - cfsReevaluationTask = systemContext.getScheduler().scheduleWithFixedDelay(() -> { - try { - calculatedFields.values().forEach(cf -> { - if (cf.isRequiresScheduledReevaluation()) { - applyToTargetCfEntityActors(cf, TbCallback.EMPTY, (entityId, callback) -> { - log.debug("[{}][{}] Pushing scheduled CF reevaluate msg", entityId, cf.getCfId()); - getOrCreateActor(entityId).tell(new CalculatedFieldReevaluateMsg(tenantId, cf)); - }); - } - }); - } catch (Exception e) { - log.warn("[{}] Failed to trigger CFs reevaluation", tenantId, e); - } - }, systemContext.getAlarmsReevaluationInterval(), systemContext.getAlarmsReevaluationInterval(), TimeUnit.SECONDS); + public void onStatePartitionRestoreMsg(CalculatedFieldStatePartitionRestoreMsg msg) { + ctx.broadcastToChildren(msg, true); } public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java index a0b75d1a72..b617736ee0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java @@ -25,7 +25,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; public class CalculatedFieldReevaluateMsg implements ToCalculatedFieldSystemMsg { private final TenantId tenantId; - private final CalculatedFieldCtx cfCtx; + private final CalculatedFieldCtx ctx; @Override public MsgType getMsgType() { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java index 19be7c02fa..d1c2f11aeb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java @@ -19,7 +19,9 @@ import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @Data @@ -27,6 +29,8 @@ public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMs private final CalculatedFieldEntityCtxId id; private final CalculatedFieldState state; + private final TopicPartitionInfo partition; + private CalculatedFieldCtx ctx; @Override public MsgType getMsgType() { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 35cbf3ccd8..e33965a42e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -186,6 +186,7 @@ public class TenantActor extends RuleChainManagerActor { case CF_CACHE_INIT_MSG: case CF_STATE_RESTORE_MSG: case CF_PARTITIONS_CHANGE_MSG: + case CF_STATE_PARTITION_RESTORE_MSG: forwardToCfActor((ToCalculatedFieldSystemMsg) msg, true); break; case CF_TELEMETRY_MSG: @@ -394,6 +395,7 @@ public class TenantActor extends RuleChainManagerActor { public TbActor createActor() { return new TenantActor(context, tenantId); } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index a77eb71343..c8b99afd9e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -15,10 +15,15 @@ */ package org.thingsboard.server.service.cf; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; +import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.exception.CalculatedFieldStateException; @@ -37,6 +42,7 @@ import java.util.stream.Collectors; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; +@Slf4j public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { @Autowired @@ -62,19 +68,38 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback); - protected void processRestoredState(CalculatedFieldStateProto stateMsg) { + protected void processRestoredState(CalculatedFieldStateProto stateMsg, TopicPartitionInfo partition) { var id = fromProto(stateMsg.getId()); + if (partition == null) { + try { + partition = actorSystemContext.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, id.tenantId(), id.entityId()); + } catch (TenantNotFoundException e) { + log.debug("Skipping CF state msg for non-existing tenant {}", id.tenantId()); + return; + } + } var state = fromProto(id, stateMsg); - processRestoredState(id, state); + processRestoredState(id, state, partition); } - protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) { - actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); + protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition) { + partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME); + actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state, partition)); } @Override public void restore(QueueKey queueKey, Set partitions) { - stateService.update(queueKey, partitions, null); + stateService.update(queueKey, partitions, new QueueStateService.RestoreCallback() { + @Override + public void onAllPartitionsRestored() { + } + + @Override + public void onPartitionRestored(TopicPartitionInfo partition) { + partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME); + actorSystemContext.tellWithHighPriority(new CalculatedFieldStatePartitionRestoreMsg(partition)); + } + }); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index d027a2f9fe..153b83f8d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -16,7 +16,10 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.Getter; +import lombok.Setter; +import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.utils.CalculatedFieldUtils; @@ -29,21 +32,32 @@ import java.util.Map; public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected final EntityId entityId; + protected CalculatedFieldCtx ctx; + protected TbActorRef actorCtx; protected List requiredArguments; protected Map arguments = new HashMap<>(); protected boolean sizeExceedsLimit; protected long latestTimestamp = -1; + @Setter + private TopicPartitionInfo partition; + public BaseCalculatedFieldState(EntityId entityId) { this.entityId = entityId; } @Override - public void init(CalculatedFieldCtx ctx) { + public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) { + this.ctx = ctx; + this.actorCtx = actorCtx; this.requiredArguments = ctx.getArgNames(); } + @Override + public void init() { + } + @Override public Map update(Map argumentValues, CalculatedFieldCtx ctx) { Map updatedArguments = null; @@ -82,7 +96,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { } @Override - public void reset(CalculatedFieldCtx ctx) { // must reset everything dependent on arguments + public void reset() { // must reset everything dependent on arguments requiredArguments = null; arguments.clear(); sizeExceedsLimit = false; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 12e4dc0a3a..db755d3e17 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf.ctx.state; import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import net.objecthunter.exp4j.Expression; import org.mvel2.MVEL; import org.thingsboard.common.util.ExpressionUtils; @@ -25,6 +26,8 @@ import org.thingsboard.script.api.tbel.TbelCfCtx; import org.thingsboard.script.api.tbel.TbelCfSingleValueArg; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldReevaluateMsg; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.alarm.rule.AlarmRule; import org.thingsboard.server.common.data.alarm.rule.condition.expression.TbelAlarmConditionExpression; @@ -61,9 +64,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledFuture; import java.util.stream.Stream; @Data +@Slf4j public class CalculatedFieldCtx { private CalculatedField calculatedField; @@ -80,8 +85,8 @@ public class CalculatedFieldCtx { private Output output; private String expression; private boolean useLatestTs; - private boolean requiresScheduledReevaluation; + private ActorSystemContext systemContext; private TbelInvokeService tbelInvokeService; private RelationService relationService; private AlarmSubscriptionService alarmService; @@ -158,7 +163,7 @@ public class CalculatedFieldCtx { if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L; } - this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation(); + this.systemContext = systemContext; this.tbelInvokeService = systemContext.getTbelInvokeService(); this.relationService = systemContext.getRelationService(); this.alarmService = systemContext.getAlarmService(); @@ -236,6 +241,12 @@ public class CalculatedFieldCtx { return tbelExpressions.get(expression).executeScriptAsync(args.toArray()); } + public ScheduledFuture scheduleReevaluation(long delayMs, TbActorRef actorCtx) { + log.debug("[{}] Scheduling CF reevaluation in {} ms", cfId, delayMs); + // TODO: use single lazy-loaded instance of CalculatedFieldReevaluateMsg + return systemContext.scheduleMsgWithDelay(actorCtx, new CalculatedFieldReevaluateMsg(tenantId, this), delayMs); + } + private TbelCfArg toTbelArgument(String key, CalculatedFieldState state) { return state.getArguments().get(key).toTbelCfArg(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index ad8005af83..ff94206220 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -20,7 +20,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; @@ -47,11 +49,13 @@ public interface CalculatedFieldState { long getLatestTimestamp(); - void init(CalculatedFieldCtx ctx); + void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx); + + void init(); Map update(Map arguments, CalculatedFieldCtx ctx); - void reset(CalculatedFieldCtx ctx); + void reset(); ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx); @@ -65,6 +69,10 @@ public interface CalculatedFieldState { return !isSizeExceedsLimit(); } + TopicPartitionInfo getPartition(); + + void setPartition(TopicPartitionInfo partition); + void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize); default void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 2b52892744..e8174bfd57 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; @@ -77,9 +78,9 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta for (TbProtoQueueMsg msg : msgs) { try { if (msg.getValue() != null) { - processRestoredState(msg.getValue()); + processRestoredState(msg.getValue(), consumerKey.partition()); } else { - processRestoredState(getStateId(msg.getHeaders()), null); + processRestoredState(getStateId(msg.getHeaders()), null, consumerKey.partition()); } } catch (Throwable t) { log.error("Failed to process state message: {}", msg, t); @@ -104,6 +105,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } + @Override + public void restore(QueueKey queueKey, Set partitions) { + stateService.update(queueKey, partitions, null); + } + @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 9dc6139ca5..05bfb8b717 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import com.google.protobuf.InvalidProtocolBufferException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -64,8 +63,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS if (stateService.getPartitions().isEmpty()) { cfRocksDb.forEach((key, value) -> { try { - processRestoredState(CalculatedFieldStateProto.parseFrom(value)); - } catch (InvalidProtocolBufferException e) { + processRestoredState(CalculatedFieldStateProto.parseFrom(value), null); + } catch (Exception e) { log.error("[{}] Failed to process restored state", key, e); } }); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java index c071d8bf75..838cb2779d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java @@ -21,11 +21,13 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.KvUtil; import org.thingsboard.rule.engine.action.TbAlarmResult; +import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; @@ -61,10 +63,15 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.util.Comparator; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import static org.thingsboard.server.common.data.StringUtils.equalsAny; import static org.thingsboard.server.common.data.StringUtils.splitByCommaWithoutQuotes; +import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.FALSE; +import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.NOT_YET_TRUE; +import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.TRUE; @EqualsAndHashCode(callSuper = true) @Slf4j @@ -76,6 +83,7 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { @Getter private final Map createRuleStates = new TreeMap<>(Comparator.comparing(Enum::ordinal)); @Getter + @Setter private AlarmRuleState clearRuleState; @Getter @@ -87,36 +95,71 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { } @Override - public void init(CalculatedFieldCtx ctx) { - super.init(ctx); - + public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) { + super.setCtx(ctx, actorCtx); this.alarmType = ctx.getCalculatedField().getName(); this.configuration = getConfiguration(ctx); + } + @Override + public void init() { // todo: properly close state! + super.init(); + AtomicBoolean reevalNeeded = new AtomicBoolean(false); Map createRules = configuration.getCreateRules(); - createRules.forEach((severity, rule) -> { - AlarmRuleState ruleState = createRuleStates.get(severity); - if (ruleState == null) { - ruleState = new AlarmRuleState(severity, rule, this); - createRuleStates.put(severity, ruleState); - } else { // can be null if was restored - ruleState.setAlarmRule(rule); - // todo: is it enough to just set new alarm rule to alarm rule state? is it ok to leave the state as were?? + for (AlarmSeverity severity : AlarmSeverity.values()) { + AlarmRule rule = createRules.get(severity); + if (rule != null) { + createRuleStates.compute(severity, (__, ruleState) -> { + return initRuleState(severity, rule, ruleState, reevalNeeded); + }); + } else { + AlarmRuleState state = createRuleStates.remove(severity); + if (state != null) { + clearState(state); + } } - }); - createRuleStates.keySet().removeIf(severity -> !createRules.containsKey(severity)); + } AlarmRule clearRule = configuration.getClearRule(); if (clearRule != null) { - if (clearRuleState == null) { - clearRuleState = new AlarmRuleState(null, clearRule, this); - } else { - clearRuleState.setAlarmRule(clearRule); + clearRuleState = initRuleState(null, clearRule, clearRuleState, reevalNeeded); + } else { + if (clearRuleState != null) { + clearState(clearRuleState); + clearRuleState = null; } + } + log.debug("Initialized create rule states {} and clear rule state {} for {}", createRuleStates, clearRuleState, configuration); + + if (reevalNeeded.get()) { + initCurrentAlarm(ctx); + createOrClearAlarms(state -> { + if (state.getCondition().getType() == AlarmConditionType.DURATION) { + AlarmEvalResult evalResult = state.reeval(System.currentTimeMillis()); + if (evalResult.getStatus() == TRUE || evalResult.getStatus() == NOT_YET_TRUE) { + ScheduledFuture future = ctx.scheduleReevaluation(evalResult.getLeftDuration(), actorCtx); + // TODO: use single task for multiple durations if durations are close enough. but be careful when cancelling the task in one of the states + if (future != null) { + state.setDurationCheckFuture(future); + } + } + } + return AlarmEvalResult.NOT_YET_TRUE; + }, ctx); + } + } + + private AlarmRuleState initRuleState(AlarmSeverity severity, AlarmRule rule, AlarmRuleState ruleState, AtomicBoolean reevalNeeded) { + if (ruleState == null) { + ruleState = new AlarmRuleState(severity, rule, this); } else { - clearRuleState = null; + // when restored + ruleState.setAlarmRule(rule); + if (rule.getCondition().getType() == AlarmConditionType.DURATION && !ruleState.isEmpty()) { + reevalNeeded.set(true); + } } - log.debug("Initialized create rule states {} and clear rule state {} for {}", createRuleStates, clearRuleState, ctx.getCalculatedField()); + return ruleState; } @Override @@ -125,8 +168,12 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { } @Override - public void reset(CalculatedFieldCtx ctx) { - super.reset(ctx); + public void reset() { + super.reset(); + createRuleStates.values().forEach(AlarmRuleState::clear); + if (clearRuleState != null) { + clearRuleState.clear(); + } } @Override @@ -135,9 +182,19 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { TbAlarmResult result = createOrClearAlarms(state -> { if (updatedArgs != null) { boolean newEvent = !updatedArgs.isEmpty(); - return state.eval(newEvent, ctx); + AlarmEvalResult evalResult = state.eval(newEvent, ctx); + if (evalResult.getStatus() == NOT_YET_TRUE && evalResult.getLeftDuration() > 0) { + // rounding up to the closest second +// long leftDuration = (long) Math.ceil(evalResult.getLeftDuration() / 1000.0) * 1000; + long leftDuration = evalResult.getLeftDuration(); + ScheduledFuture future = ctx.scheduleReevaluation(leftDuration, actorCtx); // TODO: use single task for multiple durations if durations are close enough. but be careful when cancelling the task in one of the states + if (future != null) { + state.setDurationCheckFuture(future); + } + } + return evalResult; } else { - return state.eval(System.currentTimeMillis()); + return state.reeval(System.currentTimeMillis()); } }, ctx); return Futures.immediateFuture(AlarmCalculatedFieldResult.builder() @@ -177,11 +234,11 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { for (AlarmRuleState state : createRuleStates.values()) { AlarmEvalResult evalResult = evalFunction.apply(state); log.debug("Evaluated create rule {} with args {}. Result: {}", state, arguments, evalResult); - if (evalResult == AlarmEvalResult.TRUE) { + if (evalResult.getStatus() == TRUE) { resultState = state; break; - } else if (evalResult == AlarmEvalResult.FALSE) { - clearAlarmState(state); + } else if (evalResult.getStatus() == FALSE) { + clearState(state); } } @@ -189,15 +246,15 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { result = calculateAlarmResult(resultState, ctx); resultStateInfo = resultState.getStateInfo(); log.debug("Alarm result for state {}: {}", resultState, result); - clearAlarmState(clearRuleState); + clearState(clearRuleState); } else if (currentAlarm != null && clearRuleState != null) { AlarmEvalResult evalResult = evalFunction.apply(clearRuleState); log.debug("Evaluated clear rule {} with args {}. Result: {}", clearRuleState, arguments, evalResult); - if (evalResult == AlarmEvalResult.TRUE) { + if (evalResult.getStatus() == TRUE) { resultStateInfo = clearRuleState.getStateInfo(); - clearAlarmState(clearRuleState); + clearState(clearRuleState); for (AlarmRuleState state : createRuleStates.values()) { - clearAlarmState(state); + clearState(state); } AlarmApiCallResult clearResult = ctx.getAlarmService().clearAlarm( ctx.getTenantId(), currentAlarm.getId(), System.currentTimeMillis(), createDetails(clearRuleState), true @@ -207,12 +264,11 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { .isCleared(true) .alarm(clearResult.getAlarm()) .build(); - addStateInfo(result, clearRuleState); resultState = clearRuleState; } currentAlarm = null; - } else if (evalResult == AlarmEvalResult.FALSE) { - clearAlarmState(clearRuleState); + } else if (evalResult.getStatus() == FALSE) { + clearState(clearRuleState); } } if (result != null && resultState != null) { @@ -222,8 +278,9 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { return result; } - private void clearAlarmState(AlarmRuleState state) { + private void clearState(AlarmRuleState state) { if (state != null) { + log.debug("Clearing rule state {}", state); state.clear(); } } @@ -283,14 +340,6 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState { } } - private void addStateInfo(TbAlarmResult alarmResult, AlarmRuleState ruleState) { - if (ruleState.getCondition().getType() == AlarmConditionType.REPEATING) { - alarmResult.setConditionRepeats(ruleState.getEventCount()); - } else if (ruleState.getCondition().getType() == AlarmConditionType.DURATION) { - alarmResult.setConditionDuration(ruleState.getDuration()); - } - } - private JsonNode createDetails(AlarmRuleState ruleState) { JsonNode alarmDetails; String alarmDetailsStr = ruleState.getAlarmRule().getAlarmDetails(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java index 6775b14586..424a977c75 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java @@ -15,8 +15,31 @@ */ package org.thingsboard.server.service.cf.ctx.state.alarm; -public enum AlarmEvalResult { +import lombok.Data; +import lombok.RequiredArgsConstructor; - FALSE, NOT_YET_TRUE, TRUE; +@Data +@RequiredArgsConstructor +public class AlarmEvalResult { + + public static final AlarmEvalResult TRUE = new AlarmEvalResult(Status.TRUE); + public static final AlarmEvalResult FALSE = new AlarmEvalResult(Status.FALSE); + public static final AlarmEvalResult NOT_YET_TRUE = new AlarmEvalResult(Status.NOT_YET_TRUE); + + private final Status status; + private final long leftDuration; + private final long leftEvents; + + public AlarmEvalResult(Status status) { + this(status, 0, 0); + } + + public static AlarmEvalResult notYetTrue(long leftEvents, long leftDuration) { + return new AlarmEvalResult(Status.NOT_YET_TRUE, leftDuration, leftEvents); + } + + public enum Status { + FALSE, NOT_YET_TRUE, TRUE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java index 0e5459af5e..0638543685 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java @@ -38,6 +38,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Optional; +import java.util.concurrent.ScheduledFuture; @Data @Slf4j @@ -49,9 +50,11 @@ public class AlarmRuleState { private AlarmCondition condition; - private long lastEventTs; - private long duration; private long eventCount; + private long firstEventTs; // when duration condition started + private long lastEventTs; + private transient long duration; + private ScheduledFuture durationCheckFuture; public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, AlarmCalculatedFieldState state) { this.severity = severity; @@ -70,17 +73,22 @@ public class AlarmRuleState { }; } - public AlarmEvalResult eval(long ts) { // on schedule + public AlarmEvalResult reeval(long ts) { switch (condition.getType()) { case SIMPLE, REPEATING -> { return AlarmEvalResult.NOT_YET_TRUE; } case DURATION -> { - long requiredDurationInMs = getRequiredDurationInMs(); - if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) { - long duration = this.duration + (ts - lastEventTs); + long requiredDuration = getRequiredDurationInMs(); + if (requiredDuration > 0 && lastEventTs > 0 && ts > lastEventTs) { + duration = ts - firstEventTs; if (isActive(ts)) { - return duration > requiredDurationInMs ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE; + long leftDuration = requiredDuration - duration; + if (leftDuration <= 0) { + return AlarmEvalResult.TRUE; + } else { + return AlarmEvalResult.notYetTrue(0, leftDuration); + } } else { return AlarmEvalResult.FALSE; } @@ -101,7 +109,8 @@ public class AlarmRuleState { eventCount++; } long requiredRepeats = getIntValue(((RepeatingAlarmCondition) condition).getCount()); - return eventCount >= requiredRepeats ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE; + long leftRepeats = requiredRepeats - eventCount; + return leftRepeats <= 0 ? AlarmEvalResult.TRUE : AlarmEvalResult.notYetTrue(leftRepeats, 0); } else { return AlarmEvalResult.FALSE; } @@ -109,17 +118,26 @@ public class AlarmRuleState { private AlarmEvalResult evalDuration(boolean active, CalculatedFieldCtx ctx) { if (active && eval(condition.getExpression(), ctx)) { + long eventTs = state.getLatestTimestamp(); if (lastEventTs > 0) { - if (state.getLatestTimestamp() > lastEventTs) { - duration = duration + (state.getLatestTimestamp() - lastEventTs); - lastEventTs = state.getLatestTimestamp(); + if (eventTs > lastEventTs) { + if (firstEventTs == 0) { + firstEventTs = lastEventTs; + } + lastEventTs = eventTs; } } else { - lastEventTs = state.getLatestTimestamp(); - duration = 0L; + firstEventTs = eventTs; + lastEventTs = eventTs; + } + duration = lastEventTs - firstEventTs; + long requiredDuration = getRequiredDurationInMs(); + long leftDuration = requiredDuration - duration; + if (leftDuration <= 0) { + return AlarmEvalResult.TRUE; + } else { + return AlarmEvalResult.notYetTrue(0, leftDuration); } - long requiredDurationInMs = getRequiredDurationInMs(); - return duration > requiredDurationInMs ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE; } else { return AlarmEvalResult.FALSE; } @@ -190,8 +208,17 @@ public class AlarmRuleState { public void clear() { eventCount = 0L; + firstEventTs = 0L; lastEventTs = 0L; duration = 0L; + if (durationCheckFuture != null) { + durationCheckFuture.cancel(true); + durationCheckFuture = null; + } + } + + public boolean isEmpty() { + return eventCount == 0L && firstEventTs == 0L && lastEventTs == 0L && durationCheckFuture == null; } private Integer getIntValue(AlarmConditionValue value) { @@ -216,7 +243,7 @@ public class AlarmRuleState { if (condition.getType() == AlarmConditionType.REPEATING) { return new StateInfo(eventCount, null); } else if (condition.getType() == AlarmConditionType.DURATION) { - return new StateInfo(null, duration + (System.currentTimeMillis() - lastEventTs)); + return new StateInfo(null, duration); } else { return StateInfo.EMPTY; } @@ -227,9 +254,11 @@ public class AlarmRuleState { return "AlarmRuleState{" + "severity=" + severity + ", condition=" + condition + + ", eventCount=" + eventCount + + ", firstEventTs=" + firstEventTs + ", lastEventTs=" + lastEventTs + ", duration=" + duration + - ", eventCount=" + eventCount + + ", durationCheckFuture=" + durationCheckFuture + '}'; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java index 47dce596da..6f4d40ca0e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java @@ -141,8 +141,8 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState { } @Override - public void reset(CalculatedFieldCtx ctx) { - super.reset(ctx); + public void reset() { + super.reset(); lastDynamicArgumentsRefreshTs = -1; } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index d067be49a0..ff8d1cee2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -38,6 +38,7 @@ import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeleteQueueTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.queue.TbMsgPackCallback; import org.thingsboard.server.service.queue.TbMsgPackProcessingContext; @@ -127,7 +128,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager> msgs, TbQueueConsumer> consumer, - Object consumerKey, + ConsumerKey consumerKey, Queue queue) throws Exception { TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue); TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue); diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 38aeb45a20..4319f72448 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -115,12 +115,21 @@ public class CalculatedFieldUtils { private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) { return AlarmRuleStateProto.newBuilder() .setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse("")) - .setLastEventTs(ruleState.getLastEventTs()) - .setDuration(ruleState.getDuration()) .setEventCount(ruleState.getEventCount()) + .setFirstEventTs(ruleState.getFirstEventTs()) + .setLastEventTs(ruleState.getLastEventTs()) .build(); } + private static AlarmRuleState fromAlarmRuleStateProto(AlarmRuleStateProto proto, AlarmCalculatedFieldState state) { + AlarmSeverity severity = StringUtils.isNotEmpty(proto.getSeverity()) ? AlarmSeverity.valueOf(proto.getSeverity()) : null; + AlarmRuleState ruleState = new AlarmRuleState(severity, null, state); + ruleState.setEventCount(proto.getEventCount()); + ruleState.setFirstEventTs(proto.getFirstEventTs()); + ruleState.setLastEventTs(proto.getLastEventTs()); + return ruleState; + } + public static SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) { SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder() .setArgName(argName); @@ -196,12 +205,11 @@ public class CalculatedFieldUtils { AlarmCalculatedFieldState alarmState = (AlarmCalculatedFieldState) state; AlarmStateProto alarmStateProto = proto.getAlarmState(); for (AlarmRuleStateProto ruleStateProto : alarmStateProto.getCreateRuleStatesList()) { - AlarmSeverity severity = StringUtils.isNotEmpty(ruleStateProto.getSeverity()) ? AlarmSeverity.valueOf(ruleStateProto.getSeverity()) : null; - AlarmRuleState ruleState = new AlarmRuleState(severity, null, alarmState); - ruleState.setLastEventTs(ruleStateProto.getLastEventTs()); - ruleState.setDuration(ruleStateProto.getDuration()); - ruleState.setEventCount(ruleStateProto.getEventCount()); - alarmState.getCreateRuleStates().put(severity, ruleState); + AlarmRuleState ruleState = fromAlarmRuleStateProto(ruleStateProto, alarmState); + alarmState.getCreateRuleStates().put(ruleState.getSeverity(), ruleState); + } + if (alarmStateProto.hasClearRuleState()) { + alarmState.setClearRuleState(fromAlarmRuleStateProto(alarmStateProto.getClearRuleState(), alarmState)); } } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 561275c2a6..192cb242d0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -526,9 +526,6 @@ actors: configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}" # Time in seconds to receive calculation result. calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}" - alarms: - # Interval in seconds to re-evaluate Alarm rules with duration condition - reevaluation_interval: "${ACTORS_ALARMS_REEVALUATION_INTERVAL_SEC:60}" debug: settings: diff --git a/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java b/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java index 1a28a5aef8..b0b28f06dc 100644 --- a/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java @@ -21,7 +21,6 @@ import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.bean.override.mockito.MockitoSpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.action.TbAlarmResult; @@ -72,9 +71,6 @@ import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @Slf4j @DaoSqlTest -@TestPropertySource(properties = { - "actors.alarms.reevaluation_interval=1" -}) public class AlarmRulesTest extends AbstractControllerTest { @MockitoSpyBean @@ -235,10 +231,9 @@ public class AlarmRulesTest extends AbstractControllerTest { Map createRules = Map.of( AlarmSeverity.CRITICAL, new Condition("return powerConsumption >= 3000;", null, createDurationMs) ); - long clearDurationMs = 2000L; Condition clearRule = new Condition("return powerConsumption < 3000;", null, createDurationMs); - CalculatedField calculatedField = createAlarmCf(deviceId, "High power consumption during 3 seconds", + CalculatedField calculatedField = createAlarmCf(deviceId, "High power consumption during 5 seconds", arguments, createRules, clearRule); postTelemetry(deviceId, "{\"powerConsumption\":3500}"); Thread.sleep(createDurationMs - 2000); diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java index e0cc2ce764..5ca68d4e1b 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java @@ -104,7 +104,8 @@ public class GeofencingCalculatedFieldStateTest { ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext); ctx.init(); state = new GeofencingCalculatedFieldState(ctx.getEntityId()); - state.init(ctx); + state.setCtx(ctx, null); + state.init(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java index 56fc2c1086..e46f3e1c15 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java @@ -87,7 +87,8 @@ public class ScriptCalculatedFieldStateTest { ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext); ctx.init(); state = new ScriptCalculatedFieldState(ctx.getEntityId()); - state.init(ctx); + state.setCtx(ctx, null); + state.init(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java index 7a6109b5bf..df8bf1fbba 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java @@ -79,7 +79,8 @@ public class SimpleCalculatedFieldStateTest { ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext); ctx.init(); state = new SimpleCalculatedFieldState(ctx.getEntityId()); - state.init(ctx); + state.setCtx(ctx, null); + state.init(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index bcbe52b5c9..ace27c08c1 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -595,7 +595,7 @@ public class TbRuleEngineQueueConsumerManagerTest { await().atMost(5, TimeUnit.SECONDS).until(() -> { for (TopicPartitionInfo partition : expectedPartitions) { if (consumers.stream().noneMatch(consumer -> consumer.subscribed && - consumer.pollingStarted && Set.of(partition).equals(consumer.getPartitions()))) { + consumer.pollingStarted && Set.of(partition).equals(consumer.getPartitions()))) { return false; } } @@ -605,7 +605,7 @@ public class TbRuleEngineQueueConsumerManagerTest { await().atMost(5, TimeUnit.SECONDS).until(() -> { return consumers.size() == 1 && consumers.stream() .anyMatch(consumer -> consumer.subscribed && consumer.pollingStarted && - expectedPartitions.equals(consumer.getPartitions())); + expectedPartitions.equals(consumer.getPartitions())); }); } Mockito.reset(ruleEngineConsumerContext.getSubmitStrategyFactory()); @@ -667,8 +667,8 @@ public class TbRuleEngineQueueConsumerManagerTest { return await().atMost(5, TimeUnit.SECONDS) .until(() -> consumers.stream() .filter(consumer -> consumer.getPartitions() != null && - consumer.getPartitions().size() == 1 && - consumer.getPartitions().contains(tpi)) + consumer.getPartitions().size() == 1 && + consumer.getPartitions().contains(tpi)) .findFirst().orElse(null), Objects::nonNull); } @@ -676,9 +676,9 @@ public class TbRuleEngineQueueConsumerManagerTest { return await().atMost(5, TimeUnit.SECONDS) .until(() -> consumers.stream() .filter(consumer -> consumer.getPartitions() != null && - consumer.getPartitions().size() == 1 && - consumer.getPartitions().stream() - .anyMatch(tpi -> tpi.getPartition().get().equals(partition))) + consumer.getPartitions().size() == 1 && + consumer.getPartitions().stream() + .anyMatch(tpi -> tpi.getPartition().get().equals(partition))) .findFirst().orElse(null), Objects::nonNull); } @@ -778,10 +778,6 @@ public class TbRuleEngineQueueConsumerManagerTest { return false; } - public Set getPartitions() { - return partitions; - } - public void setUpTestMsg() { testMsg = TbMsg.newMsg() .type(TbMsgType.POST_TELEMETRY_REQUEST) @@ -790,6 +786,7 @@ public class TbRuleEngineQueueConsumerManagerTest { .data("{}") .build(); } + } } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java index 1106fad5b6..9bd9bb2e7a 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java @@ -43,6 +43,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; @@ -191,6 +192,7 @@ public class TbRuleEngineStrategyTest { queue.setProcessingStrategy(processingStrategy); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); + ConsumerKey consumerKey = new ConsumerKey(queueKey, null); var consumerManager = TbRuleEngineQueueConsumerManager.create() .ctx(ruleEngineConsumerContext) .queueKey(queueKey) @@ -238,7 +240,7 @@ public class TbRuleEngineStrategyTest { .map(this::toProto) .toList(); - consumerManager.processMsgs(protoMsgs, consumer, queueKey, queue); + consumerManager.processMsgs(protoMsgs, consumer, consumerKey, queue); processingData.forEach(data -> { verify(actorContext, times(data.attempts)).tell(argThat(msg -> diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java index f9483965cc..3e1462b445 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java @@ -38,6 +38,8 @@ public interface TbQueueConsumer { boolean isStopped(); + Set getPartitions(); + List getFullTopicNames(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java index c2925d5ed6..0b0f34ad50 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java @@ -20,7 +20,6 @@ import jakarta.validation.constraints.NotEmpty; import lombok.Data; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.rule.AlarmRule; -import org.thingsboard.server.common.data.alarm.rule.condition.AlarmConditionType; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import java.util.List; @@ -59,10 +58,4 @@ public class AlarmCalculatedFieldConfiguration implements ArgumentsBasedCalculat } - @Override - public boolean requiresScheduledReevaluation() { - return createRules.values().stream().anyMatch(rule -> rule.getCondition().getType() == AlarmConditionType.DURATION) || - (clearRule != null && clearRule.getCondition().getType() == AlarmConditionType.DURATION); - } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java index d3622a2dcf..7b608192db 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java @@ -72,8 +72,4 @@ public interface CalculatedFieldConfiguration { .collect(Collectors.toList()); } - default boolean requiresScheduledReevaluation() { - return false; - } - } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java new file mode 100644 index 0000000000..b16e2adb85 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; + +@Data +public class CalculatedFieldStatePartitionRestoreMsg implements ToCalculatedFieldSystemMsg { + + private final TopicPartitionInfo partition; + + @Override + public TenantId getTenantId() { + return TenantId.SYS_TENANT_ID; + } + + @Override + public MsgType getMsgType() { + return MsgType.CF_STATE_PARTITION_RESTORE_MSG; + } + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index e655011baa..c13b0200c7 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -138,6 +138,7 @@ public enum MsgType { CF_CACHE_INIT_MSG, // Sent to init caches for CF actor; CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state; + CF_STATE_PARTITION_RESTORE_MSG, CF_PARTITIONS_CHANGE_MSG, // Sent when cluster event occures; CF_ENTITY_LIFECYCLE_MSG, // Sent on CF/Device/Asset create/update/delete; diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index fac1116a30..4d99608a6d 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1914,7 +1914,7 @@ message AlarmStateProto { message AlarmRuleStateProto { string severity = 1; - int64 lastEventTs = 2; - int64 duration = 3; - int64 eventCount = 4; + int64 eventCount = 2; + int64 firstEventTs = 3; + int64 lastEventTs = 4; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 7e7de64a5c..04fe2443ef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -194,6 +194,11 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected void doUnsubscribe(); + @Override + public Set getPartitions() { + return partitions; + } + @Override public List getFullTopicNames() { if (partitions == null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index db5bac7170..b300e8c1b2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -25,6 +25,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.Collection; @@ -218,7 +219,7 @@ public class MainQueueConsumerManager consumer) { + private void consumerLoop(ConsumerKey consumerKey, TbQueueConsumer consumer) { try { while (!stopped && !consumer.isStopped()) { try { @@ -250,7 +251,7 @@ public class MainQueueConsumerManager msgs, TbQueueConsumer consumer, Object consumerKey, C config) throws Exception { + protected void processMsgs(List msgs, TbQueueConsumer consumer, ConsumerKey consumerKey, C config) throws Exception { log.trace("Processing {} messages", msgs.size()); msgPackProcessor.process(msgs, consumer, consumerKey, config); log.trace("Processed {} messages", msgs.size()); @@ -273,7 +274,7 @@ public class MainQueueConsumerManager { - void process(List msgs, TbQueueConsumer consumer, Object consumerKey, C config) throws Exception; + void process(List msgs, TbQueueConsumer consumer, ConsumerKey consumerKey, C config) throws Exception; } public interface ConsumerWrapper { @@ -285,6 +286,7 @@ public class MainQueueConsumerManager { + private final Map> consumers = new HashMap<>(); @Override @@ -307,8 +309,7 @@ public class MainQueueConsumerManager partitions, Consumer onStop, Function startOffsetProvider) { partitions.forEach(tpi -> { - Integer partitionId = tpi.getPartition().orElse(-1); - String key = queueKey + "-" + partitionId; + ConsumerKey key = new ConsumerKey(queueKey, tpi); Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null; TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> { @@ -328,9 +329,11 @@ public class MainQueueConsumerManager> getConsumers() { return consumers.values(); } + } class SingleConsumerWrapper implements ConsumerWrapper { + private TbQueueConsumerTask consumer; @Override @@ -346,7 +349,7 @@ public class MainQueueConsumerManager(queueKey, () -> consumerCreator.apply(config, null), null); // no partitionId passed + consumer = new TbQueueConsumerTask<>(new ConsumerKey(queueKey, null), () -> consumerCreator.apply(config, null), null); // no partitionId passed } consumer.subscribe(partitions); if (!consumer.isRunning()) { @@ -361,5 +364,7 @@ public class MainQueueConsumerManager { @Getter - private final Object key; + private final ConsumerKey key; private volatile TbQueueConsumer consumer; private volatile Supplier> consumerSupplier; @Getter @@ -41,7 +41,7 @@ public class TbQueueConsumerTask { @Setter private Future task; - public TbQueueConsumerTask(Object key, Supplier> consumerSupplier, Runnable callback) { + public TbQueueConsumerTask(ConsumerKey key, Supplier> consumerSupplier, Runnable callback) { this.key = key; this.consumer = null; this.consumerSupplier = consumerSupplier; @@ -97,4 +97,18 @@ public class TbQueueConsumerTask { return task != null; } + public record ConsumerKey(Object queueKey, TopicPartitionInfo partition) { + + @Override + public String toString() { + if (partition != null) { + Integer partitionId = partition.getPartition().orElse(-1); + return queueKey + "-" + partitionId; + } else { + return queueKey.toString(); + } + } + + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java index be379fb76d..6bcc87af38 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java @@ -15,10 +15,15 @@ */ package org.thingsboard.server.queue.common.state; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; import java.util.Collections; +import java.util.Set; + +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; public class DefaultQueueStateService extends QueueStateService { @@ -26,4 +31,18 @@ public class DefaultQueueStateService partitions, RestoreCallback callback) { + if (callback != null) { + for (TopicPartitionInfo partition : partitions) { + callback.onPartitionRestored(partition); + } + callback.onAllPartitionsRestored(); + } + eventConsumer.addPartitions(partitions); + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.addPartitions(withTopic(partitions, consumer.getTopic())); + } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java index 2a38c9a86c..60cfe4c98f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java @@ -50,7 +50,7 @@ public class KafkaQueueStateService } @Override - protected void addPartitions(QueueKey queueKey, Set partitions, Runnable whenAllProcessed) { + protected void addPartitions(QueueKey queueKey, Set partitions, RestoreCallback callback) { Map eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states Set statePartitions = withTopic(partitions, stateConsumer.getTopic()); @@ -61,10 +61,13 @@ public class KafkaQueueStateService try { partitionsInProgress.remove(statePartition); log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress); + if (callback != null) { + callback.onPartitionRestored(statePartition); + } if (partitionsInProgress.isEmpty()) { log.info("All partitions processed"); - if (whenAllProcessed != null) { - whenAllProcessed.run(); + if (callback != null) { + callback.onAllPartitionsRestored(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java index e58d5eb036..e98e8dd7e4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java @@ -49,7 +49,7 @@ public abstract class QueueStateService newPartitions, Runnable whenAllProcessed) { + public void update(QueueKey queueKey, Set newPartitions, RestoreCallback callback) { newPartitions = withTopic(newPartitions, eventConsumer.getTopic()); var writeLock = partitionsLock.writeLock(); writeLock.lock(); @@ -71,23 +71,15 @@ public abstract class QueueStateService partitions, Runnable whenAllProcessed) { - if (whenAllProcessed != null) { - whenAllProcessed.run(); - } - eventConsumer.addPartitions(partitions); - for (PartitionedQueueConsumerManager consumer : otherConsumers) { - consumer.addPartitions(withTopic(partitions, consumer.getTopic())); - } - } + protected abstract void addPartitions(QueueKey queueKey, Set partitions, RestoreCallback callback) ; protected void removePartitions(QueueKey queueKey, Set partitions) { eventConsumer.removePartitions(partitions); @@ -122,4 +114,12 @@ public abstract class QueueStateService implements TbQueueCon return stopped; } + @Override + public Set getPartitions() { + return partitions; + } + @Override public List getFullTopicNames() { return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());