Browse Source

Added configurable iteration limit for rule chain input nodes

New property actors.rule.chain.input_loop_max_iterations
(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS), default 0:

* <= 0 keeps strict cycle detection: any rule chain input node revisited
  within one message is rejected immediately.
* > 0 allows that input node to fire up to N times per message; exceeding
  the limit fails the message with a clear error.

Direct self-references (configured target chain == own chain) remain
blocked unconditionally by TbRuleChainInputNode regardless of this
setting. Strict-mode error message now points the admin to the new flag.
pull/15658/head
Oleksandra Matviienko 2 weeks ago
parent
commit
a6210d0911
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 36
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 19
      application/src/main/resources/thingsboard.yml
  4. 173
      application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java
  5. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  6. 13
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java
  7. 46
      common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -587,6 +587,10 @@ public class ActorSystemContext {
@Getter
private long ruleChainErrorPersistFrequency;
/**
 * Iteration cap for rule chain input nodes, bound to the
 * {@code actors.rule.chain.input_loop_max_iterations} property; falls back to {@code 0}
 * when the property is not set.
 * <p>
 * {@code DefaultTbContext} reads this value before forwarding a message to another rule chain:
 * a value {@code <= 0} keeps strict loop detection (a message whose processing stack already
 * contains the current input node is failed), while a positive value lets the same input node
 * appear in the stack up to that many times before the message is failed.
 */
@Value("${actors.rule.chain.input_loop_max_iterations:0}")
@Getter
private int ruleChainInputLoopMaxIterations;
@Value("${actors.rule.node.error_persist_frequency:3000}")
@Getter
private long ruleNodeErrorPersistFrequency;

36
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -186,15 +186,33 @@ public class DefaultTbContext implements TbContext {
}
RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId selfId = nodeCtx.getSelf().getId();
if (msg.isAlreadyInStack(selfRuleChainId, selfId)) {
log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " +
"The message will be failed to prevent infinite loop. " +
"Please check the rule chain configuration for circular references.",
nodeCtx.getTenantId(), selfId, selfRuleChainId);
tellFailure(msg, new RuntimeException(
"Detected rule chain processing loop for rule node [" + selfId + "] " +
"in rule chain [" + selfRuleChainId + "]. " +
"Please check the rule chain configuration for circular references."));
// Stack-based revisit check (Layer 2). Layer 1 — direct A->A where the input node's
// configured target chain equals its own chain — is enforced unconditionally in
// TbRuleChainInputNode.onMsg() and is not affected by this setting.
int maxIterations = mainCtx.getRuleChainInputLoopMaxIterations();
if (maxIterations <= 0) {
if (msg.isAlreadyInStack(selfRuleChainId, selfId)) {
log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " +
"The message will be failed to prevent infinite loop. " +
"Please check the rule chain configuration for circular references. " +
"If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " +
"(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value.",
nodeCtx.getTenantId(), selfId, selfRuleChainId);
tellFailure(msg, new RuntimeException(
"Detected rule chain processing loop for rule node [" + selfId + "] " +
"in rule chain [" + selfRuleChainId + "]. " +
"Please check the rule chain configuration for circular references. " +
"If the loop is intentional, configure actors.rule.chain.input_loop_max_iterations " +
"(env TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS) to a positive value."));
return;
}
} else if (msg.countOccurrences(selfRuleChainId, selfId) >= maxIterations) {
String reason = "Rule chain input loop iteration limit " + maxIterations + " reached for rule node ["
+ selfId + "] in rule chain [" + selfRuleChainId + "]. "
+ "Check the rule chain configuration or raise TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS "
+ "(yaml: actors.rule.chain.input_loop_max_iterations) if the loop is intentional.";
log.warn("[{}] {}", nodeCtx.getTenantId(), reason);
tellFailure(msg, new RuntimeException(reason));
return;
}
TbMsg tbMsg = msg.copy()

19
application/src/main/resources/thingsboard.yml

