@ -69,6 +69,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream ;
import static org.assertj.core.api.Assertions.assertThat ;
import static org.awaitility.Awaitility.await ;
import static org.junit.jupiter.api.Assertions.assertEquals ;
import static org.junit.jupiter.api.Assertions.assertFalse ;
import static org.junit.jupiter.api.Assertions.assertNotNull ;
@ -900,4 +901,43 @@ public class TbMathNodeTest {
}
}
@Test
public void givenConcurrentAccess_whenOnMsg_thenMessagesProcessedSerially ( ) throws InterruptedException {
assertThat ( RULE_DISPATCHER_POOL_SIZE ) . as ( "dispatcher pool size have to be > 1" ) . isGreaterThan ( 1 ) ;
TbMathNode node = spy ( initNode ( TbRuleNodeMathFunctionType . ADD ,
new TbMathResult ( TbMathArgumentType . MESSAGE_BODY , "result" , 2 , false , false , null ) ,
new TbMathArgument ( TbMathArgumentType . MESSAGE_BODY , "a" ) ,
new TbMathArgument ( TbMathArgumentType . MESSAGE_BODY , "b" )
) ) ;
int messageCount = RULE_DISPATCHER_POOL_SIZE * 2 ;
List < TbMsg > tbMsgList = IntStream . range ( 0 , messageCount ) . mapToObj ( x - > {
return TbMsg . newMsg ( )
. type ( TbMsgType . POST_TELEMETRY_REQUEST )
. originator ( originator )
. copyMetaData ( TbMsgMetaData . EMPTY )
. data ( JacksonUtil . newObjectNode ( ) . put ( "a" , 2 ) . put ( "b" , 2 ) . toString ( ) )
. build ( ) ;
} ) . toList ( ) ;
CountDownLatch processingLatch = new CountDownLatch ( messageCount ) ;
willAnswer ( invocation - > {
processingLatch . countDown ( ) ;
return invocation . callRealMethod ( ) ;
} ) . given ( node ) . processMsgAsync ( any ( ) , any ( ) ) ;
// Submit all messages concurrently from different threads
tbMsgList . forEach ( msg - > ruleEngineDispatcherExecutor . executeAsync ( ( ) - > node . onMsg ( ctx , msg ) ) ) ;
// Wait for all messages to be processed
assertThat ( processingLatch . await ( 30 , TimeUnit . SECONDS ) ) . as ( "await on processingLatch" ) . isTrue ( ) ;
// Verify processMsgAsync was called exactly once per message (serial processing guaranteed by single queue)
verify ( node , times ( messageCount ) ) . processMsgAsync ( any ( ) , any ( ) ) ;
await ( ) . atMost ( 30 , TimeUnit . SECONDS ) . untilAsserted ( ( ) - > verify ( ctx , times ( messageCount ) ) . tellSuccess ( any ( ) ) ) ;
verify ( ctx , never ( ) ) . tellFailure ( any ( ) , any ( ) ) ;
}
}