diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 18b9b02b25..df0e5d864d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -587,9 +587,9 @@ public class ActorSystemContext { @Getter private long ruleChainErrorPersistFrequency; - @Value("${actors.rule.chain.input_loop_max_iterations:0}") + @Value("${actors.rule.chain.input_loop_max_visits:1}") @Getter - private int ruleChainInputLoopMaxIterations; + private int ruleChainInputLoopMaxVisits; @Value("${actors.rule.node.error_persist_frequency:3000}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3cbb2d126f..c70fd0f975 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -186,31 +186,12 @@ public class DefaultTbContext implements TbContext { } RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId selfId = nodeCtx.getSelf().getId(); - // Stack-based revisit check (Layer 2). Layer 1 — direct A->A where the input node's - // configured target chain equals its own chain — is enforced unconditionally in - // TbRuleChainInputNode.onMsg() and is not affected by this setting. - int maxIterations = mainCtx.getRuleChainInputLoopMaxIterations(); - if (maxIterations <= 0) { - if (msg.isAlreadyInStack(selfRuleChainId, selfId)) { - log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " + - "The message will be failed to prevent infinite loop. " + - "Please check the rule chain configuration for circular references. " + - "If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " + - "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.", - nodeCtx.getTenantId(), selfId, selfRuleChainId); - tellFailure(msg, new RuntimeException( - "Detected rule chain processing loop for rule node [" + selfId + "] " + - "in rule chain [" + selfRuleChainId + "]. " + - "Please check the rule chain configuration for circular references. " + - "If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " + - "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.")); - return; - } - } else if (msg.countOccurrences(selfRuleChainId, selfId) >= maxIterations) { - String reason = "Rule chain input loop iteration limit " + maxIterations + " reached for rule node [" + int maxVisits = Math.max(1, mainCtx.getRuleChainInputLoopMaxVisits()); + if (msg.countOccurrences(selfRuleChainId, selfId) >= maxVisits) { + String reason = "Rule chain input node visit limit " + maxVisits + " reached for rule node [" + selfId + "] in rule chain [" + selfRuleChainId + "]. " - + "Check the rule chain configuration or raise TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS " - + "(yaml: actors.rule.chain.input_loop_max_iterations) if the loop is intentional."; + + "If the loop is intentional, raise actors.rule.chain.input_loop_max_visits " + + "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS)."; log.warn("[{}] {}", nodeCtx.getTenantId(), reason); tellFailure(msg, new RuntimeException(reason)); return; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9ea63ec284..9b43c68ced 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -550,24 +550,13 @@ actors: # Errors for particular actors are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" # Maximum number of times a single rule chain input node may fire for one message. - # <= 0 (default) = strict cycle detection: any rule chain input node revisited within the - # same message is rejected immediately (introduced in PR #15102). - # > 0 = permissive mode: legacy designs that intentionally loop through the same - # input node (e.g. paginated external API calls) are allowed up to this many - # iterations; exceeding the limit fails the message with a clear error. The - # tenant profile `maxRuleNodeExecutionsPerMessage` cap remains in effect. - # Counted per individual input-node fire: for typical linear pagination - # this matches the iteration count, but chains that route the same message - # through the same input node multiple times per logical iteration (e.g. - # fan-out) consume one unit per visit. - # Permissive mode relaxes only the stack-based cycle check for indirect loops (A->B->...->A). - # A direct self-reference where a rule chain input node forwards to its own rule chain is - # always rejected by TbRuleChainInputNode regardless of this setting — there is no legitimate - # use case for it and it can pin rule-engine at 100% CPU. - # Note: 0 here means "strict", unlike `maxRuleNodeExecutionsPerMessage` (tenant profile) - # where 0 means "unlimited". Affects all tenants on this rule-engine instance — use - # carefully in multi-tenant deployments. - input_loop_max_iterations: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS:0}" + # Default 1 = the node fires at most once per message — any revisit (direct A->A or + # indirect A->B->...->A) is failed immediately to prevent infinite loops. + # Raise to N > 1 to allow legacy designs that intentionally loop through the same input + # node up to N times (e.g. paginated external API calls). Values below 1 are clamped to 1. + # Direct self-references (a rule chain input node configured to forward to its own rule + # chain) are always rejected by TbRuleChainInputNode regardless of this setting. + input_loop_max_visits: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS:1}" debug_mode_rate_limits_per_tenant: # Enable/Disable the rate limit of persisted debug events for all rule nodes per tenant enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index af5d4c3b3b..834c6c3d66 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -136,12 +136,10 @@ class DefaultTbContextTest { } @Test - public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() { - // GIVEN - simulate the second iteration: current node is already in the return stack, - // which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain - // is the same as the rule chain containing this node. Explicit stub of 0 fixes the - // "0 = strict" contract independent of Mockito's default for primitive int. - given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(0); + public void givenDefaultCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() { + // GIVEN - default cap = 1: any revisit of the same (chain, node) within one message fails. + // Simulates the second visit of an input node whose first push is still in the stack. + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1); var callbackMock = mock(TbMsgCallback.class); given(callbackMock.isMsgValid()).willReturn(true); @@ -160,7 +158,6 @@ class DefaultTbContextTest { .data(TbMsg.EMPTY_STRING) .callback(callbackMock) .build(); - // Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); var targetRuleChainId = new RuleChainId(UUID.randomUUID()); @@ -168,28 +165,25 @@ class DefaultTbContextTest { // WHEN defaultTbContext.input(msg, targetRuleChainId); - // THEN - loop detected: tellFailure is called and no message is enqueued + // THEN - failure with the unified visit-limit message that names the property ArgumentCaptor tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); then(chainActorMock).should().tell(tellCaptor.capture()); TbActorMsg capturedTellMsg = tellCaptor.getValue(); assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); - assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop"); - // discoverability: strict-mode message must point admin to the new flag - assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS"); - assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations"); + assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached"); + assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS"); + assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits"); - // only the loop-detection config was read; no enqueue path was taken - then(mainCtxMock).should().getRuleChainInputLoopMaxIterations(); + then(mainCtxMock).should().getRuleChainInputLoopMaxVisits(); then(mainCtxMock).shouldHaveNoMoreInteractions(); } @Test - public void givenPermissiveModeAndIterationsBelowCap_whenInput_thenEnqueue() { - // GIVEN - admin opted into permissive mode by setting TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS, - // fresh message, this input node has fired 0 times so far - given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(5); + public void givenVisitsBelowCap_whenInput_thenEnqueue() { + // GIVEN - admin raised cap to 5; this is the first visit of the input node for the message + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(5); var tpi = resolve(null); given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); var clusterService = mock(TbClusterService.class); @@ -222,9 +216,9 @@ class DefaultTbContextTest { } @Test - public void givenPermissiveModeAndIterationsAtCap_whenInput_thenTellFailureWithIterationsMessage() { - // GIVEN - permissive mode with cap=2, this input node has already fired twice for this message - given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(2); + public void givenVisitsAtCap_whenInput_thenTellFailure() { + // GIVEN - cap=2, this input node has already been pushed twice for this message + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(2); var callbackMock = mock(TbMsgCallback.class); given(callbackMock.isMsgValid()).willReturn(true); @@ -253,27 +247,26 @@ class DefaultTbContextTest { // WHEN defaultTbContext.input(msg, targetRuleChainId); - // THEN - iteration-cap failure: tellFailure with message naming the limit, ENV name and yaml key + // THEN - cap failure: tellFailure with message naming the limit, ENV name and yaml key ArgumentCaptor tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); then(chainActorMock).should().tell(tellCaptor.capture()); TbActorMsg capturedTellMsg = tellCaptor.getValue(); assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); - assertThat(failureMsg.getFailureMessage()).contains("iteration limit 2 reached"); - assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS"); - assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations"); + assertThat(failureMsg.getFailureMessage()).contains("visit limit 2 reached"); + assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS"); + assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits"); - then(mainCtxMock).should().getRuleChainInputLoopMaxIterations(); + then(mainCtxMock).should().getRuleChainInputLoopMaxVisits(); then(mainCtxMock).shouldHaveNoMoreInteractions(); } @Test - public void givenPermissiveModeAndCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() { + public void givenCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() { // GIVEN - customer's pagination case: same RuleChainInputNode visited three times already. - // In strict mode this would fail (the existing test above); - // in permissive mode with cap > occurrences it must proceed. - given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(10); + // With cap > occurrences it must proceed. + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(10); var tpi = resolve(null); given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); var clusterService = mock(TbClusterService.class); @@ -311,10 +304,47 @@ class DefaultTbContextTest { } @Test - public void givenPermissiveModeWithCapOneAndFreshMessage_whenInput_thenEnqueue() { - // GIVEN - smallest non-strict cap; this is the boundary where strict and permissive diverge. - // First fire of the input node must pass (count=0, 0 >= 1 is false). - given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(1); + public void givenNegativeCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() { + // GIVEN - misconfiguration: admin set a non-positive value. Per yaml contract, values < 1 + // are clamped to 1 — strict behavior is preserved instead of failing at startup. + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(-3); + + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN - clamped cap = 1, count = 1, fails with limit 1 in the message + ArgumentCaptor tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); + then(chainActorMock).should().tell(tellCaptor.capture()); + TbActorMsg capturedTellMsg = tellCaptor.getValue(); + RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; + assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached"); + } + + @Test + public void givenDefaultCapAndFreshMessage_whenInput_thenEnqueue() { + // GIVEN - default cap = 1, fresh message (count = 0): first visit must pass. + given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1); var tpi = resolve(null); given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); var clusterService = mock(TbClusterService.class); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index b5de0093f6..db31723a77 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -253,10 +253,6 @@ public final class TbMsg implements Serializable { ctx.push(ruleChainId, ruleNodeId); } - public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return ctx.isAlreadyInStack(ruleChainId, ruleNodeId); - } - public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { return ctx.countOccurrences(ruleChainId, ruleNodeId); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index d84345492d..8b8793dbd1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -63,18 +63,6 @@ public final class TbMsgProcessingCtx implements Serializable { stack.add(new TbMsgProcessingStackItem(ruleChainId, ruleNodeId)); } - public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - if (stack == null || stack.isEmpty()) { - return false; - } - for (TbMsgProcessingStackItem item : stack) { - if (ruleChainId.equals(item.getRuleChainId()) && ruleNodeId.equals(item.getRuleNodeId())) { - return true; - } - } - return false; - } - public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { if (stack == null || stack.isEmpty()) { return 0; diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java index 8fb0da02e2..79919c6d74 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java @@ -28,64 +28,6 @@ class TbMsgProcessingCtxTest { private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6")); - @Test - void givenEmptyStack_whenIsAlreadyInStack_thenReturnFalse() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); - } - - @Test - void givenStackWithDifferentEntry_whenIsAlreadyInStack_thenReturnFalse() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); - } - - @Test - void givenStackWithMatchingEntry_whenIsAlreadyInStack_thenReturnTrue() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue(); - } - - @Test - void givenStackWithMatchingEntryAmongOthers_whenIsAlreadyInStack_thenReturnTrue() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); - ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); - ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue(); - } - - @Test - void givenStackWithSameChainButDifferentNode_whenIsAlreadyInStack_thenReturnFalse() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID())); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); - } - - @Test - void givenStackWithSameNodeButDifferentChain_whenIsAlreadyInStack_thenReturnFalse() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); - } - - @Test - void givenStackWithEntryThenPopped_whenIsAlreadyInStack_thenReturnFalse() { - TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); - ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); - ctx.pop(); - - assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); - } - @Test void givenEmptyStack_whenCountOccurrences_thenReturnZero() { TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); @@ -132,4 +74,20 @@ class TbMsgProcessingCtxTest { assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); } + @Test + void givenStackWithSameChainButDifferentNode_whenCountOccurrences_thenReturnZero() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); + } + + @Test + void givenStackWithSameNodeButDifferentChain_whenCountOccurrences_thenReturnZero() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID); + + assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero(); + } + }