Browse Source

Unified rule chain input loop guard around a single visit counter

Renamed actors.rule.chain.input_loop_max_iterations to input_loop_max_visits
(env TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS), default 1. DefaultTbContext.input()
now performs a single comparison msg.countOccurrences(...) >= Math.max(1, cap)
and emits one failure message that names the property. Values below 1 are
clamped to 1, matching DefaultActorService.poolSize.

Removed TbMsg.isAlreadyInStack and TbMsgProcessingCtx.isAlreadyInStack —
countOccurrences covers both checks.
pull/15658/head
Oleksandra Matviienko 6 days ago
parent
commit
7c199d5345
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 29
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 25
      application/src/main/resources/thingsboard.yml
  4. 98
      application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java
  5. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  6. 12
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java
  7. 74
      common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -587,9 +587,9 @@ public class ActorSystemContext {
@Getter
private long ruleChainErrorPersistFrequency;
@Value("${actors.rule.chain.input_loop_max_iterations:0}")
@Value("${actors.rule.chain.input_loop_max_visits:1}")
@Getter
private int ruleChainInputLoopMaxIterations;
private int ruleChainInputLoopMaxVisits;
@Value("${actors.rule.node.error_persist_frequency:3000}")
@Getter

29
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -186,31 +186,12 @@ public class DefaultTbContext implements TbContext {
}
RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId selfId = nodeCtx.getSelf().getId();
// Stack-based revisit check (Layer 2). Layer 1 — direct A->A where the input node's
// configured target chain equals its own chain — is enforced unconditionally in
// TbRuleChainInputNode.onMsg() and is not affected by this setting.
int maxIterations = mainCtx.getRuleChainInputLoopMaxIterations();
if (maxIterations <= 0) {
if (msg.isAlreadyInStack(selfRuleChainId, selfId)) {
log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " +
"The message will be failed to prevent infinite loop. " +
"Please check the rule chain configuration for circular references. " +
"If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " +
"(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.",
nodeCtx.getTenantId(), selfId, selfRuleChainId);
tellFailure(msg, new RuntimeException(
"Detected rule chain processing loop for rule node [" + selfId + "] " +
"in rule chain [" + selfRuleChainId + "]. " +
"Please check the rule chain configuration for circular references. " +
"If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " +
"(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value."));
return;
}
} else if (msg.countOccurrences(selfRuleChainId, selfId) >= maxIterations) {
String reason = "Rule chain input loop iteration limit " + maxIterations + " reached for rule node ["
int maxVisits = Math.max(1, mainCtx.getRuleChainInputLoopMaxVisits());
if (msg.countOccurrences(selfRuleChainId, selfId) >= maxVisits) {
String reason = "Rule chain input node visit limit " + maxVisits + " reached for rule node ["
+ selfId + "] in rule chain [" + selfRuleChainId + "]. "
+ "Check the rule chain configuration or raise TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS "
+ "(yaml: actors.rule.chain.input_loop_max_iterations) if the loop is intentional.";
+ "If the loop is intentional, raise actors.rule.chain.input_loop_max_visits "
+ "(env TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS).";
log.warn("[{}] {}", nodeCtx.getTenantId(), reason);
tellFailure(msg, new RuntimeException(reason));
return;

25
application/src/main/resources/thingsboard.yml

@ -550,24 +550,13 @@ actors:
# Errors for particular actors are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
# Maximum number of times a single rule chain input node may fire for one message.
# <= 0 (default) = strict cycle detection: any rule chain input node revisited within the
# same message is rejected immediately (introduced in PR #15102).
# > 0 = permissive mode: legacy designs that intentionally loop through the same
# input node (e.g. paginated external API calls) are allowed up to this many
# iterations; exceeding the limit fails the message with a clear error. The
# tenant profile `maxRuleNodeExecutionsPerMessage` cap remains in effect.
# Counted per individual input-node fire: for typical linear pagination
# this matches the iteration count, but chains that route the same message
# through the same input node multiple times per logical iteration (e.g.
# fan-out) consume one unit per visit.
# Permissive mode relaxes only the stack-based cycle check for indirect loops (A->B->...->A).
# A direct self-reference where a rule chain input node forwards to its own rule chain is
# always rejected by TbRuleChainInputNode regardless of this setting — there is no legitimate
# use case for it and it can pin rule-engine at 100% CPU.
# Note: 0 here means "strict", unlike `maxRuleNodeExecutionsPerMessage` (tenant profile)
# where 0 means "unlimited". Affects all tenants on this rule-engine instance — use
# carefully in multi-tenant deployments.
input_loop_max_iterations: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS:0}"
# Default 1 = the node fires at most once per message — any revisit (direct A->A or
# indirect A->B->...->A) is failed immediately to prevent infinite loops.
# Raise to N > 1 to allow legacy designs that intentionally loop through the same input
# node up to N times (e.g. paginated external API calls). Values below 1 are clamped to 1.
# Direct self-references (a rule chain input node configured to forward to its own rule
# chain) are always rejected by TbRuleChainInputNode regardless of this setting.
input_loop_max_visits: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS:1}"
debug_mode_rate_limits_per_tenant:
# Enable/Disable the rate limit of persisted debug events for all rule nodes per tenant
enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"

