From 062c577d1fd044d6ef250e3d18b95af1c084d9fe Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Feb 2026 23:54:00 +0100 Subject: [PATCH 1/2] Fix infinite loop in rule engine caused by TbRuleChainInputNode self-referencing default rule chain When a TbRuleChainInputNode has `forwardMsgToDefaultRuleChain=true` and the originator's default rule chain is the same as the rule chain containing this node, the message enters an infinite loop: the node forwards to the default rule chain, which routes back to the same node, which forwards again, causing unbounded recursion and 100% CPU on rule-engine. Fix: detect the loop in DefaultTbContext.input() by checking whether the calling rule node is already present in the message's return stack (TbMsgProcessingCtx). On the second+ iteration the stack already contains the (ruleChainId, ruleNodeId) pair of the node, so the call is a cycle. In that case tellFailure() is called with a descriptive message and a WARN log is emitted instead of re-enqueuing the message. Co-Authored-By: Claude Sonnet 4.6 --- .../actors/ruleChain/DefaultTbContext.java | 15 +++- .../actors/rule/DefaultTbContextTest.java | 43 +++++++++ .../thingsboard/server/common/msg/TbMsg.java | 4 + .../server/common/msg/TbMsgProcessingCtx.java | 12 +++ .../common/msg/TbMsgProcessingCtxTest.java | 89 +++++++++++++++++++ 5 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 511479aa73..66a15a651e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -183,11 +183,24 @@ public class DefaultTbContext implements TbContext { if (!msg.isValid()) { return; } + RuleChainId selfRuleChainId = nodeCtx.getSelf().getRuleChainId(); + RuleNodeId selfId = nodeCtx.getSelf().getId(); + if (msg.isAlreadyInStack(selfRuleChainId, selfId)) { + log.warn("[{}] Detected rule chain processing loop for rule node [{}] in rule chain [{}]. " + + "The message will be failed to prevent infinite loop. " + + "Please check the rule chain configuration for circular references.", + nodeCtx.getTenantId(), selfId, selfRuleChainId); + tellFailure(msg, new RuntimeException( + "Detected rule chain processing loop for rule node [" + selfId + "] " + + "in rule chain [" + selfRuleChainId + "]. " + + "Please check the rule chain configuration for circular references.")); + return; + } TbMsg tbMsg = msg.copy() .ruleChainId(ruleChainId) .resetRuleNodeId() .build(); - tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + tbMsg.pushToStack(selfRuleChainId, selfId); TopicPartitionInfo tpi = resolvePartition(msg); doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index 759b7ffa2d..6337067b9c 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgProcessingStackItem; @@ -134,6 +135,48 @@ class DefaultTbContextTest { then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); } + @Test + public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() { + // GIVEN - simulate the second iteration: current node is already in the return stack, + // which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain + // is the same as the rule chain containing this node + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.off()); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + // Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain + msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); + + var targetRuleChainId = new RuleChainId(UUID.randomUUID()); + + // WHEN + defaultTbContext.input(msg, targetRuleChainId); + + // THEN - loop detected: tellFailure is called and no message is enqueued + ArgumentCaptor tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); + then(chainActorMock).should().tell(tellCaptor.capture()); + TbActorMsg capturedTellMsg = tellCaptor.getValue(); + assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); + RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; + assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); + assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop"); + + then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued + } + @MethodSource @ParameterizedTest public void givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 831d0557cc..950d62b081 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -266,6 +266,10 @@ public final class TbMsg implements Serializable { ctx.push(ruleChainId, ruleNodeId); } + public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return ctx.isAlreadyInStack(ruleChainId, ruleNodeId); + } + public TbMsgProcessingStackItem popFormStack() { return ctx.pop(); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index b71845db31..968b5d4fc2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -63,6 +63,18 @@ public final class TbMsgProcessingCtx implements Serializable { stack.add(new TbMsgProcessingStackItem(ruleChainId, ruleNodeId)); } + public boolean isAlreadyInStack(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + if (stack == null || stack.isEmpty()) { + return false; + } + for (TbMsgProcessingStackItem item : stack) { + if (ruleChainId.equals(item.getRuleChainId()) && ruleNodeId.equals(item.getRuleNodeId())) { + return true; + } + } + return false; + } + public TbMsgProcessingStackItem pop() { if (stack == null || stack.isEmpty()) { return null; diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java new file mode 100644 index 0000000000..65e5bda925 --- /dev/null +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/TbMsgProcessingCtxTest.java @@ -0,0 +1,89 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +class TbMsgProcessingCtxTest { + + private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11")); + private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6")); + + @Test + void givenEmptyStack_whenIsAlreadyInStack_thenReturnFalse() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); + } + + @Test + void givenStackWithDifferentEntry_whenIsAlreadyInStack_thenReturnFalse() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); + } + + @Test + void givenStackWithMatchingEntry_whenIsAlreadyInStack_thenReturnTrue() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue(); + } + + @Test + void givenStackWithMatchingEntryAmongOthers_whenIsAlreadyInStack_thenReturnTrue() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.push(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isTrue(); + } + + @Test + void givenStackWithSameChainButDifferentNode_whenIsAlreadyInStack_thenReturnFalse() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, new RuleNodeId(UUID.randomUUID())); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); + } + + @Test + void givenStackWithSameNodeButDifferentChain_whenIsAlreadyInStack_thenReturnFalse() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(new RuleChainId(UUID.randomUUID()), RULE_NODE_ID); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); + } + + @Test + void givenStackWithEntryThenPopped_whenIsAlreadyInStack_thenReturnFalse() { + TbMsgProcessingCtx ctx = new TbMsgProcessingCtx(); + ctx.push(RULE_CHAIN_ID, RULE_NODE_ID); + ctx.pop(); + + assertThat(ctx.isAlreadyInStack(RULE_CHAIN_ID, RULE_NODE_ID)).isFalse(); + } + +} From 6615c45cb5271a623020a55f483e2551af712bb5 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 25 Feb 2026 00:20:50 +0100 Subject: [PATCH 2/2] Add direct self-reference loop detection to TbRuleChainInputNode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stack-based check in DefaultTbContext.input() only catches a loop on the second Kafka round-trip through the same rule chain. When the target rule chain equals the node's own rule chain (direct self-loop), the first iteration is always unnecessary and should be rejected immediately before any enqueue occurs. Add an eager check in TbRuleChainInputNode.onMsg(): if the resolved targetRuleChainId equals ctx.getSelf().getRuleChainId() (the rule chain that contains this node), tellFailure() is called right away. Together with the existing stack-based check in DefaultTbContext.input() this forms a two-layer defence: - Layer 1 (TbRuleChainInputNode): direct loops A→A, caught immediately - Layer 2 (DefaultTbContext): indirect cycles A→B→A, caught on second pass Co-Authored-By: Claude Sonnet 4.6 --- .../engine/flow/TbRuleChainInputNode.java | 7 ++ .../engine/flow/TbRuleChainInputNodeTest.java | 70 ++++++++++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java index ae6f69a2d4..78bae6fe75 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNode.java @@ -76,6 +76,13 @@ public class TbRuleChainInputNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { RuleChainId targetRuleChainId = forwardMsgToDefaultRuleChain ? getOriginatorDefaultRuleChainId(ctx, msg).orElse(ruleChainId) : ruleChainId; + if (targetRuleChainId.equals(ctx.getSelf().getRuleChainId())) { + ctx.tellFailure(msg, new RuntimeException( + "TbRuleChainInputNode in rule chain [" + targetRuleChainId + + "] is configured to forward messages back to the same rule chain it belongs to. " + + "This would cause an infinite loop. Please check the rule chain configuration.")); + return; + } ctx.input(msg, targetRuleChainId); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java index 95028e7368..3ae26e6560 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbRuleChainInputNodeTest.java @@ -41,8 +41,10 @@ import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -52,8 +54,9 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -64,6 +67,8 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = new TenantId(UUID.fromString("4ba69ea5-6b27-42df-ab66-e7a727a67027")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("97731954-2147-4176-8f1a-d14f1b73e4e6")); private final AssetId ASSET_ID = new AssetId(UUID.fromString("841a47bd-4e8e-4ea5-88e6-420da0d70e51")); + // A stable "current" rule chain ID: this node lives here. Must differ from any target used in tests. + private final RuleChainId CURRENT_RULE_CHAIN_ID = new RuleChainId(UUID.fromString("11111111-1111-1111-1111-111111111111")); private TbRuleChainInputNode node; private TbRuleChainInputNodeConfiguration config; @@ -149,6 +154,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock); when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); node.init(ctxMock, nodeConfiguration); @@ -182,6 +188,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getAssetProfileCache()).thenReturn(assetProfileCacheMock); when(assetProfileCacheMock.get(any(TenantId.class), any(AssetId.class))).thenReturn(assetProfile); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); node.init(ctxMock, nodeConfiguration); @@ -214,6 +221,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock); when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); node.init(ctxMock, nodeConfiguration); @@ -242,6 +250,7 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getAssetProfileCache()).thenReturn(assetProfileCacheMock); when(assetProfileCacheMock.get(any(TenantId.class), any(AssetId.class))).thenReturn(assetProfile); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); node.init(ctxMock, nodeConfiguration); @@ -266,6 +275,8 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { config.setForwardMsgToDefaultRuleChain(false); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); + node.init(ctxMock, nodeConfiguration); //WHEN @@ -277,6 +288,56 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(ruleChainArgumentCaptor.getValue()).isEqualTo(ruleChainIdFromConfig); } + @Test + public void givenForwardMsgToDefaultIsTrueAndDeviceDefaultRuleChainIsSameAsCurrent_whenOnMsg_thenTellFailureToPreventDirectLoop() throws TbNodeException { + // GIVEN - device's default rule chain == the rule chain that contains this node (self-loop) + // This is caught immediately, before any Kafka round-trip + DeviceProfile deviceProfile = new DeviceProfile(); + deviceProfile.setDefaultRuleChainId(CURRENT_RULE_CHAIN_ID); + + TbMsg msg = getMsg(DEVICE_ID); + + String fallbackRuleChainIdStr = "acbc924f-7f95-4a9b-a854-e4822deb74c7"; + config.setRuleChainId(fallbackRuleChainIdStr); + config.setForwardMsgToDefaultRuleChain(true); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + when(ctxMock.getDeviceProfileCache()).thenReturn(deviceProfileCacheMock); + when(deviceProfileCacheMock.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); + + node.init(ctxMock, nodeConfiguration); + + // WHEN + node.onMsg(ctxMock, msg); + + // THEN - tellFailure immediately; ctx.input() must never be called + verify(ctxMock).tellFailure(eq(msg), any(RuntimeException.class)); + verify(ctxMock, never()).input(any(), any()); + } + + @Test + public void givenConfiguredRuleChainIsSameAsCurrent_whenOnMsg_thenTellFailureToPreventDirectLoop() throws TbNodeException { + // GIVEN - static config points to the same rule chain that contains this node + config.setRuleChainId(CURRENT_RULE_CHAIN_ID.getId().toString()); + config.setForwardMsgToDefaultRuleChain(false); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + TbMsg msg = getMsg(DEVICE_ID); + + when(ctxMock.getSelf()).thenReturn(ruleNodeInCurrentChain()); + + node.init(ctxMock, nodeConfiguration); + + // WHEN + node.onMsg(ctxMock, msg); + + // THEN - tellFailure immediately; ctx.input() must never be called + verify(ctxMock).tellFailure(eq(msg), any(RuntimeException.class)); + verify(ctxMock, never()).input(any(), any()); + } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( //config for version 0 @@ -299,6 +360,13 @@ public class TbRuleChainInputNodeTest extends AbstractRuleNodeUpgradeTest { return node; } + /** Creates a RuleNode that lives in {@link #CURRENT_RULE_CHAIN_ID}. */ + private RuleNode ruleNodeInCurrentChain() { + RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))); + ruleNode.setRuleChainId(CURRENT_RULE_CHAIN_ID); + return ruleNode; + } + private TbMsg getMsg(EntityId entityId) { return TbMsg.newMsg() .type(TbMsgType.POST_TELEMETRY_REQUEST)