Browse Source

Merge pull request #15102 from smatvienko-tb/fix/rule-chain-input-loop-detection

Fixed infinite loop when rule chain input node forwards to its own rule chain
pull/15140/head
Viacheslav Klimov 3 months ago
committed by GitHub
parent
commit
d11b588eba
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 15
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  2. 43
      application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java
  3. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  4. 12
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java
  5. 89
      common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java
  6. 7
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java
  7. 70
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java

15
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -183,11 +183,24 @@ public class DefaultTbContext implements TbContext {
if (!msg.isValid()) {
return;
}
RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId selfId = nodeCtx.getSelf().getId();
if (msg.isAlreadyInStack(selfRuleChainId, selfId)) {
log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " +
"The message will be failed to prevent infinite loop. " +
"Please check the rule chain configuration for circular references.",
nodeCtx.getTenantId(), selfId, selfRuleChainId);
tellFailure(msg, new RuntimeException(
"Detected rule chain processing loop for rule node [" + selfId + "] " +
"in rule chain [" + selfRuleChainId + "]. " +
"Please check the rule chain configuration for circular references."));
return;
}
TbMsg tbMsg = msg.copy()
.ruleChainId(ruleChainId)
.resetRuleNodeId()
.build();
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
tbMsg.pushToStack(selfRuleChainId, selfId);
TopicPartitionInfo tpi = resolvePartition(msg);
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
}

43
application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java

@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingStackItem;
@ -134,6 +135,48 @@ class DefaultTbContextTest {
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
}
@Test
public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() {
// GIVEN - simulate the second iteration: current node is already in the return stack,
// which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain
// is the same as the rule chain containing this node
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugSettings(DebugSettings.off());
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(TENANT_ID)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
// Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID);
var targetRuleChainId = new RuleChainId(UUID.randomUUID());
// WHEN
defaultTbContext.input(msg, targetRuleChainId);
// THEN - loop detected: tellFailure is called and no message is enqueued
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class);
then(chainActorMock).should().tell(tellCaptor.capture());
TbActorMsg capturedTellMsg = tellCaptor.getValue();
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class);
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg;
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE);
assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop");
then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued
}
@MethodSource
@ParameterizedTest
public void givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) {

4
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -265,6 +265,10 @@ public final class TbMsg implements Serializable {
ctx.push(ruleChainId, ruleNodeId);
}
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return ctx.isAlreadyInStack(ruleChainId, ruleNodeId);
}
public TbMsgProcessingStackItem popFormStack() {
return ctx.pop();
}

12
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java

@ -63,6 +63,18 @@ public final class TbMsgProcessingCtx implements Serializable {
stack.add(new TbMsgProcessingStackItem(ruleChainId, ruleNodeId));
}
public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
if (stack == null || stack.isEmpty()) {
return false;
}
for (TbMsgProcessingStackItem item : stack) {
if (ruleChainId.equals(item.getRuleChainId()) && ruleNodeId.equals(item.getRuleNodeId())) {
return true;
}
}
return false;
}
public TbMsgProcessingStackItem pop() {
if (stack == null || stack.isEmpty()) {
return null;

89
common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java

@ -0,0 +1,89 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
class TbMsgProcessingCtxTest {
private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11"));
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6"));
@Test
void givenEmptyStack_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithDifferentEntry_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithMatchingEntry_whenIsAlreadyInStack_thenReturnTrue() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
}
@Test
void givenStackWithMatchingEntryAmongOthers_whenIsAlreadyInStack_thenReturnTrue() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue();
}
@Test
void givenStackWithSameChainButDifferentNode_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID()));
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithSameNodeButDifferentChain_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID);
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
@Test
void givenStackWithEntryThenPopped_whenIsAlreadyInStack_thenReturnFalse() {
TbMsgProcessingCtx ctx = new TbMsgProcessingCtx();
ctx.push(RULE_CHAIN_ID, RULE_NODE_ID);
ctx.pop();
assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse();
}
}

7
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java

