diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index a905738a2f..49d1af2b0f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -112,6 +112,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { states.remove(cfId); } + msg.getCallback().onSuccess(); } public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 590e097928..b8bc74db7a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -125,10 +125,13 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware if (msg.getState() != null) { msg.getState().setRequiredArguments(calculatedField.getArgNames()); } - log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); + log.debug("[{}] Pushing CF state restore msg to specific actor [{}]", tenantId, msg.getId().entityId()); getOrCreateActor(msg.getId().entityId()).tell(msg); - } else { + } else if (msg.getState() != null) { + log.debug("[{}] Received CF state restore msg for non-existing CF [{}]. Removing state", tenantId, cfId); cfStateService.removeState(msg.getId(), msg.getCallback()); + } else { + msg.getCallback().onSuccess(); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java index 19be7c02fa..fbe666c662 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java @@ -19,6 +19,7 @@ import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -27,6 +28,7 @@ public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMs private final CalculatedFieldEntityCtxId id; private final CalculatedFieldState state; + private final TbCallback callback; @Override public MsgType getMsgType() { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 91d25a633b..fa17c0ca2c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -43,7 +43,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; -import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.TbMsg; @@ -138,13 +137,22 @@ public class TenantActor extends RuleChainManagerActor { @Override protected boolean doProcess(TbActorMsg msg) { if (cantFindTenant) { - log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg); - if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) { - QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg; - queueMsg.getMsg().getCallback().onSuccess(); - } else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) { - TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg; - transportMsg.getCallback().onSuccess(); + log.debug("[{}] Processing message for non-existing tenant: {}", tenantId, msg); + switch (msg.getMsgType()) { + case QUEUE_TO_RULE_ENGINE_MSG -> { + ((QueueToRuleEngineMsg) msg).getMsg().getCallback().onSuccess(); + } + case TRANSPORT_TO_DEVICE_ACTOR_MSG -> { + ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); + } + case CF_STATE_RESTORE_MSG -> { + ((CalculatedFieldStateRestoreMsg) msg).getCallback().onSuccess(); + } + default -> { + if (!log.isDebugEnabled()) { + log.info("[{}] Processing message for non-existing tenant: {}", tenantId, msg); + } + } } return true; } @@ -390,6 +398,7 @@ public class TenantActor extends RuleChainManagerActor { public TbActor createActor() { return new TenantActor(context, tenantId); } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index 70b41f069e..87bdda2b30 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -62,14 +62,14 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback); - protected void processRestoredState(CalculatedFieldStateProto stateMsg) { + protected void processRestoredState(CalculatedFieldStateProto stateMsg, TbCallback callback) { var id = fromProto(stateMsg.getId()); var state = fromProto(stateMsg); - processRestoredState(id, state); + processRestoredState(id, state, callback); } - protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) { - actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); + protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TbCallback callback) { + actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state, callback)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 2b52892744..2773e13fa3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -43,6 +43,8 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; @@ -61,6 +63,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Value("${queue.calculated_fields.poll_interval:25}") private long pollInterval; + @Value("${queue.calculated_fields.pack_processing_timeout:60000}") + private long packProcessingTimeout; private TbKafkaProducerTemplate> stateProducer; @@ -74,21 +78,39 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor((msgs, consumer, consumerKey, config) -> { + CountDownLatch completionLatch = new CountDownLatch(msgs.size()); for (TbProtoQueueMsg msg : msgs) { + TbCallback callback = new TbCallback() { + @Override + public void onSuccess() { + int processedMsgCount = counter.incrementAndGet(); + if (processedMsgCount % 10000 == 0) { + log.info("Processed {} CF state messages", processedMsgCount); + } + completionLatch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + log.error("Failed to process CF state message: {}", msg, t); + completionLatch.countDown(); + } + }; + try { if (msg.getValue() != null) { - processRestoredState(msg.getValue()); + processRestoredState(msg.getValue(), callback); } else { - processRestoredState(getStateId(msg.getHeaders()), null); + processRestoredState(getStateId(msg.getHeaders()), null, callback); } } catch (Throwable t) { - log.error("Failed to process state message: {}", msg, t); + callback.onFailure(t); } + } - int processedMsgCount = counter.incrementAndGet(); - if (processedMsgCount % 10000 == 0) { - log.info("Processed {} calculated field state msgs", processedMsgCount); - } + boolean success = completionLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS); + if (!success) { + log.error("Timeout to process CF state messages pack of size {}", msgs.size()); } }) .consumerCreator((queueConfig, tpi) -> queueFactory.createCalculatedFieldStateConsumer()) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 9dc6139ca5..d17060d093 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import com.google.protobuf.InvalidProtocolBufferException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -63,11 +62,22 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS public void restore(QueueKey queueKey, Set partitions) { if (stateService.getPartitions().isEmpty()) { cfRocksDb.forEach((key, value) -> { + CalculatedFieldStateProto stateMsg; try { - processRestoredState(CalculatedFieldStateProto.parseFrom(value)); - } catch (InvalidProtocolBufferException e) { - log.error("[{}] Failed to process restored state", key, e); + stateMsg = CalculatedFieldStateProto.parseFrom(value); + } catch (Exception e) { + log.error("Failed to parse CalculatedFieldStateProto for key {}", key, e); + return; } + processRestoredState(stateMsg, new TbCallback() { + @Override + public void onSuccess() {} + + @Override + public void onFailure(Throwable t) { + log.error("Failed to process CF state message: {}", stateMsg, t); + } + }); }); } super.restore(queueKey, partitions);