Browse Source

Alarm rules CF: real-time duration condition checks

pull/14193/head
VIacheslavKlimov 8 months ago
parent
commit
0b305e6f2a
  1. 9
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 1
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  3. 4
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java
  4. 43
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  5. 4
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
  6. 30
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  7. 2
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java
  8. 4
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java
  9. 2
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  10. 35
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java
  11. 18
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  12. 15
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  13. 12
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java
  14. 10
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java
  15. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java
  16. 133
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java
  17. 27
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java
  18. 63
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java
  19. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java
  20. 3
      application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java
  21. 24
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  22. 3
      application/src/main/resources/thingsboard.yml
  23. 7
      application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java
  24. 3
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java
  25. 3
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java
  26. 3
      application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java
  27. 19
      application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java
  28. 4
      application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java
  29. 2
      common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java
  30. 7
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java
  31. 4
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java
  32. 37
      common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java
  33. 1
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  34. 6
      common/proto/src/main/proto/queue.proto
  35. 5
      common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java
  36. 17
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java
  37. 18
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java
  38. 19
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java
  39. 9
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java
  40. 26
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java
  41. 5
      common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java

9
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -664,10 +664,6 @@ public class ActorSystemContext {
@Getter
private long cfCalculationResultTimeout;
@Value("${actors.alarms.reevaluation_interval:60}")
@Getter
private long alarmsReevaluationInterval;
@Autowired
@Getter
private MqttClientSettings mqttClientSettings;
@ -895,12 +891,13 @@ public class ActorSystemContext {
return getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS);
}
public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) {
public ScheduledFuture<?> scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) {
log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
if (delayInMs > 0) {
getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
return getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
} else {
ctx.tell(msg);
return null;
}
}

