Browse Source

Fix infinite loop in rule engine caused by TbRuleChainInputNode self-referencing default rule chain

When a TbRuleChainInputNode has `forwardMsgToDefaultRuleChain=true` and the originator's
default rule chain is the same as the rule chain containing this node, the message enters
an infinite loop: the node forwards to the default rule chain, which routes back to the
same node, which forwards again, causing unbounded recursion and 100% CPU on rule-engine.

Fix: detect the loop in DefaultTbContext.input() by checking whether the calling rule node
is already present in the message's return stack (TbMsgProcessingCtx). From the second
iteration onward, the stack already contains the node's (ruleChainId, ruleNodeId) pair,
so the call is a cycle. In that case tellFailure() is called with a descriptive message
and a WARN log is emitted instead of re-enqueuing the message.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
pull/15102/head
Sergey Matvienko 3 months ago
parent
commit
062c577d1f
  1. 15
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  2. 43
      application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java
  3. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  4. 12
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java
  5. 89
      common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

15
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -183,11 +183,24 @@ public class DefaultTbContext implements TbContext {
if (!msg.isValid()) {
return;
}
RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId selfId = nodeCtx.getSelf().getId();
if (msg.isAlreadyInStack(selfRuleChainId, selfId)) {
log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " +
"The message will be failed to prevent infinite loop. " +
"Please check the rule chain configuration for circular references.",
nodeCtx.getTenantId(), selfId, selfRuleChainId);
tellFailure(msg, new RuntimeException(
"Detected rule chain processing loop for rule node [" + selfId + "] " +
"in rule chain [" + selfRuleChainId + "]. " +
"Please check the rule chain configuration for circular references."));
return;
}
TbMsg tbMsg = msg.copy()
.ruleChainId(ruleChainId)
.resetRuleNodeId()
.build();
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
tbMsg.pushToStack(selfRuleChainId, selfId);
TopicPartitionInfo tpi = resolvePartition(msg);
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
}

43
application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java

@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingStackItem;
@ -134,6 +135,48 @@ class DefaultTbContextTest {
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
}
/**
 * Verifies that {@code input(msg, ruleChainId)} reports a failure instead of enqueuing
 * the message when the calling node's (rule chain id, rule node id) pair is already on
 * the message's processing stack.
 *
 * The test pushes (RULE_CHAIN_ID, RULE_NODE_ID) onto the message stack, makes the node
 * context return a rule node with the same ids, and then expects exactly one message to
 * be told to the chain actor: a {@code RuleNodeToRuleChainTellNextMsg} whose only relation
 * type is FAILURE and whose failure message mentions "loop".
 */
@Test
public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() {
// GIVEN - simulate the second iteration: current node is already in the return stack,
// which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain
// is the same as the rule chain containing this node
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
// The "self" rule node carries the same ids that are pushed onto the stack below.
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugSettings(DebugSettings.off());
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(TENANT_ID)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
// Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
// The target chain id is random: the test only relies on the stack content set up above.
var targetRuleChainId = new RuleChainId(UUID.randomUUID());
// WHEN
defaultTbContext.input(msg, targetRuleChainId);
// THEN - loop detected: tellFailure is called and no message is enqueued
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
then(chainActorMock).should().tell(tellCaptor.capture());
TbActorMsg capturedTellMsg = tellCaptor.getValue();
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class);
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
// Case-insensitive match keeps the assertion independent of the exact wording of the error text.
assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop");
then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued
}
@MethodSource
@ParameterizedTest
public void givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) {

4
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -266,6 +266,10 @@ public final class TbMsg implements Serializable {
ctx.push(ruleChainId, ruleNodeId);
}
/**
 * Reports whether the given rule chain / rule node pair is already recorded for this message.
 * Delegates directly to {@code ctx.isAlreadyInStack(ruleChainId, ruleNodeId)}.
 *
 * @param ruleChainId rule chain id of the entry to look for
 * @param ruleNodeId rule node id of the entry to look for
 * @return the value returned by {@code ctx.isAlreadyInStack(...)}
 */
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return ctx.isAlreadyInStack(ruleChainId, ruleNodeId);
}
/**
 * Pops an entry by delegating to {@code ctx.pop()}.
 *
 * NOTE(review): "Form" in the method name looks like a typo for "From"; the name is kept
 * as-is because this is a public method.
 *
 * @return whatever {@code ctx.pop()} returns (presumably {@code null} when there is nothing
 *         to pop - confirm against the {@code pop()} implementation)
 */
