From 002ecc63424e8485271711df6c908cf425f2ae69 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 26 Feb 2025 11:16:05 +0200 Subject: [PATCH 1/2] used correct queueName for TPI in input node --- .../server/actors/ruleChain/DefaultTbContext.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 0070eba4a1..091b97ac9d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; @@ -175,7 +176,7 @@ public class DefaultTbContext implements TbContext { } TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(msg); doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @@ -192,7 +193,7 @@ public class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME); enqueue(tpi, tbMsg, onFailure, onSuccess); } From 01e99efc4ed6e59c21c551ab0e5da11083d27fea Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 26 Feb 2025 12:28:01 +0200 Subject: [PATCH 2/2] Minor refactoring for TbContext --- .../server/actors/ruleChain/DefaultTbContext.java | 5 ++--- .../thingsboard/rule/engine/api/TbContext.java | 15 ++++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 091b97ac9d..24edd8f8bf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.channel.EventLoopGroup; import lombok.extern.slf4j.Slf4j; import org.bouncycastle.util.Arrays; +import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; @@ -64,7 +65,6 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.data.script.ScriptLanguage; import org.thingsboard.server.common.msg.TbActorMsg; @@ -193,8 +193,7 @@ public class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME); - enqueue(tpi, tbMsg, onFailure, onSuccess); + enqueue(tbMsg, MAIN_QUEUE_NAME, onSuccess, onFailure); } @Override diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index b517bb0051..b3fc646b94 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -144,11 +144,19 @@ public interface TbContext { /** * Puts new message to queue for processing by the Root Rule Chain + * WARNING: message is put to the Main queue. To specify other queue name - use {@link #enqueue(TbMsg, String, Runnable, Consumer)} * * @param msg - message */ void enqueue(TbMsg msg, Runnable onSuccess, Consumer onFailure); + /** + * Puts new message to custom queue for processing + * + * @param msg - message + */ + void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer onFailure); + /** * Sends message to the nested rule chain. * Fails processing of the message if the nested rule chain is not found. @@ -167,13 +175,6 @@ public interface TbContext { */ void output(TbMsg msg, String relationType); - /** - * Puts new message to custom queue for processing - * - * @param msg - message - */ - void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer onFailure); - void enqueueForTellFailure(TbMsg msg, String failureMessage); void enqueueForTellFailure(TbMsg tbMsg, Throwable t);