|
|
|
@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.function.BiConsumer; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; |
|
|
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; |
|
|
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; |
|
|
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; |
|
|
|
@ -175,7 +176,7 @@ public class DefaultTbContext implements TbContext { |
|
|
|
} |
|
|
|
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); |
|
|
|
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); |
|
|
|
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); |
|
|
|
TopicPartitionInfo tpi = resolvePartition(msg); |
|
|
|
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); |
|
|
|
} |
|
|
|
|
|
|
|
@ -192,7 +193,7 @@ public class DefaultTbContext implements TbContext { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) { |
|
|
|
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); |
|
|
|
TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME); |
|
|
|
enqueue(tpi, tbMsg, onFailure, onSuccess); |
|
|
|
} |
|
|
|
|
|
|
|
|