public TbMsgProcessingStackItem popFormStack() {
return ctx.pop();
}

12
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java

@ -63,6 +63,18 @@ public final class TbMsgProcessingCtx implements Serializable {
stack.add(new TbMsgProcessingStackItem(ruleChainId, ruleNodeId));
}
/**
 * Checks whether the processing stack holds an entry for the given rule chain and rule node.
 *
 * @param ruleChainId rule chain id to look for; compared with {@code equals} against each entry
 * @param ruleNodeId rule node id to look for; compared with {@code equals} against each entry
 * @return {@code true} if some stack entry matches both ids; {@code false} if none does,
 *         or if the stack is {@code null} or empty
 */
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
    if (stack != null) {
        // Iterating an empty stack is a no-op, so no separate emptiness check is needed.
        for (TbMsgProcessingStackItem entry : stack) {
            boolean sameChain = ruleChainId.equals(entry.getRuleChainId());
            if (sameChain && ruleNodeId.equals(entry.getRuleNodeId())) {
                return true;
            }
        }
    }
    return false;
}
public TbMsgProcessingStackItem pop() {
if (stack == null || stack.isEmpty()) {
return null;

89
common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

@ -0,0 +1,89 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Unit tests for {@link TbMsgProcessingCtx#isAlreadyInStack(RuleChainId, RuleNodeId)}.
 *
 * Each test builds a fresh {@link TbMsgProcessingCtx}, optionally pushes and pops entries,
 * and checks whether the fixed (RULE_CHAIN_ID, RULE_NODE_ID) pair is reported as present.
 */
class TbMsgProcessingCtxTest {

    // Fixed ids looked up by every test. Declared static so they are true constants
    // (matching their UPPER_SNAKE_CASE names) rather than per-test-instance fields.
    private static final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11"));
    private static final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6"));

    /** A context with nothing pushed must not report the pair as present. */
    @Test
    void givenEmptyStack_whenIsAlreadyInStack_thenReturnFalse() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
    }

    /** An unrelated entry (random chain id and node id) must not match. */
    @Test
    void givenStackWithDifferentEntry_whenIsAlreadyInStack_thenReturnFalse() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
    }

    /** A single entry with both ids equal must match. */
    @Test
    void givenStackWithMatchingEntry_whenIsAlreadyInStack_thenReturnTrue() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
    }

    /** The matching entry is found even when it is neither the first nor the last one pushed. */
    @Test
    void givenStackWithMatchingEntryAmongOthers_whenIsAlreadyInStack_thenReturnTrue() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
        ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
        ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
    }

    /** Same rule chain but a different rule node is not a match: both ids must be equal. */
    @Test
    void givenStackWithSameChainButDifferentNode_whenIsAlreadyInStack_thenReturnFalse() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID()));

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
    }

    /** Same rule node but a different rule chain is not a match: both ids must be equal. */
    @Test
    void givenStackWithSameNodeButDifferentChain_whenIsAlreadyInStack_thenReturnFalse() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID);

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
    }

    /** Once the only matching entry has been popped, the pair is no longer reported as present. */
    @Test
    void givenStackWithEntryThenPopped_whenIsAlreadyInStack_thenReturnFalse() {
        TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
        ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
        ctx.pop();

        assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
    }
}
Loading…
Cancel
Save