|
|
|
@ -22,7 +22,9 @@ import org.junit.jupiter.api.AfterEach; |
|
|
|
import org.junit.jupiter.api.Assertions; |
|
|
|
import org.junit.jupiter.api.BeforeEach; |
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
import org.junit.jupiter.params.ParameterizedTest; |
|
|
|
import org.junit.jupiter.params.provider.Arguments; |
|
|
|
import org.junit.jupiter.params.provider.MethodSource; |
|
|
|
import org.mockito.ArgumentCaptor; |
|
|
|
import org.mockito.ArgumentMatchers; |
|
|
|
import org.mockito.stubbing.Answer; |
|
|
|
@ -143,13 +145,24 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { |
|
|
|
node.destroy(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { |
|
|
|
private static Stream<Arguments> given_100_messages_strategy_first_then_verifyOutput() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of((String) null), |
|
|
|
Arguments.of(DataConstants.MAIN_QUEUE_NAME), |
|
|
|
Arguments.of(DataConstants.HP_QUEUE_NAME) |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
@ParameterizedTest |
|
|
|
@MethodSource |
|
|
|
public void given_100_messages_strategy_first_then_verifyOutput(String queueName) throws TbNodeException, ExecutionException, InterruptedException { |
|
|
|
int wantedNumberOfTellSelfInvocation = 2; |
|
|
|
int msgCount = 100; |
|
|
|
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|
|
|
invokeTellSelf(wantedNumberOfTellSelfInvocation); |
|
|
|
|
|
|
|
when(ctx.getQueueName()).thenReturn(queueName); |
|
|
|
|
|
|
|
config.setInterval(deduplicationInterval); |
|
|
|
config.setMaxPendingMsgs(msgCount); |
|
|
|
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|
|
|
@ -186,6 +199,12 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { |
|
|
|
Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); |
|
|
|
Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); |
|
|
|
Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); |
|
|
|
|
|
|
|
if (queueName == null) { |
|
|
|
Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName()); |
|
|
|
} else { |
|
|
|
Assertions.assertEquals(ctx.getQueueName(), actualMsg.getQueueName()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
|