@ -76,6 +76,13 @@ public class TbRuleChainInputNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
RuleChainId targetRuleChainId = forwardMsgToDefaultRuleChain ?
getOriginatorDefaultRuleChainId(ctx, msg).orElse(ruleChainId) : ruleChainId;
if (targetRuleChainId.equals(ctx.getSelf().getRuleChainId())) {
ctx.tellFailure(msg, new RuntimeException(
"TbRuleChainInputNode in rule chain [" + targetRuleChainId +
"] is configured to forward messages back to the same rule chain it belongs to. " +
"This would cause an infinite loop. Please check the rule chain configuration."));
return;
}
ctx.input(msg, targetRuleChainId);
}

70
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java

@ -41,8 +41,10 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -52,8 +54,9 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -64,6 +67,8 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
private final TenantId TENANT_ID = new TenantId(UUID.fromString("4ba69ea5-6b27-42df-ab66-e7a727a67027"));
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("97731954-2147-4176-8f1a-d14f1b73e4e6"));
private final AssetId ASSET_ID = new AssetId(UUID.fromString("841a47bd-4e8e-4ea5-88e6-420da0d70e51"));
// A stable "current" rule chain ID: this node lives here. Must differ from any target used in tests.
private final RuleChainId CURRENT_RULE_CHAIN_ID = new RuleChainId(UUID.fromString("11111111-1111-1111-1111-111111111111"));
private TbRuleChainInputNode node;
private TbRuleChainInputNodeConfiguration config;
@ -149,6 +154,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock);
when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
@ -182,6 +188,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getAssetProfileCache()).thenReturn(assetProfileCacheMock);
when(assetProfileCacheMock.get(any(TenantId.class), any(AssetId.class))).thenReturn(assetProfile);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
@ -214,6 +221,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock);
when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
@ -242,6 +250,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getAssetProfileCache()).thenReturn(assetProfileCacheMock);
when(assetProfileCacheMock.get(any(TenantId.class), any(AssetId.class))).thenReturn(assetProfile);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
@ -266,6 +275,8 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
config.setForwardMsgToDefaultRuleChain(false);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
//WHEN
@ -277,6 +288,56 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat(ruleChainArgumentCaptor.getValue()).isEqualTo(ruleChainIdFromConfig);
}
@Test
public void givenForwardMsgToDefaultIsTrueAndDeviceDefaultRuleChainIsSameAsCurrent_whenOnMsg_thenTellFailureToPreventDirectLoop() throws TbNodeException {
// GIVEN - device's default rule chain == the rule chain that contains this node (self-loop)
// This is caught immediately, before any Kafka round-trip
DeviceProfile deviceProfile = new DeviceProfile();
deviceProfile.setDefaultRuleChainId(CURRENT_RULE_CHAIN_ID);
TbMsg msg = getMsg(DEVICE_ID);
String fallbackRuleChainIdStr = "acbc924f-7f95-4a9b-a854-e4822deb74c7";
config.setRuleChainId(fallbackRuleChainIdStr);
config.setForwardMsgToDefaultRuleChain(true);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock);
when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
// WHEN
node.onMsg(ctxMock, msg);
// THEN - tellFailure immediately; ctx.input() must never be called
verify(ctxMock).tellFailure(eq(msg), any(RuntimeException.class));
verify(ctxMock, never()).input(any(), any());
}
@Test
public void givenConfiguredRuleChainIsSameAsCurrent_whenOnMsg_thenTellFailureToPreventDirectLoop() throws TbNodeException {
// GIVEN - static config points to the same rule chain that contains this node
config.setRuleChainId(CURRENT_RULE_CHAIN_ID.getId().toString());
config.setForwardMsgToDefaultRuleChain(false);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
TbMsg msg = getMsg(DEVICE_ID);
when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain());
node.init(ctxMock, nodeConfiguration);
// WHEN
node.onMsg(ctxMock, msg);
// THEN - tellFailure immediately; ctx.input() must never be called
verify(ctxMock).tellFailure(eq(msg), any(RuntimeException.class));
verify(ctxMock, never()).input(any(), any());
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
//config for version 0
@ -299,6 +360,13 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest {
return node;
}
/** Creates a RuleNode that lives in {@link #CURRENT_RULE_CHAIN_ID}. */
private RuleNode ruleNodeInCurrentChain() {
RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")));
ruleNode.setRuleChainId(CURRENT_RULE_CHAIN_ID);
return ruleNode;
}
private TbMsg getMsg(EntityId entityId) {
return TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)

Loading…
Cancel
Save