diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 0070eba4a1..091b97ac9d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; @@ -175,7 +176,7 @@ public class DefaultTbContext implements TbContext { } TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(msg); doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @@ -192,7 +193,7 @@ public class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME); enqueue(tpi, tbMsg, onFailure, onSuccess); }