98
application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java

@ -136,12 +136,10 @@ class DefaultTbContextTest {
}
@Test
public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() {
// GIVEN - simulate the second iteration: current node is already in the return stack,
// which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain
// is the same as the rule chain containing this node. Explicit stub of 0 fixes the
// "0 = strict" contract independent of Mockito's default for primitive int.
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(0);
public void givenDefaultCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() {
// GIVEN - default cap = 1: any revisit of the same (chain, node) within one message fails.
// Simulates the second visit of an input node whose first push is still in the stack.
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
@ -160,7 +158,6 @@ class DefaultTbContextTest {
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
// Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
var targetRuleChainId = new RuleChainId(UUID.randomUUID());
@ -168,28 +165,25 @@ class DefaultTbContextTest {
// WHEN
defaultTbContext.input(msg, targetRuleChainId);
// THEN - loop detected: tellFailure is called and no message is enqueued
// THEN - failure with the unified visit-limit message that names the property
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
then(chainActorMock).should().tell(tellCaptor.capture());
TbActorMsg capturedTellMsg = tellCaptor.getValue();
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class);
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop");
// discoverability: strict-mode message must point admin to the new flag
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS");
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations");
assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached");
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS");
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits");
// only the loop-detection config was read; no enqueue path was taken
then(mainCtxMock).should().getRuleChainInputLoopMaxIterations();
then(mainCtxMock).should().getRuleChainInputLoopMaxVisits();
then(mainCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenPermissiveModeAndIterationsBelowCap_whenInput_thenEnqueue() {
// GIVEN - admin opted into permissive mode by setting TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS,
// fresh message, this input node has fired 0 times so far
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(5);
public void givenVisitsBelowCap_whenInput_thenEnqueue() {
// GIVEN - admin raised cap to 5; this is the first visit of the input node for the message
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(5);
var tpi = resolve(null);
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
var clusterService = mock(TbClusterService.class);
@ -222,9 +216,9 @@ class DefaultTbContextTest {
}
@Test
public void givenPermissiveModeAndIterationsAtCap_whenInput_thenTellFailureWithIterationsMessage() {
// GIVEN - permissive mode with cap=2, this input node has already fired twice for this message
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(2);
public void givenVisitsAtCap_whenInput_thenTellFailure() {
// GIVEN - cap=2, this input node has already been pushed twice for this message
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(2);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
@ -253,27 +247,26 @@ class DefaultTbContextTest {
// WHEN
defaultTbContext.input(msg, targetRuleChainId);
// THEN - iteration-cap failure: tellFailure with message naming the limit, ENV name and yaml key
// THEN - cap failure: tellFailure with message naming the limit, ENV name and yaml key
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
then(chainActorMock).should().tell(tellCaptor.capture());
TbActorMsg capturedTellMsg = tellCaptor.getValue();
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class);
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
assertThat(failureMsg.getFailureMessage()).contains("iteration limit 2 reached");
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS");
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations");
assertThat(failureMsg.getFailureMessage()).contains("visit limit 2 reached");
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS");
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits");
then(mainCtxMock).should().getRuleChainInputLoopMaxIterations();
then(mainCtxMock).should().getRuleChainInputLoopMaxVisits();
then(mainCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenPermissiveModeAndCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() {
public void givenCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() {
// GIVEN - customer's pagination case: same RuleChainInputNode visited three times already.
// In strict mode this would fail (the existing test above);
// in permissive mode with cap > occurrences it must proceed.
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(10);
// With cap > occurrences it must proceed.
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(10);
var tpi = resolve(null);
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
var clusterService = mock(TbClusterService.class);
@ -311,10 +304,47 @@ class DefaultTbContextTest {
}
@Test
public void givenPermissiveModeWithCapOneAndFreshMessage_whenInput_thenEnqueue() {
// GIVEN - smallest non-strict cap; this is the boundary where strict and permissive diverge.
// First fire of the input node must pass (count=0, 0 >= 1 is false).
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(1);
public void givenNegativeCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() {
// GIVEN - misconfiguration: admin set a non-positive value. Per yaml contract, values < 1
// are clamped to 1 — strict behavior is preserved instead of failing at startup.
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(-3);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugSettings(DebugSettings.off());
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(TENANT_ID)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
var targetRuleChainId = new RuleChainId(UUID.randomUUID());
// WHEN
defaultTbContext.input(msg, targetRuleChainId);
// THEN - clamped cap = 1, count = 1, fails with limit 1 in the message
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
then(chainActorMock).should().tell(tellCaptor.capture());
TbActorMsg capturedTellMsg = tellCaptor.getValue();
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached");
}
@Test
public void givenDefaultCapAndFreshMessage_whenInput_thenEnqueue() {
// GIVEN - default cap = 1, fresh message (count = 0): first visit must pass.
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1);
var tpi = resolve(null);
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
var clusterService = mock(TbClusterService.class);

4
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -253,10 +253,6 @@ public final class TbMsg implements Serializable {
ctx.push(ruleChainId, ruleNodeId);
}
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return ctx.isAlreadyInStack(ruleChainId, ruleNodeId);
}
public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return ctx.countOccurrences(ruleChainId, ruleNodeId);
}

12
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java

@ -63,18 +63,6 @@ public final class TbMsgProcessingCtx implements Serializable {
stack.add(new TbMsgProcessingStackItem(ruleChainId, ruleNodeId));
}
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
if (stack == null || stack.isEmpty()) {
return false;
}
for (TbMsgProcessingStackItem item : stack) {
if (ruleChainId.equals(item.getRuleChainId()) && ruleNodeId.equals(item.getRuleNodeId())) {
return true;
}
}
return false;
}
public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
if (stack == null || stack.isEmpty()) {
return 0;

74
common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

@ -28,64 +28,6 @@ class TbMsgProcessingCtxTest {
private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11"));
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6"));
@Test
void givenEmptyStack_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithDifferentEntry_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithMatchingEntry_whenIsAlreadyInStack_thenReturnTrue() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
}
@Test
void givenStackWithMatchingEntryAmongOthers_whenIsAlreadyInStack_thenReturnTrue() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
}
@Test
void givenStackWithSameChainButDifferentNode_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithSameNodeButDifferentChain_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID);
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithEntryThenPopped_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
ctx.pop();
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenEmptyStack_whenCountOccurrences_thenReturnZero() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
@ -132,4 +74,20 @@ class TbMsgProcessingCtxTest {
assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero();
}
@Test
void givenStackWithSameChainButDifferentNode_whenCountOccurrences_thenReturnZero() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero();
}
@Test
void givenStackWithSameNodeButDifferentChain_whenCountOccurrences_thenReturnZero() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID);
assertThat(ctx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID)).isZero();
}
}

Loading…
Cancel
Save