1
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -88,6 +88,7 @@ public class AppActor extends ContextAwareActor {
break;
case PARTITION_CHANGE_MSG:
case CF_PARTITIONS_CHANGE_MSG:
case CF_STATE_PARTITION_RESTORE_MSG:
ctx.broadcastToChildren(msg, true);
break;
case COMPONENT_LIFE_CYCLE_MSG:

4
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java

@ -21,6 +21,7 @@ import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
@ -63,6 +64,9 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
case CF_STATE_RESTORE_MSG:
processor.process((CalculatedFieldStateRestoreMsg) msg);
break;
case CF_STATE_PARTITION_RESTORE_MSG:
processor.process((CalculatedFieldStatePartitionRestoreMsg) msg);
break;
case CF_ENTITY_INIT_CF_MSG:
processor.process((EntityInitCalculatedFieldMsg) msg);
break;

43
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -83,7 +84,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
final CalculatedFieldProcessingService cfService;
final CalculatedFieldStateService cfStateService;
TbActorCtx ctx;
TbActorCtx actorCtx;
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
@ -95,7 +96,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
void init(TbActorCtx ctx) {
this.ctx = ctx;
this.actorCtx = ctx;
}
public void stop(boolean partitionChanged) {
@ -104,7 +105,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
"[{}][{}] Stopping entity actor.",
tenantId, entityId);
states.clear();
ctx.stop(ctx.getSelf());
actorCtx.stop(actorCtx.getSelf());
}
public void process(CalculatedFieldPartitionChangeMsg msg) {
@ -116,13 +117,25 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
public void process(CalculatedFieldStateRestoreMsg msg) {
CalculatedFieldId cfId = msg.getId().cfId();
log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
if (msg.getState() != null) {
states.put(cfId, msg.getState());
CalculatedFieldState state = msg.getState();
if (state != null) {
state.setCtx(msg.getCtx(), actorCtx);
state.setPartition(msg.getPartition());
states.put(cfId, state);
} else {
states.remove(cfId);
}
}
public void process(CalculatedFieldStatePartitionRestoreMsg msg) {
log.debug("Processing CF state partition restore msg: {}", msg);
for (CalculatedFieldState state : states.values()) {
if (msg.getPartition().equals(state.getPartition())) {
state.init();
}
}
}
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
log.debug("[{}] Processing entity init CF msg: {}", msg.getCtx().getCfId(), msg);
var ctx = msg.getCtx();
@ -138,10 +151,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
state = createState(ctx);
} else if (msg.getStateAction() == StateAction.REINIT) {
log.debug("Force reinitialization of CF: [{}].", ctx.getCfId());
state.reset(ctx);
state.reset();
initState(state, ctx);
} else {
state.init(ctx);
state.setCtx(ctx, actorCtx);
state.init();
}
if (state.isSizeOk()) {
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback());
@ -183,7 +197,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} else {
MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback());
states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback));
ctx.stop(ctx.getSelf());
actorCtx.stop(actorCtx.getSelf());
}
} else {
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
@ -266,30 +280,30 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
public void process(CalculatedFieldReevaluateMsg msg) throws CalculatedFieldException {
CalculatedFieldId cfId = msg.getCfCtx().getCfId();
CalculatedFieldId cfId = msg.getCtx().getCfId();
CalculatedFieldState state = states.get(cfId);
if (state == null) {
log.debug("[{}][{}] Failed to find CF state for entity to handle {}", entityId, cfId, msg);
} else {
if (state.isSizeOk()) {
log.debug("[{}][{}] Reevaluating CF state", entityId, cfId);
processStateIfReady(state, null, msg.getCfCtx(), Collections.singletonList(cfId), null, null, msg.getCallback());
processStateIfReady(state, null, msg.getCtx(), Collections.singletonList(cfId), null, null, msg.getCallback());
} else {
throw new RuntimeException(msg.getCfCtx().getSizeExceedsLimitMessage());
throw new RuntimeException(msg.getCtx().getSizeExceedsLimitMessage());
}
}
}
public void process(CalculatedFieldAlarmActionMsg msg) {
log.debug("[{}] Processing alarm action event msg: {}", entityId, msg);
states.values().forEach(state -> {
for (CalculatedFieldState state : states.values()) {
if (state instanceof AlarmCalculatedFieldState alarmCfState) {
Alarm stateAlarm = alarmCfState.getCurrentAlarm();
if (stateAlarm != null && stateAlarm.getId().equals(msg.getAlarm().getId())) {
alarmCfState.processAlarmAction(msg.getAlarm(), msg.getAction());
}
}
});
}
msg.getCallback().onSuccess();
}
@ -352,7 +366,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
private void initState(CalculatedFieldState state, CalculatedFieldCtx ctx) {
state.init(ctx);
state.setCtx(ctx, actorCtx);
state.init();
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.hasRelationQueryDynamicArguments()) {
GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.setLastDynamicArgumentsRefreshTs(System.currentTimeMillis());

4
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java

@ -20,6 +20,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg;
@ -70,6 +71,9 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
case CF_STATE_RESTORE_MSG:
processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg);
break;
case CF_STATE_PARTITION_RESTORE_MSG:
processor.onStatePartitionRestoreMsg((CalculatedFieldStatePartitionRestoreMsg) msg);
break;
case CF_ENTITY_LIFECYCLE_MSG:
processor.onEntityLifecycleMsg((CalculatedFieldEntityLifecycleMsg) msg);
break;

30
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
@ -63,8 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
@ -79,7 +78,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>();
private final Map<EntityId, Set<EntityId>> ownerEntities = new HashMap<>();
private ScheduledFuture<?> cfsReevaluationTask;
private final CalculatedFieldProcessingService cfExecService;
private final CalculatedFieldStateService cfStateService;
@ -120,10 +118,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
calculatedFields.clear();
entityIdCalculatedFields.clear();
entityIdCalculatedFieldLinks.clear();
if (cfsReevaluationTask != null) {
cfsReevaluationTask.cancel(true);
cfsReevaluationTask = null;
}
ctx.stop(ctx.getSelf());
}
@ -131,7 +125,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.debug("[{}] Processing CF actor init message.", msg.getTenantId().getId());
initEntitiesCache();
initCalculatedFields();
scheduleCfsReevaluation();
msg.getCallback().onSuccess();
}
@ -140,9 +133,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var ctx = calculatedFields.get(cfId);
if (ctx != null) {
if (msg.getState() != null) {
msg.getState().init(ctx);
}
msg.setCtx(ctx);
log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
getOrCreateActor(msg.getId().entityId()).tell(msg);
} else {
@ -150,21 +141,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private void scheduleCfsReevaluation() {
cfsReevaluationTask = systemContext.getScheduler().scheduleWithFixedDelay(() -> {
try {
calculatedFields.values().forEach(cf -> {
if (cf.isRequiresScheduledReevaluation()) {
applyToTargetCfEntityActors(cf, TbCallback.EMPTY, (entityId, callback) -> {
log.debug("[{}][{}] Pushing scheduled CF reevaluate msg", entityId, cf.getCfId());
getOrCreateActor(entityId).tell(new CalculatedFieldReevaluateMsg(tenantId, cf));
});
}
});
} catch (Exception e) {
log.warn("[{}] Failed to trigger CFs reevaluation", tenantId, e);
}
}, systemContext.getAlarmsReevaluationInterval(), systemContext.getAlarmsReevaluationInterval(), TimeUnit.SECONDS);
public void onStatePartitionRestoreMsg(CalculatedFieldStatePartitionRestoreMsg msg) {
ctx.broadcastToChildren(msg, true);
}
public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException {

2
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldReevaluateMsg.java

@ -25,7 +25,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
public class CalculatedFieldReevaluateMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldCtx cfCtx;
private final CalculatedFieldCtx ctx;
@Override
public MsgType getMsgType() {

4
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java

@ -19,7 +19,9 @@ import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
@Data
@ -27,6 +29,8 @@ public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMs
private final CalculatedFieldEntityCtxId id;
private final CalculatedFieldState state;
private final TopicPartitionInfo partition;
private CalculatedFieldCtx ctx;
@Override
public MsgType getMsgType() {

2
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -186,6 +186,7 @@ public class TenantActor extends RuleChainManagerActor {
case CF_CACHE_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_PARTITIONS_CHANGE_MSG:
case CF_STATE_PARTITION_RESTORE_MSG:
forwardToCfActor((ToCalculatedFieldSystemMsg) msg, true);
break;
case CF_TELEMETRY_MSG:
@ -394,6 +395,7 @@ public class TenantActor extends RuleChainManagerActor {
public TbActor createActor() {
return new TenantActor(context, tenantId);
}
}
}

35
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java

@ -15,10 +15,15 @@
*/
package org.thingsboard.server.service.cf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.msg.CalculatedFieldStatePartitionRestoreMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.exception.CalculatedFieldStateException;
@ -37,6 +42,7 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto;
@Slf4j
public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService {
@Autowired
@ -62,19 +68,38 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback);
protected void processRestoredState(CalculatedFieldStateProto stateMsg) {
protected void processRestoredState(CalculatedFieldStateProto stateMsg, TopicPartitionInfo partition) {
var id = fromProto(stateMsg.getId());
if (partition == null) {
try {
partition = actorSystemContext.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, id.tenantId(), id.entityId());
} catch (TenantNotFoundException e) {
log.debug("Skipping CF state msg for non-existing tenant {}", id.tenantId());
return;
}
}
var state = fromProto(id, stateMsg);
processRestoredState(id, state);
processRestoredState(id, state, partition);
}
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) {
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state));
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition) {
partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME);
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state, partition));
}
@Override
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
stateService.update(queueKey, partitions, null);
stateService.update(queueKey, partitions, new QueueStateService.RestoreCallback() {
@Override
public void onAllPartitionsRestored() {
}
@Override
public void onPartitionRestored(TopicPartitionInfo partition) {
partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME);
actorSystemContext.tellWithHighPriority(new CalculatedFieldStatePartitionRestoreMsg(partition));
}
});
}
@Override

