diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index dc7f2ee07f..4e83b1887b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -177,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode { } if (resultMsg != null) { deduplicationResults.add(TbMsg.newMsg( - queueName, + queueName != null ? queueName : resultMsg.getQueueName(), resultMsg.getType(), resultMsg.getOriginator(), resultMsg.getCustomerId(), diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java index 2e188bf002..f0ba55e03b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -50,4 +50,4 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { protected TbNode getTestNode() { return spy(TbMsgGeneratorNode.class); } -} \ No newline at end of file +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java index edb7a7c3b1..c7d50528d5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java @@ -52,4 +52,4 @@ public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest { protected TbNode getTestNode() { return spy(TbCheckpointNode.class); } -} \ No newline at end of file +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index b5bbcdc0fe..89eda98090 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -22,7 +22,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; @@ -143,13 +145,24 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { node.destroy(); } - @Test - public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + private static Stream given_100_messages_strategy_first_then_verifyOutput() { + return Stream.of( + Arguments.of((String) null), + Arguments.of(DataConstants.MAIN_QUEUE_NAME), + Arguments.of(DataConstants.HP_QUEUE_NAME) + ); + } + + @ParameterizedTest + @MethodSource + public void given_100_messages_strategy_first_then_verifyOutput(String queueName) throws TbNodeException, ExecutionException, InterruptedException { int wantedNumberOfTellSelfInvocation = 2; int msgCount = 100; awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(queueName); + config.setInterval(deduplicationInterval); config.setMaxPendingMsgs(msgCount); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -186,6 +199,12 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); + + if (queueName == null) { + Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName()); + } else { + Assertions.assertEquals(ctx.getQueueName(), actualMsg.getQueueName()); + } } @Test