From a6210d0911121f0804f24d5a80ffad423c8fa9fd Mon Sep 17 00:00:00 2001 From: Oleksandra Matviienko Date: Tue, 19 May 2026 14:05:35 +0200 Subject: [PATCH] Added configurable iteration limit for rule chain input nodes New property actors.rule.chain.input_loop_max_iterations (env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS), default 0: * <= 0 keeps strict cycle detection: any rule chain input node revisited within one message is rejected immediately. * > 0 allows that input node to fire up to N times per message; exceeding the limit fails the message with a clear error. Direct self-references (configured target chain == own chain) remain blocked unconditionally by TbRuleChainInputNode regardless of this setting. Strict-mode error message now points the admin to the new flag. --- .../server/actors/ActorSystemContext.java | 4 + .../actors/ruleChain/DefaultTbContext.java | 36 +++- .../src/main/resources/thingsboard.yml | 19 ++ .../actors/rule/DefaultTbContextTest.java | 173 +++++++++++++++++- .../thingsboard/server/common/msg/TbMsg.java | 4 + .../server/common/msg/TbMsgProcessingCtx.java | 13 ++ .../common/msg/TbMsgProcessingCtxTest.java | 46 +++++ 7 files changed, 284 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 925b60de76..18b9b02b25 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -587,6 +587,10 @@ public class ActorSystemContext { @Getter private long ruleChainErrorPersistFrequency; + @Value("${actors.rule.chain.input_loop_max_iterations:0}") + @Getter + private int ruleChainInputLoopMaxIterations; + @Value("${actors.rule.node.error_persist_frequency:3000}") @Getter private long ruleNodeErrorPersistFrequency; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3a572503d6..3cbb2d126f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -186,15 +186,33 @@ public class DefaultTbContext implements TbContext { } RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId selfId = nodeCtx.getSelf().getId(); - if (msg.isAlreadyInStack(selfRuleChainId, selfId)) { - log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " + - "The message will be failed to prevent infinite loop. " + - "Please check the rule chain configuration for circular references.", - nodeCtx.getTenantId(), selfId, selfRuleChainId); - tellFailure(msg, new RuntimeException( - "Detected rule chain processing loop for rule node [" + selfId + "] " + - "in rule chain [" + selfRuleChainId + "]. " + - "Please check the rule chain configuration for circular references.")); + // Stack-based revisit check (Layer 2). Layer 1 — direct A->A where the input node's + // configured target chain equals its own chain — is enforced unconditionally in + // TbRuleChainInputNode.onMsg() and is not affected by this setting. + int maxIterations = mainCtx.getRuleChainInputLoopMaxIterations(); + if (maxIterations <= 0) { + if (msg.isAlreadyInStack(selfRuleChainId, selfId)) { + log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " + + "The message will be failed to prevent infinite loop. " + + "Please check the rule chain configuration for circular references. " + + "If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " + + "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.", + nodeCtx.getTenantId(), selfId, selfRuleChainId); + tellFailure(msg, new RuntimeException( + "Detected rule chain processing loop for rule node [" + selfId + "] " + + "in rule chain [" + selfRuleChainId + "]. " + + "Please check the rule chain configuration for circular references. " + + "If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " + + "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.")); + return; + } + } else if (msg.countOccurrences(selfRuleChainId, selfId) >= maxIterations) { + String reason = "Rule chain input loop iteration limit " + maxIterations + " reached for rule node [" + + selfId + "] in rule chain [" + selfRuleChainId + "]. " + + "Check the rule chain configuration or raise TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS " + + "(yaml: actors.rule.chain.input_loop_max_iterations) if the loop is intentional."; + log.warn("[{}] {}", nodeCtx.getTenantId(), reason); + tellFailure(msg, new RuntimeException(reason)); return; } TbMsg tbMsg = msg.copy() diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2c24caa1f2..9ea63ec284 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -549,6 +549,25 @@ actors: chain: # Errors for particular actors are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" + # Maximum number of times a single rule chain input node may fire for one message. + # <= 0 (default) = strict cycle detection: any rule chain input node revisited within the + # same message is rejected immediately (introduced in PR #15102). + # > 0 = permissive mode: legacy designs that intentionally loop through the same + # input node (e.g. paginated external API calls) are allowed up to this many + # iterations; exceeding the limit fails the message with a clear error. The + # tenant profile `maxRuleNodeExecutionsPerMessage` cap remains in effect. + # Counted per individual input-node fire: for typical linear pagination + # this matches the iteration count, but chains that route the same message + # through the same input node multiple times per logical iteration (e.g. + # fan-out) consume one unit per visit. + # Permissive mode relaxes only the stack-based cycle check for indirect loops (A->B->...->A). + # A direct self-reference where a rule chain input node forwards to its own rule chain is + # always rejected by TbRuleChainInputNode regardless of this setting — there is no legitimate + # use case for it and it can pin rule-engine at 100% CPU. + # Note: 0 here means "strict", unlike `maxRuleNodeExecutionsPerMessage` (tenant profile) + # where 0 means "unlimited". Affects all tenants on this rule-engine instance — use + # carefully in multi-tenant deployments. + input_loop_max_iterations: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS:0}" debug_mode_rate_limits_per_tenant: # Enable/Disable the rate limit of persisted debug events for all rule nodes per tenant enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index 6337067b9c..af5d4c3b3b 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -139,7 +139,10 @@ class DefaultTbContextTest { public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() { // GIVEN - simulate the second iteration: current node is already in the return stack, // which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain - // is the same as the rule chain containing this node + // is the same as the rule chain containing this node. Explicit stub of 0 fixes the + // "0 = strict" contract independent of Mockito's default for primitive int. + given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(0); + var callbackMock = mock(TbMsgCallback.class); given(callbackMock.isMsgValid()).willReturn(true); @@ -173,8 +176,174 @@ class DefaultTbContextTest { RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop"); + // discoverability: strict-mode message must point admin to the new flag + assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS"); + assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations"); + + // only the loop-detection config was read; no enqueue path was taken + then(mainCtxMock).should().getRuleChainInputLoopMaxIterations(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenPermissiveModeAndIterationsBelowCap_whenInput_thenEnqueue() { + // GIVEN - admin opted into permissive mode by setting TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS, + // fresh message, this input node has fired 0 times so far + given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(5); + var tpi = resolve(null); + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + then(chainActorMock).shouldHaveNoInteractions(); + } - then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued + @Test + public void givenPermissiveModeAndIterationsAtCap_whenInput_thenTellFailureWithIterationsMessage() { + // GIVEN - permissive mode with cap=2, this input node has already fired twice for this message + given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(2); + + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + // two prior visits of THIS node + an unrelated entry that must NOT contribute to the count + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + msg.pushToStack(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN - iteration-cap failure: tellFailure with message naming the limit, ENV name and yaml key + ArgumentCaptor tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); + then(chainActorMock).should().tell(tellCaptor.capture()); + TbActorMsg capturedTellMsg = tellCaptor.getValue(); + assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); + RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; + assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); + assertThat(failureMsg.getFailureMessage()).contains("iteration limit 2 reached"); + assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS"); + assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations"); + + then(mainCtxMock).should().getRuleChainInputLoopMaxIterations(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenPermissiveModeAndCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() { + // GIVEN - customer's pagination case: same RuleChainInputNode visited three times already. + // In strict mode this would fail (the existing test above); + // in permissive mode with cap > occurrences it must proceed. + given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(10); + var tpi = resolve(null); + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + // three prior iterations through the same input node — count 3 < cap 10 + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + then(chainActorMock).shouldHaveNoInteractions(); + } + + @Test + public void givenPermissiveModeWithCapOneAndFreshMessage_whenInput_thenEnqueue() { + // GIVEN - smallest non-strict cap; this is the boundary where strict and permissive diverge. + // First fire of the input node must pass (count=0, 0 >= 1 is false). + given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(1); + var tpi = resolve(null); + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + then(chainActorMock).shouldHaveNoInteractions(); } @MethodSource diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 79059a9a56..b5de0093f6 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -257,6 +257,10 @@ public final class TbMsg implements Serializable { return ctx.isAlreadyInStack(ruleChainId, ruleNodeId); } + public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return ctx.countOccurrences(ruleChainId, ruleNodeId); + } + public TbMsgProcessingStackItem popFormStack() { return ctx.pop(); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index 968b5d4fc2..d84345492d 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -75,6 +75,19 @@ public final class TbMsgProcessingCtx implements Serializable { return false; } + public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + if (stack == null || stack.isEmpty()) { + return 0; + } + int count = 0; + for (TbMsgProcessingStackItem item : stack) { + if (ruleChainId.equals(item.getRuleChainId()) && ruleNodeId.equals(item.getRuleNodeId())) { + count++; + } + } + return count; + } + public TbMsgProcessingStackItem pop() { if (stack == null || stack.isEmpty()) { return null; diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java index 65e5bda925..8fb0da02e2 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java @@ -86,4 +86,50 @@ class TbMsgProcessingCtxTest { assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); } + @Test + void givenEmptyStack_whenCountOccurrences_thenReturnZero() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); + } + + @Test + void givenStackWithoutMatch_whenCountOccurrences_thenReturnZero() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); + } + + @Test + void givenStackWithSingleMatch_whenCountOccurrences_thenReturnOne() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isEqualTo(1); + } + + @Test + void givenStackWithThreeMatches_whenCountOccurrences_thenReturnThree() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isEqualTo(3); + } + + @Test + void givenStackWithMatchThenPopped_whenCountOccurrences_thenReturnZero() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.pop(); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); + } + }