From e2a356703f5c4b7e8909d182f210fb102cd1d246 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Oct 2024 17:05:22 +0200 Subject: [PATCH] minor refactoring, ack used if push is success --- .../actors/ruleChain/DefaultTbContext.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index b5b389da78..23741ad965 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -109,6 +109,8 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -172,18 +174,27 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - ack(msg); if (!msg.isValid()) { return; } - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); + inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(inputMsg)).build(); TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + ack(msg); + } + + @Override + public void onFailure(Throwable error) { + tellFailure(msg, error); + } + }); } @Override