18
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -16,7 +16,10 @@
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.utils.CalculatedFieldUtils;
@ -29,21 +32,32 @@ import java.util.Map;
public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected final EntityId entityId;
protected CalculatedFieldCtx ctx;
protected TbActorRef actorCtx;
protected List<String> requiredArguments;
protected Map<String, ArgumentEntry> arguments = new HashMap<>();
protected boolean sizeExceedsLimit;
protected long latestTimestamp = -1;
@Setter
private TopicPartitionInfo partition;
public BaseCalculatedFieldState(EntityId entityId) {
this.entityId = entityId;
}
@Override
public void init(CalculatedFieldCtx ctx) {
public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) {
this.ctx = ctx;
this.actorCtx = actorCtx;
this.requiredArguments = ctx.getArgNames();
}
@Override
public void init() {
}
@Override
public Map<String, ArgumentEntry> update(Map<String, ArgumentEntry> argumentValues, CalculatedFieldCtx ctx) {
Map<String, ArgumentEntry> updatedArguments = null;
@ -82,7 +96,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
}
@Override
public void reset(CalculatedFieldCtx ctx) { // must reset everything dependent on arguments
public void reset() { // must reset everything dependent on arguments
requiredArguments = null;
arguments.clear();
sizeExceedsLimit = false;

15
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.objecthunter.exp4j.Expression;
import org.mvel2.MVEL;
import org.thingsboard.common.util.ExpressionUtils;
@ -25,6 +26,8 @@ import org.thingsboard.script.api.tbel.TbelCfCtx;
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldReevaluateMsg;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.alarm.rule.AlarmRule;
import org.thingsboard.server.common.data.alarm.rule.condition.expression.TbelAlarmConditionExpression;
@ -61,9 +64,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Stream;
@Data
@Slf4j
public class CalculatedFieldCtx {
private CalculatedField calculatedField;
@ -80,8 +85,8 @@ public class CalculatedFieldCtx {
private Output output;
private String expression;
private boolean useLatestTs;
private boolean requiresScheduledReevaluation;
private ActorSystemContext systemContext;
private TbelInvokeService tbelInvokeService;
private RelationService relationService;
private AlarmSubscriptionService alarmService;
@ -158,7 +163,7 @@ public class CalculatedFieldCtx {
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L;
}
this.requiresScheduledReevaluation = calculatedField.getConfiguration().requiresScheduledReevaluation();
this.systemContext = systemContext;
this.tbelInvokeService = systemContext.getTbelInvokeService();
this.relationService = systemContext.getRelationService();
this.alarmService = systemContext.getAlarmService();
@ -236,6 +241,12 @@ public class CalculatedFieldCtx {
return tbelExpressions.get(expression).executeScriptAsync(args.toArray());
}
public ScheduledFuture<?> scheduleReevaluation(long delayMs, TbActorRef actorCtx) {
log.debug("[{}] Scheduling CF reevaluation in {} ms", cfId, delayMs);
// TODO: use single lazy-loaded instance of CalculatedFieldReevaluateMsg
return systemContext.scheduleMsgWithDelay(actorCtx, new CalculatedFieldReevaluateMsg(tenantId, this), delayMs);
}
private TbelCfArg toTbelArgument(String key, CalculatedFieldState state) {
return state.getArguments().get(key).toTbelCfArg();
}

12
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java

@ -20,7 +20,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState;
@ -47,11 +49,13 @@ public interface CalculatedFieldState {
long getLatestTimestamp();
void init(CalculatedFieldCtx ctx);
void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx);
void init();
Map<String, ArgumentEntry> update(Map<String, ArgumentEntry> arguments, CalculatedFieldCtx ctx);
void reset(CalculatedFieldCtx ctx);
void reset();
ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx);
@ -65,6 +69,10 @@ public interface CalculatedFieldState {
return !isSizeExceedsLimit();
}
TopicPartitionInfo getPartition();
void setPartition(TopicPartitionInfo partition);
void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize);
default void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) {

10
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java

@ -43,6 +43,7 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString;
@ -77,9 +78,9 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
try {
if (msg.getValue() != null) {
processRestoredState(msg.getValue());
processRestoredState(msg.getValue(), consumerKey.partition());
} else {
processRestoredState(getStateId(msg.getHeaders()), null);
processRestoredState(getStateId(msg.getHeaders()), null, consumerKey.partition());
}
} catch (Throwable t) {
log.error("Failed to process state message: {}", msg, t);
@ -104,6 +105,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
}
@Override
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
stateService.update(queueKey, partitions, null);
}
@Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId());

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@ -64,8 +63,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
if (stateService.getPartitions().isEmpty()) {
cfRocksDb.forEach((key, value) -> {
try {
processRestoredState(CalculatedFieldStateProto.parseFrom(value));
} catch (InvalidProtocolBufferException e) {
processRestoredState(CalculatedFieldStateProto.parseFrom(value), null);
} catch (Exception e) {
log.error("[{}] Failed to process restored state", key, e);
}
});

133
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmCalculatedFieldState.java

@ -21,11 +21,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.KvUtil;
import org.thingsboard.rule.engine.action.TbAlarmResult;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmApiCallResult;
@ -61,10 +63,15 @@ import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.thingsboard.server.common.data.StringUtils.equalsAny;
import static org.thingsboard.server.common.data.StringUtils.splitByCommaWithoutQuotes;
import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.FALSE;
import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.NOT_YET_TRUE;
import static org.thingsboard.server.service.cf.ctx.state.alarm.AlarmEvalResult.Status.TRUE;
@EqualsAndHashCode(callSuper = true)
@Slf4j
@ -76,6 +83,7 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
@Getter
private final Map<AlarmSeverity, AlarmRuleState> createRuleStates = new TreeMap<>(Comparator.comparing(Enum::ordinal));
@Getter
@Setter
private AlarmRuleState clearRuleState;
@Getter
@ -87,36 +95,71 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public void init(CalculatedFieldCtx ctx) {
super.init(ctx);
public void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx) {
super.setCtx(ctx, actorCtx);
this.alarmType = ctx.getCalculatedField().getName();
this.configuration = getConfiguration(ctx);
}
@Override
public void init() { // todo: properly close state!
super.init();
AtomicBoolean reevalNeeded = new AtomicBoolean(false);
Map<AlarmSeverity, AlarmRule> createRules = configuration.getCreateRules();
createRules.forEach((severity, rule) -> {
AlarmRuleState ruleState = createRuleStates.get(severity);
if (ruleState == null) {
ruleState = new AlarmRuleState(severity, rule, this);
createRuleStates.put(severity, ruleState);
} else { // can be null if was restored
ruleState.setAlarmRule(rule);
// todo: is it enough to just set new alarm rule to alarm rule state? is it ok to leave the state as were??
for (AlarmSeverity severity : AlarmSeverity.values()) {
AlarmRule rule = createRules.get(severity);
if (rule != null) {
createRuleStates.compute(severity, (__, ruleState) -> {
return initRuleState(severity, rule, ruleState, reevalNeeded);
});
} else {
AlarmRuleState state = createRuleStates.remove(severity);
if (state != null) {
clearState(state);
}
}
});
createRuleStates.keySet().removeIf(severity -> !createRules.containsKey(severity));
}
AlarmRule clearRule = configuration.getClearRule();
if (clearRule != null) {
if (clearRuleState == null) {
clearRuleState = new AlarmRuleState(null, clearRule, this);
} else {
clearRuleState.setAlarmRule(clearRule);
clearRuleState = initRuleState(null, clearRule, clearRuleState, reevalNeeded);
} else {
if (clearRuleState != null) {
clearState(clearRuleState);
clearRuleState = null;
}
}
log.debug("Initialized create rule states {} and clear rule state {} for {}", createRuleStates, clearRuleState, configuration);
if (reevalNeeded.get()) {
initCurrentAlarm(ctx);
createOrClearAlarms(state -> {
if (state.getCondition().getType() == AlarmConditionType.DURATION) {
AlarmEvalResult evalResult = state.reeval(System.currentTimeMillis());
if (evalResult.getStatus() == TRUE || evalResult.getStatus() == NOT_YET_TRUE) {
ScheduledFuture<?> future = ctx.scheduleReevaluation(evalResult.getLeftDuration(), actorCtx);
// TODO: use single task for multiple durations if durations are close enough. but be careful when cancelling the task in one of the states
if (future != null) {
state.setDurationCheckFuture(future);
}
}
}
return AlarmEvalResult.NOT_YET_TRUE;
}, ctx);
}
}
private AlarmRuleState initRuleState(AlarmSeverity severity, AlarmRule rule, AlarmRuleState ruleState, AtomicBoolean reevalNeeded) {
if (ruleState == null) {
ruleState = new AlarmRuleState(severity, rule, this);
} else {
clearRuleState = null;
// when restored
ruleState.setAlarmRule(rule);
if (rule.getCondition().getType() == AlarmConditionType.DURATION && !ruleState.isEmpty()) {
reevalNeeded.set(true);
}
}
log.debug("Initialized create rule states {} and clear rule state {} for {}", createRuleStates, clearRuleState, ctx.getCalculatedField());
return ruleState;
}
@Override
@ -125,8 +168,12 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public void reset(CalculatedFieldCtx ctx) {
super.reset(ctx);
public void reset() {
super.reset();
createRuleStates.values().forEach(AlarmRuleState::clear);
if (clearRuleState != null) {
clearRuleState.clear();
}
}
@Override
@ -135,9 +182,19 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
TbAlarmResult result = createOrClearAlarms(state -> {
if (updatedArgs != null) {
boolean newEvent = !updatedArgs.isEmpty();
return state.eval(newEvent, ctx);
AlarmEvalResult evalResult = state.eval(newEvent, ctx);
if (evalResult.getStatus() == NOT_YET_TRUE && evalResult.getLeftDuration() > 0) {
// rounding up to the closest second
// long leftDuration = (long) Math.ceil(evalResult.getLeftDuration() / 1000.0) * 1000;
long leftDuration = evalResult.getLeftDuration();
ScheduledFuture<?> future = ctx.scheduleReevaluation(leftDuration, actorCtx); // TODO: use single task for multiple durations if durations are close enough. but be careful when cancelling the task in one of the states
if (future != null) {
state.setDurationCheckFuture(future);
}
}
return evalResult;
} else {
return state.eval(System.currentTimeMillis());
return state.reeval(System.currentTimeMillis());
}
}, ctx);
return Futures.immediateFuture(AlarmCalculatedFieldResult.builder()
@ -177,11 +234,11 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
for (AlarmRuleState state : createRuleStates.values()) {
AlarmEvalResult evalResult = evalFunction.apply(state);
log.debug("Evaluated create rule {} with args {}. Result: {}", state, arguments, evalResult);
if (evalResult == AlarmEvalResult.TRUE) {
if (evalResult.getStatus() == TRUE) {
resultState = state;
break;
} else if (evalResult == AlarmEvalResult.FALSE) {
clearAlarmState(state);
} else if (evalResult.getStatus() == FALSE) {
clearState(state);
}
}
@ -189,15 +246,15 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
result = calculateAlarmResult(resultState, ctx);
resultStateInfo = resultState.getStateInfo();
log.debug("Alarm result for state {}: {}", resultState, result);
clearAlarmState(clearRuleState);
clearState(clearRuleState);
} else if (currentAlarm != null && clearRuleState != null) {
AlarmEvalResult evalResult = evalFunction.apply(clearRuleState);
log.debug("Evaluated clear rule {} with args {}. Result: {}", clearRuleState, arguments, evalResult);
if (evalResult == AlarmEvalResult.TRUE) {
if (evalResult.getStatus() == TRUE) {
resultStateInfo = clearRuleState.getStateInfo();
clearAlarmState(clearRuleState);
clearState(clearRuleState);
for (AlarmRuleState state : createRuleStates.values()) {
clearAlarmState(state);
clearState(state);
}
AlarmApiCallResult clearResult = ctx.getAlarmService().clearAlarm(
ctx.getTenantId(), currentAlarm.getId(), System.currentTimeMillis(), createDetails(clearRuleState), true
@ -207,12 +264,11 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
.isCleared(true)
.alarm(clearResult.getAlarm())
.build();
addStateInfo(result, clearRuleState);
resultState = clearRuleState;
}
currentAlarm = null;
} else if (evalResult == AlarmEvalResult.FALSE) {
clearAlarmState(clearRuleState);
} else if (evalResult.getStatus() == FALSE) {
clearState(clearRuleState);
}
}
if (result != null && resultState != null) {
@ -222,8 +278,9 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
return result;
}
private void clearAlarmState(AlarmRuleState state) {
private void clearState(AlarmRuleState state) {
if (state != null) {
log.debug("Clearing rule state {}", state);
state.clear();
}
}
@ -283,14 +340,6 @@ public class AlarmCalculatedFieldState extends BaseCalculatedFieldState {
}
}
private void addStateInfo(TbAlarmResult alarmResult, AlarmRuleState ruleState) {
if (ruleState.getCondition().getType() == AlarmConditionType.REPEATING) {
alarmResult.setConditionRepeats(ruleState.getEventCount());
} else if (ruleState.getCondition().getType() == AlarmConditionType.DURATION) {
alarmResult.setConditionDuration(ruleState.getDuration());
}
}
private JsonNode createDetails(AlarmRuleState ruleState) {
JsonNode alarmDetails;
String alarmDetailsStr = ruleState.getAlarmRule().getAlarmDetails();

27
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmEvalResult.java

@ -15,8 +15,31 @@
*/
package org.thingsboard.server.service.cf.ctx.state.alarm;
public enum AlarmEvalResult {
import lombok.Data;
import lombok.RequiredArgsConstructor;
FALSE, NOT_YET_TRUE, TRUE;
@Data
@RequiredArgsConstructor
public class AlarmEvalResult {
public static final AlarmEvalResult TRUE = new AlarmEvalResult(Status.TRUE);
public static final AlarmEvalResult FALSE = new AlarmEvalResult(Status.FALSE);
public static final AlarmEvalResult NOT_YET_TRUE = new AlarmEvalResult(Status.NOT_YET_TRUE);
private final Status status;
private final long leftDuration;
private final long leftEvents;
public AlarmEvalResult(Status status) {
this(status, 0, 0);
}
public static AlarmEvalResult notYetTrue(long leftEvents, long leftDuration) {
return new AlarmEvalResult(Status.NOT_YET_TRUE, leftDuration, leftEvents);
}
public enum Status {
FALSE, NOT_YET_TRUE, TRUE;
}
}

63
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/alarm/AlarmRuleState.java

@ -38,6 +38,7 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
@Data
@Slf4j
@ -49,9 +50,11 @@ public class AlarmRuleState {
private AlarmCondition condition;
private long lastEventTs;
private long duration;
private long eventCount;
private long firstEventTs; // when duration condition started
private long lastEventTs;
private transient long duration;
private ScheduledFuture<?> durationCheckFuture;
public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, AlarmCalculatedFieldState state) {
this.severity = severity;
@ -70,17 +73,22 @@ public class AlarmRuleState {
};
}
public AlarmEvalResult eval(long ts) { // on schedule
public AlarmEvalResult reeval(long ts) {
switch (condition.getType()) {
case SIMPLE, REPEATING -> {
return AlarmEvalResult.NOT_YET_TRUE;
}
case DURATION -> {
long requiredDurationInMs = getRequiredDurationInMs();
if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) {
long duration = this.duration + (ts - lastEventTs);
long requiredDuration = getRequiredDurationInMs();
if (requiredDuration > 0 && lastEventTs > 0 && ts > lastEventTs) {
duration = ts - firstEventTs;
if (isActive(ts)) {
return duration > requiredDurationInMs ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE;
long leftDuration = requiredDuration - duration;
if (leftDuration <= 0) {
return AlarmEvalResult.TRUE;
} else {
return AlarmEvalResult.notYetTrue(0, leftDuration);
}
} else {
return AlarmEvalResult.FALSE;
}
@ -101,7 +109,8 @@ public class AlarmRuleState {
eventCount++;
}
long requiredRepeats = getIntValue(((RepeatingAlarmCondition) condition).getCount());
return eventCount >= requiredRepeats ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE;
long leftRepeats = requiredRepeats - eventCount;
return leftRepeats <= 0 ? AlarmEvalResult.TRUE : AlarmEvalResult.notYetTrue(leftRepeats, 0);
} else {
return AlarmEvalResult.FALSE;
}
@ -109,17 +118,26 @@ public class AlarmRuleState {
private AlarmEvalResult evalDuration(boolean active, CalculatedFieldCtx ctx) {
if (active && eval(condition.getExpression(), ctx)) {
long eventTs = state.getLatestTimestamp();
if (lastEventTs > 0) {
if (state.getLatestTimestamp() > lastEventTs) {
duration = duration + (state.getLatestTimestamp() - lastEventTs);
lastEventTs = state.getLatestTimestamp();
if (eventTs > lastEventTs) {
if (firstEventTs == 0) {
firstEventTs = lastEventTs;
}
lastEventTs = eventTs;
}
} else {
lastEventTs = state.getLatestTimestamp();
duration = 0L;
firstEventTs = eventTs;
lastEventTs = eventTs;
}
duration = lastEventTs - firstEventTs;
long requiredDuration = getRequiredDurationInMs();
long leftDuration = requiredDuration - duration;
if (leftDuration <= 0) {
return AlarmEvalResult.TRUE;
} else {
return AlarmEvalResult.notYetTrue(0, leftDuration);
}
long requiredDurationInMs = getRequiredDurationInMs();
return duration > requiredDurationInMs ? AlarmEvalResult.TRUE : AlarmEvalResult.NOT_YET_TRUE;
} else {
return AlarmEvalResult.FALSE;
}
@ -190,8 +208,17 @@ public class AlarmRuleState {
public void clear() {
eventCount = 0L;
firstEventTs = 0L;
lastEventTs = 0L;
duration = 0L;
if (durationCheckFuture != null) {
durationCheckFuture.cancel(true);
durationCheckFuture = null;
}
}
public boolean isEmpty() {
return eventCount == 0L && firstEventTs == 0L && lastEventTs == 0L && durationCheckFuture == null;
}
private Integer getIntValue(AlarmConditionValue<Integer> value) {
@ -216,7 +243,7 @@ public class AlarmRuleState {
if (condition.getType() == AlarmConditionType.REPEATING) {
return new StateInfo(eventCount, null);
} else if (condition.getType() == AlarmConditionType.DURATION) {
return new StateInfo(null, duration + (System.currentTimeMillis() - lastEventTs));
return new StateInfo(null, duration);
} else {
return StateInfo.EMPTY;
}
@ -227,9 +254,11 @@ public class AlarmRuleState {
return "AlarmRuleState{" +
"severity=" + severity +
", condition=" + condition +
", eventCount=" + eventCount +
", firstEventTs=" + firstEventTs +
", lastEventTs=" + lastEventTs +
", duration=" + duration +
", eventCount=" + eventCount +
", durationCheckFuture=" + durationCheckFuture +
'}';
}

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java

@ -141,8 +141,8 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public void reset(CalculatedFieldCtx ctx) {
super.reset(ctx);
public void reset() {
super.reset();
lastDynamicArgumentsRefreshTs = -1;
}

3
application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java

@ -38,6 +38,7 @@ import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeleteQueueTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
@ -127,7 +128,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
@Override
protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs,
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer,
Object consumerKey,
ConsumerKey consumerKey,
Queue queue) throws Exception {
TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);

24
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -115,12 +115,21 @@ public class CalculatedFieldUtils {
private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) {
return AlarmRuleStateProto.newBuilder()
.setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse(""))
.setLastEventTs(ruleState.getLastEventTs())
.setDuration(ruleState.getDuration())
.setEventCount(ruleState.getEventCount())
.setFirstEventTs(ruleState.getFirstEventTs())
.setLastEventTs(ruleState.getLastEventTs())
.build();
}
private static AlarmRuleState fromAlarmRuleStateProto(AlarmRuleStateProto proto, AlarmCalculatedFieldState state) {
AlarmSeverity severity = StringUtils.isNotEmpty(proto.getSeverity()) ? AlarmSeverity.valueOf(proto.getSeverity()) : null;
AlarmRuleState ruleState = new AlarmRuleState(severity, null, state);
ruleState.setEventCount(proto.getEventCount());
ruleState.setFirstEventTs(proto.getFirstEventTs());
ruleState.setLastEventTs(proto.getLastEventTs());
return ruleState;
}
public static SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) {
SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder()
.setArgName(argName);
@ -196,12 +205,11 @@ public class CalculatedFieldUtils {
AlarmCalculatedFieldState alarmState = (AlarmCalculatedFieldState) state;
AlarmStateProto alarmStateProto = proto.getAlarmState();
for (AlarmRuleStateProto ruleStateProto : alarmStateProto.getCreateRuleStatesList()) {
AlarmSeverity severity = StringUtils.isNotEmpty(ruleStateProto.getSeverity()) ? AlarmSeverity.valueOf(ruleStateProto.getSeverity()) : null;
AlarmRuleState ruleState = new AlarmRuleState(severity, null, alarmState);
ruleState.setLastEventTs(ruleStateProto.getLastEventTs());
ruleState.setDuration(ruleStateProto.getDuration());
ruleState.setEventCount(ruleStateProto.getEventCount());
alarmState.getCreateRuleStates().put(severity, ruleState);
AlarmRuleState ruleState = fromAlarmRuleStateProto(ruleStateProto, alarmState);
alarmState.getCreateRuleStates().put(ruleState.getSeverity(), ruleState);
}
if (alarmStateProto.hasClearRuleState()) {
alarmState.setClearRuleState(fromAlarmRuleStateProto(alarmStateProto.getClearRuleState(), alarmState));
}
}
}

3
application/src/main/resources/thingsboard.yml

@ -526,9 +526,6 @@ actors:
configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}"
# Time in seconds to receive calculation result.
calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}"
alarms:
# Interval in seconds to re-evaluate Alarm rules with duration condition
reevaluation_interval: "${ACTORS_ALARMS_REEVALUATION_INTERVAL_SEC:60}"
debug:
settings:

7
application/src/test/java/org/thingsboard/server/cf/AlarmRulesTest.java

@ -21,7 +21,6 @@ import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.action.TbAlarmResult;
@ -72,9 +71,6 @@ import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"actors.alarms.reevaluation_interval=1"
})
public class AlarmRulesTest extends AbstractControllerTest {
@MockitoSpyBean
@ -235,10 +231,9 @@ public class AlarmRulesTest extends AbstractControllerTest {
Map<AlarmSeverity, Condition> createRules = Map.of(
AlarmSeverity.CRITICAL, new Condition("return powerConsumption >= 3000;", null, createDurationMs)
);
long clearDurationMs = 2000L;
Condition clearRule = new Condition("return powerConsumption < 3000;", null, createDurationMs);
CalculatedField calculatedField = createAlarmCf(deviceId, "High power consumption during 3 seconds",
CalculatedField calculatedField = createAlarmCf(deviceId, "High power consumption during 5 seconds",
arguments, createRules, clearRule);
postTelemetry(deviceId, "{\"powerConsumption\":3500}");
Thread.sleep(createDurationMs - 2000);

3
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldStateTest.java

@ -104,7 +104,8 @@ public class GeofencingCalculatedFieldStateTest {
ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext);
ctx.init();
state = new GeofencingCalculatedFieldState(ctx.getEntityId());
state.init(ctx);
state.setCtx(ctx, null);
state.init();
}
@Test

3
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java

@ -87,7 +87,8 @@ public class ScriptCalculatedFieldStateTest {
ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext);
ctx.init();
state = new ScriptCalculatedFieldState(ctx.getEntityId());
state.init(ctx);
state.setCtx(ctx, null);
state.init();
}
@Test

3
application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java

@ -79,7 +79,8 @@ public class SimpleCalculatedFieldStateTest {
ctx = new CalculatedFieldCtx(getCalculatedField(), systemContext);
ctx.init();
state = new SimpleCalculatedFieldState(ctx.getEntityId());
state.init(ctx);
state.setCtx(ctx, null);
state.init();
}
@Test

19
application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java

@ -595,7 +595,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
await().atMost(5, TimeUnit.SECONDS).until(() -> {
for (TopicPartitionInfo partition : expectedPartitions) {
if (consumers.stream().noneMatch(consumer -> consumer.subscribed &&
consumer.pollingStarted && Set.of(partition).equals(consumer.getPartitions()))) {
consumer.pollingStarted && Set.of(partition).equals(consumer.getPartitions()))) {
return false;
}
}
@ -605,7 +605,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
await().atMost(5, TimeUnit.SECONDS).until(() -> {
return consumers.size() == 1 && consumers.stream()
.anyMatch(consumer -> consumer.subscribed && consumer.pollingStarted &&
expectedPartitions.equals(consumer.getPartitions()));
expectedPartitions.equals(consumer.getPartitions()));
});
}
Mockito.reset(ruleEngineConsumerContext.getSubmitStrategyFactory());
@ -667,8 +667,8 @@ public class TbRuleEngineQueueConsumerManagerTest {
return await().atMost(5, TimeUnit.SECONDS)
.until(() -> consumers.stream()
.filter(consumer -> consumer.getPartitions() != null &&
consumer.getPartitions().size() == 1 &&
consumer.getPartitions().contains(tpi))
consumer.getPartitions().size() == 1 &&
consumer.getPartitions().contains(tpi))
.findFirst().orElse(null), Objects::nonNull);
}
@ -676,9 +676,9 @@ public class TbRuleEngineQueueConsumerManagerTest {
return await().atMost(5, TimeUnit.SECONDS)
.until(() -> consumers.stream()
.filter(consumer -> consumer.getPartitions() != null &&
consumer.getPartitions().size() == 1 &&
consumer.getPartitions().stream()
.anyMatch(tpi -> tpi.getPartition().get().equals(partition)))
consumer.getPartitions().size() == 1 &&
consumer.getPartitions().stream()
.anyMatch(tpi -> tpi.getPartition().get().equals(partition)))
.findFirst().orElse(null), Objects::nonNull);
}
@ -778,10 +778,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
return false;
}
public Set<TopicPartitionInfo> getPartitions() {
return partitions;
}
public void setUpTestMsg() {
testMsg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
@ -790,6 +786,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
.data("{}")
.build();
}
}
}

4
application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java

@ -43,6 +43,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
@ -191,6 +192,7 @@ public class TbRuleEngineStrategyTest {
queue.setProcessingStrategy(processingStrategy);
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
ConsumerKey consumerKey = new ConsumerKey(queueKey, null);
var consumerManager = TbRuleEngineQueueConsumerManager.create()
.ctx(ruleEngineConsumerContext)
.queueKey(queueKey)
@ -238,7 +240,7 @@ public class TbRuleEngineStrategyTest {
.map(this::toProto)
.toList();
consumerManager.processMsgs(protoMsgs, consumer, queueKey, queue);
consumerManager.processMsgs(protoMsgs, consumer, consumerKey, queue);
processingData.forEach(data -> {
verify(actorContext, times(data.attempts)).tell(argThat(msg ->

2
common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java

@ -38,6 +38,8 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
boolean isStopped();
Set<TopicPartitionInfo> getPartitions();
List<String> getFullTopicNames();
}

7
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AlarmCalculatedFieldConfiguration.java

@ -20,7 +20,6 @@ import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.rule.AlarmRule;
import org.thingsboard.server.common.data.alarm.rule.condition.AlarmConditionType;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import java.util.List;
@ -59,10 +58,4 @@ public class AlarmCalculatedFieldConfiguration implements ArgumentsBasedCalculat
}
@Override
public boolean requiresScheduledReevaluation() {
return createRules.values().stream().anyMatch(rule -> rule.getCondition().getType() == AlarmConditionType.DURATION) ||
(clearRule != null && clearRule.getCondition().getType() == AlarmConditionType.DURATION);
}
}

4
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java

@ -72,8 +72,4 @@ public interface CalculatedFieldConfiguration {
.collect(Collectors.toList());
}
default boolean requiresScheduledReevaluation() {
return false;
}
}

37
common/message/src/main/java/org/thingsboard/server/common/msg/CalculatedFieldStatePartitionRestoreMsg.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@Data
public class CalculatedFieldStatePartitionRestoreMsg implements ToCalculatedFieldSystemMsg {
private final TopicPartitionInfo partition;
@Override
public TenantId getTenantId() {
return TenantId.SYS_TENANT_ID;
}
@Override
public MsgType getMsgType() {
return MsgType.CF_STATE_PARTITION_RESTORE_MSG;
}
}

1
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -138,6 +138,7 @@ public enum MsgType {
CF_CACHE_INIT_MSG, // Sent to init caches for CF actor;
CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state;
CF_STATE_PARTITION_RESTORE_MSG,
CF_PARTITIONS_CHANGE_MSG, // Sent when cluster event occures;
CF_ENTITY_LIFECYCLE_MSG, // Sent on CF/Device/Asset create/update/delete;

6
common/proto/src/main/proto/queue.proto

@ -1914,7 +1914,7 @@ message AlarmStateProto {
message AlarmRuleStateProto {
string severity = 1;
int64 lastEventTs = 2;
int64 duration = 3;
int64 eventCount = 4;
int64 eventCount = 2;
int64 firstEventTs = 3;
int64 lastEventTs = 4;
}

5
common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java

@ -194,6 +194,11 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected void doUnsubscribe();
@Override
public Set<TopicPartitionInfo> getPartitions() {
return partitions;
}
@Override
public List<String> getFullTopicNames() {
if (partitions == null) {

17
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java

@ -25,6 +25,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask.ConsumerKey;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.Collection;
@ -218,7 +219,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
consumerTask.setTask(consumerLoop);
}
private void consumerLoop(Object consumerKey, TbQueueConsumer<M> consumer) {
private void consumerLoop(ConsumerKey consumerKey, TbQueueConsumer<M> consumer) {
try {
while (!stopped && !consumer.isStopped()) {
try {
@ -250,7 +251,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
}
protected void processMsgs(List<M> msgs, TbQueueConsumer<M> consumer, Object consumerKey, C config) throws Exception {
protected void processMsgs(List<M> msgs, TbQueueConsumer<M> consumer, ConsumerKey consumerKey, C config) throws Exception {
log.trace("Processing {} messages", msgs.size());
msgPackProcessor.process(msgs, consumer, consumerKey, config);
log.trace("Processed {} messages", msgs.size());
@ -273,7 +274,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
public interface MsgPackProcessor<M extends TbQueueMsg, C extends QueueConfig> {
void process(List<M> msgs, TbQueueConsumer<M> consumer, Object consumerKey, C config) throws Exception;
void process(List<M> msgs, TbQueueConsumer<M> consumer, ConsumerKey consumerKey, C config) throws Exception;
}
public interface ConsumerWrapper<M extends TbQueueMsg> {
@ -285,6 +286,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
class ConsumerPerPartitionWrapper implements ConsumerWrapper<M> {
private final Map<TopicPartitionInfo, TbQueueConsumerTask<M>> consumers = new HashMap<>();
@Override
@ -307,8 +309,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
partitions.forEach(tpi -> {
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueKey + "-" + partitionId;
ConsumerKey key = new ConsumerKey(queueKey, tpi);
Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> {
@ -328,9 +329,11 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
public Collection<TbQueueConsumerTask<M>> getConsumers() {
return consumers.values();
}
}
class SingleConsumerWrapper implements ConsumerWrapper<M> {
private TbQueueConsumerTask<M> consumer;
@Override
@ -346,7 +349,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
if (consumer == null) {
consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null), null); // no partitionId passed
consumer = new TbQueueConsumerTask<>(new ConsumerKey(queueKey, null), () -> consumerCreator.apply(config, null), null); // no partitionId passed
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {
@ -361,5 +364,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
return List.of(consumer);
}
}
}

18
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java

@ -32,7 +32,7 @@ import java.util.function.Supplier;
public class TbQueueConsumerTask<M extends TbQueueMsg> {
@Getter
private final Object key;
private final ConsumerKey key;
private volatile TbQueueConsumer<M> consumer;
private volatile Supplier<TbQueueConsumer<M>> consumerSupplier;
@Getter
@ -41,7 +41,7 @@ public class TbQueueConsumerTask<M extends TbQueueMsg> {
@Setter
private Future<?> task;
public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier, Runnable callback) {
public TbQueueConsumerTask(ConsumerKey key, Supplier<TbQueueConsumer<M>> consumerSupplier, Runnable callback) {
this.key = key;
this.consumer = null;
this.consumerSupplier = consumerSupplier;
@ -97,4 +97,18 @@ public class TbQueueConsumerTask<M extends TbQueueMsg> {
return task != null;
}
public record ConsumerKey(Object queueKey, TopicPartitionInfo partition) {
@Override
public String toString() {
if (partition != null) {
Integer partitionId = partition.getPartition().orElse(-1);
return queueKey + "-" + partitionId;
} else {
return queueKey.toString();
}
}
}
}

19
common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java

@ -15,10 +15,15 @@
*/
package org.thingsboard.server.queue.common.state;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collections;
import java.util.Set;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
public class DefaultQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
@ -26,4 +31,18 @@ public class DefaultQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg
super(eventConsumer, Collections.emptyList());
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, RestoreCallback callback) {
if (callback != null) {
for (TopicPartitionInfo partition : partitions) {
callback.onPartitionRestored(partition);
}
callback.onAllPartitionsRestored();
}
eventConsumer.addPartitions(partitions);
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.addPartitions(withTopic(partitions, consumer.getTopic()));
}
}
}

9
common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java

@ -50,7 +50,7 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, Runnable whenAllProcessed) {
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, RestoreCallback callback) {
Map<String, Long> eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states
Set<TopicPartitionInfo> statePartitions = withTopic(partitions, stateConsumer.getTopic());
@ -61,10 +61,13 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
try {
partitionsInProgress.remove(statePartition);
log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress);
if (callback != null) {
callback.onPartitionRestored(statePartition);
}
if (partitionsInProgress.isEmpty()) {
log.info("All partitions processed");
if (whenAllProcessed != null) {
whenAllProcessed.run();
if (callback != null) {
callback.onAllPartitionsRestored();
}
}

26
common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java

@ -49,7 +49,7 @@ public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueM
this.otherConsumers = otherConsumers;
}
public void update(QueueKey queueKey, Set<TopicPartitionInfo> newPartitions, Runnable whenAllProcessed) {
public void update(QueueKey queueKey, Set<TopicPartitionInfo> newPartitions, RestoreCallback callback) {
newPartitions = withTopic(newPartitions, eventConsumer.getTopic());
var writeLock = partitionsLock.writeLock();
writeLock.lock();
@ -71,23 +71,15 @@ public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueM
}
if (!addedPartitions.isEmpty()) {
addPartitions(queueKey, addedPartitions, whenAllProcessed);
addPartitions(queueKey, addedPartitions, callback);
} else {
if (whenAllProcessed != null) {
whenAllProcessed.run();
if (callback != null) {
callback.onAllPartitionsRestored();
}
}
}
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, Runnable whenAllProcessed) {
if (whenAllProcessed != null) {
whenAllProcessed.run();
}
eventConsumer.addPartitions(partitions);
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.addPartitions(withTopic(partitions, consumer.getTopic()));
}
}
protected abstract void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, RestoreCallback callback) ;
protected void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
eventConsumer.removePartitions(partitions);
@ -122,4 +114,12 @@ public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueM
eventConsumer.awaitStop();
}
public interface RestoreCallback {
void onAllPartitionsRestored();
default void onPartitionRestored(TopicPartitionInfo partition) {}
}
}

5
common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java

@ -109,6 +109,11 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
return stopped;
}
@Override
public Set<TopicPartitionInfo> getPartitions() {
return partitions;
}
@Override
public List<String> getFullTopicNames() {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());

Loading…
Cancel
Save