@ -549,6 +549,25 @@ actors:
chain:
# Errors for particular actors are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
# Maximum number of times a single rule chain input node may fire for one message.
# <= 0 (default) = strict cycle detection: any rule chain input node revisited within the
# same message is rejected immediately (introduced in PR #15102).
# > 0 = permissive mode: legacy designs that intentionally loop through the same
# input node (e.g. paginated external API calls) are allowed up to this many
# iterations; exceeding the limit fails the message with a clear error. The
# tenant profile `maxRuleNodeExecutionsPerMessage` cap remains in effect.
# Counted per individual input-node fire: for typical linear pagination
# this matches the iteration count, but chains that route the same message
# through the same input node multiple times per logical iteration (e.g.
# fan-out) consume one unit per visit.
# Permissive mode relaxes only the stack-based cycle check for indirect loops (A->B->...->A).
# A direct self-reference where a rule chain input node forwards to its own rule chain is
# always rejected by TbRuleChainInputNode regardless of this setting — there is no legitimate
# use case for it, and it can pin the rule engine at 100% CPU.
# Note: 0 here means "strict", unlike `maxRuleNodeExecutionsPerMessage` (tenant profile)
# where 0 means "unlimited". Affects all tenants on this rule-engine instance — use
# carefully in multi-tenant deployments.
input_loop_max_iterations: "${TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS:0}"
debug_mode_rate_limits_per_tenant:
# Enable/Disable the rate limit of persisted debug events for all rule nodes per tenant
enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"

173
application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java

