diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index d2de12ab3b..01edaa8de4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -69,6 +69,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -900,4 +901,43 @@ public class TbMathNodeTest { } } + @Test + public void givenConcurrentAccess_whenOnMsg_thenMessagesProcessedSerially() throws InterruptedException { + assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1); + + TbMathNode node = spy(initNode(TbRuleNodeMathFunctionType.ADD, + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + )); + + int messageCount = RULE_DISPATCHER_POOL_SIZE * 2; + List tbMsgList = IntStream.range(0, messageCount).mapToObj(x -> { + return TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(originator) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()) + .build(); + }).toList(); + + CountDownLatch processingLatch = new CountDownLatch(messageCount); + + willAnswer(invocation -> { + processingLatch.countDown(); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(any(), any()); + + // Submit all messages concurrently from different threads + tbMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + + // Wait for all messages to be processed + assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue(); + + // Verify processMsgAsync was called exactly once per message (serial processing guaranteed by single queue) + verify(node, times(messageCount)).processMsgAsync(any(), any()); + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> verify(ctx, times(messageCount)).tellSuccess(any())); + verify(ctx, never()).tellFailure(any(), any()); + } + }