@ -139,7 +139,10 @@ class DefaultTbContextTest {
public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() {
// GIVEN - simulate the second iteration: current node is already in the return stack,
// which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain
// is the same as the rule chain containing this node
// is the same as the rule chain containing this node. Explicit stub of 0 fixes the
// "0 = strict" contract independent of Mockito's default for primitive int.
given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(0);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
@ -173,8 +176,174 @@ class DefaultTbContextTest {
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop");
// discoverability: strict-mode message must point admin to the new flag
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS");
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_iterations");
// only the loop-detection config was read; no enqueue path was taken
then(mainCtxMock).should().getRuleChainInputLoopMaxIterations();
then(mainCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenPermissiveModeAndIterationsBelowCap_whenInput_thenEnqueue() {
    // GIVEN - the input node under test, owned by RULE_CHAIN_ID
    var inputNode = new RuleNode(RULE_NODE_ID);
    inputNode.setRuleChainId(RULE_CHAIN_ID);
    inputNode.setDebugSettings(DebugSettings.off());
    given(nodeCtxMock.getSelf()).willReturn(inputNode);
    given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);

    // permissive mode: up to 5 fires of the same input node are allowed per message
    given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(5);
    var partitionInfo = resolve(null);
    given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(partitionInfo);
    var clusterServiceMock = mock(TbClusterService.class);
    given(mainCtxMock.getClusterService()).willReturn(clusterServiceMock);

    // a fresh message: nothing has been pushed to its processing stack yet
    var callbackMock = mock(TbMsgCallback.class);
    given(callbackMock.isMsgValid()).willReturn(true);
    var freshMsg = TbMsg.newMsg()
            .type(TbMsgType.POST_TELEMETRY_REQUEST)
            .originator(TENANT_ID)
            .copyMetaData(TbMsgMetaData.EMPTY)
            .data(TbMsg.EMPTY_STRING)
            .callback(callbackMock)
            .build();
    var targetChainId = new RuleChainId(UUID.randomUUID());

    // WHEN
    defaultTbContext.input(freshMsg, targetChainId);

    // THEN - the message is pushed to the rule engine and the chain actor is never told about a failure
    then(clusterServiceMock).should().pushMsgToRuleEngine(eq(partitionInfo), eq(freshMsg.getId()), any(), any());
    then(chainActorMock).shouldHaveNoInteractions();
}
then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued
@Test
public void givenPermissiveModeAndIterationsAtCap_whenInput_thenTellFailureWithIterationsMessage() {
    // GIVEN - permissive mode capped at 2 fires per message
    given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(2);

    var inputNode = new RuleNode(RULE_NODE_ID);
    inputNode.setRuleChainId(RULE_CHAIN_ID);
    inputNode.setDebugSettings(DebugSettings.off());
    given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
    given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
    given(nodeCtxMock.getSelf()).willReturn(inputNode);

    var callbackMock = mock(TbMsgCallback.class);
    given(callbackMock.isMsgValid()).willReturn(true);
    var loopedMsg = TbMsg.newMsg()
            .type(TbMsgType.POST_TELEMETRY_REQUEST)
            .originator(TENANT_ID)
            .copyMetaData(TbMsgMetaData.EMPTY)
            .data(TbMsg.EMPTY_STRING)
            .callback(callbackMock)
            .build();

    // the stack already holds this node twice; the unrelated entry in between must not be counted
    var unrelatedChainId = new RuleChainId(UUID.randomUUID());
    var unrelatedNodeId = new RuleNodeId(UUID.randomUUID());
    loopedMsg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
    loopedMsg.pushToStack(unrelatedChainId, unrelatedNodeId);
    loopedMsg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
    var targetChainId = new RuleChainId(UUID.randomUUID());

    // WHEN
    defaultTbContext.input(loopedMsg, targetChainId);

    // THEN - the chain actor receives a FAILURE message that names the limit, the env variable and the yaml key
    ArgumentCaptor<TbActorMsg> toldMsgCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
    then(chainActorMock).should().tell(toldMsgCaptor.capture());
    TbActorMsg toldMsg = toldMsgCaptor.getValue();
    assertThat(toldMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class);
    var failure = (RuleNodeToRuleChainTellNextMsg) toldMsg;
    assertThat(failure.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
    assertThat(failure.getFailureMessage())
            .contains("iteration limit 2 reached")
            .contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_ITERATIONS")
            .contains("actors.rule.chain.input_loop_max_iterations");

    // only the loop-limit setting was read from the main context
    then(mainCtxMock).should().getRuleChainInputLoopMaxIterations();
    then(mainCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenPermissiveModeAndCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() {
    // GIVEN - pagination-style loop: the same input node is already on the stack three times,
    // which strict mode would reject but a cap of 10 must still let through
    var inputNode = new RuleNode(RULE_NODE_ID);
    inputNode.setRuleChainId(RULE_CHAIN_ID);
    inputNode.setDebugSettings(DebugSettings.off());
    given(nodeCtxMock.getSelf()).willReturn(inputNode);
    given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);

    given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(10);
    var partitionInfo = resolve(null);
    given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(partitionInfo);
    var clusterServiceMock = mock(TbClusterService.class);
    given(mainCtxMock.getClusterService()).willReturn(clusterServiceMock);

    var callbackMock = mock(TbMsgCallback.class);
    given(callbackMock.isMsgValid()).willReturn(true);
    var loopedMsg = TbMsg.newMsg()
            .type(TbMsgType.POST_TELEMETRY_REQUEST)
            .originator(TENANT_ID)
            .copyMetaData(TbMsgMetaData.EMPTY)
            .data(TbMsg.EMPTY_STRING)
            .callback(callbackMock)
            .build();
    // three earlier passes through this very node: 3 occurrences < cap of 10
    for (int pass = 0; pass < 3; pass++) {
        loopedMsg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
    }
    var targetChainId = new RuleChainId(UUID.randomUUID());

    // WHEN
    defaultTbContext.input(loopedMsg, targetChainId);

    // THEN - the message is enqueued and no failure is reported to the chain actor
    then(clusterServiceMock).should().pushMsgToRuleEngine(eq(partitionInfo), eq(loopedMsg.getId()), any(), any());
    then(chainActorMock).shouldHaveNoInteractions();
}
@Test
public void givenPermissiveModeWithCapOneAndFreshMessage_whenInput_thenEnqueue() {
    // GIVEN - cap of 1 is the smallest permissive value; a fresh message has zero
    // occurrences of this node on its stack, so the first fire must be accepted
    var inputNode = new RuleNode(RULE_NODE_ID);
    inputNode.setRuleChainId(RULE_CHAIN_ID);
    inputNode.setDebugSettings(DebugSettings.off());
    given(nodeCtxMock.getSelf()).willReturn(inputNode);
    given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);

    given(mainCtxMock.getRuleChainInputLoopMaxIterations()).willReturn(1);
    var partitionInfo = resolve(null);
    given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(partitionInfo);
    var clusterServiceMock = mock(TbClusterService.class);
    given(mainCtxMock.getClusterService()).willReturn(clusterServiceMock);

    var callbackMock = mock(TbMsgCallback.class);
    given(callbackMock.isMsgValid()).willReturn(true);
    var freshMsg = TbMsg.newMsg()
            .type(TbMsgType.POST_TELEMETRY_REQUEST)
            .originator(TENANT_ID)
            .copyMetaData(TbMsgMetaData.EMPTY)
            .data(TbMsg.EMPTY_STRING)
            .callback(callbackMock)
            .build();
    var targetChainId = new RuleChainId(UUID.randomUUID());

    // WHEN
    defaultTbContext.input(freshMsg, targetChainId);

    // THEN - the message is enqueued and no failure is reported to the chain actor
    then(clusterServiceMock).should().pushMsgToRuleEngine(eq(partitionInfo), eq(freshMsg.getId()), any(), any());
    then(chainActorMock).shouldHaveNoInteractions();
}
@MethodSource

4
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -257,6 +257,10 @@ public final class TbMsg implements Serializable {
return ctx.isAlreadyInStack(ruleChainId, ruleNodeId);
}
/**
 * Counts occurrences of the given rule chain / rule node pair for this message.
 * Pure delegate: the result is whatever {@code ctx.countOccurrences} returns.
 *
 * @param ruleChainId rule chain id to look for
 * @param ruleNodeId  rule node id to look for
 * @return the count reported by {@code ctx}
 */
public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return ctx.countOccurrences(ruleChainId, ruleNodeId);
}
/**
 * Pops an item from this message's processing stack by delegating to {@code ctx.pop()}.
 *
 * @return the item returned by {@code ctx.pop()}
 */
public TbMsgProcessingStackItem popFormStack() {
return ctx.pop();
}

13
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java

@ -75,6 +75,19 @@ public final class TbMsgProcessingCtx implements Serializable {
return false;
}
/**
 * Counts the stack entries whose rule chain id and rule node id both equal the given ones.
 *
 * @param ruleChainId rule chain id an entry must match
 * @param ruleNodeId  rule node id an entry must match
 * @return number of matching entries; {@code 0} when the stack is {@code null} or empty
 */
public int countOccurrences(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
    int matches = 0;
    if (stack != null) {
        // an empty stack simply yields no iterations, leaving the tally at zero
        for (TbMsgProcessingStackItem entry : stack) {
            boolean sameChain = ruleChainId.equals(entry.getRuleChainId());
            if (sameChain && ruleNodeId.equals(entry.getRuleNodeId())) {
                matches++;
            }
        }
    }
    return matches;
}
public TbMsgProcessingStackItem pop() {
if (stack == null || stack.isEmpty()) {
return null;

46
common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

@ -86,4 +86,50 @@ class TbMsgProcessingCtxTest {
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenEmptyStack_whenCountOccurrences_thenReturnZero() {
    // GIVEN - a context that never had anything pushed
    TbMsgProcessingCtx untouchedCtx = new TbMsgProcessingCtx();

    // WHEN
    int occurrences = untouchedCtx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID);

    // THEN
    assertThat(occurrences).isZero();
}
@Test
void givenStackWithoutMatch_whenCountOccurrences_thenReturnZero() {
    // GIVEN - two entries, neither of which refers to RULE_CHAIN_ID / RULE_NODE_ID
    TbMsgProcessingCtx processingCtx = new TbMsgProcessingCtx();
    for (int i = 0; i < 2; i++) {
        processingCtx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
    }

    // WHEN
    int occurrences = processingCtx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID);

    // THEN
    assertThat(occurrences).isZero();
}
@Test
void givenStackWithSingleMatch_whenCountOccurrences_thenReturnOne() {
    // GIVEN - the matching entry sits between two unrelated ones
    TbMsgProcessingCtx processingCtx = new TbMsgProcessingCtx();
    RuleChainId firstOtherChain = new RuleChainId(UUID.randomUUID());
    RuleNodeId firstOtherNode = new RuleNodeId(UUID.randomUUID());
    processingCtx.push(firstOtherChain, firstOtherNode);
    processingCtx.push(RULE_CHAIN_ID, RULE_NODE_ID);
    RuleChainId secondOtherChain = new RuleChainId(UUID.randomUUID());
    RuleNodeId secondOtherNode = new RuleNodeId(UUID.randomUUID());
    processingCtx.push(secondOtherChain, secondOtherNode);

    // WHEN
    int occurrences = processingCtx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID);

    // THEN
    assertThat(occurrences).isEqualTo(1);
}
@Test
void givenStackWithThreeMatches_whenCountOccurrences_thenReturnThree() {
    // GIVEN - three matching entries with one unrelated entry after the first
    TbMsgProcessingCtx processingCtx = new TbMsgProcessingCtx();
    processingCtx.push(RULE_CHAIN_ID, RULE_NODE_ID);
    RuleChainId otherChain = new RuleChainId(UUID.randomUUID());
    RuleNodeId otherNode = new RuleNodeId(UUID.randomUUID());
    processingCtx.push(otherChain, otherNode);
    for (int i = 0; i < 2; i++) {
        processingCtx.push(RULE_CHAIN_ID, RULE_NODE_ID);
    }

    // WHEN
    int occurrences = processingCtx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID);

    // THEN
    assertThat(occurrences).isEqualTo(3);
}
@Test
void givenStackWithMatchThenPopped_whenCountOccurrences_thenReturnZero() {
    // GIVEN - the only matching entry is pushed and then removed again
    TbMsgProcessingCtx processingCtx = new TbMsgProcessingCtx();
    processingCtx.push(RULE_CHAIN_ID, RULE_NODE_ID);
    processingCtx.pop();

    // WHEN
    int occurrences = processingCtx.countOccurrences(RULE_CHAIN_ID, RULE_NODE_ID);

    // THEN
    assertThat(occurrences).isZero();
}
}

Loading…
Cancel
Save