|
|
|
@ -62,16 +62,21 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; |
|
|
|
import static org.mockito.ArgumentMatchers.any; |
|
|
|
import static org.mockito.ArgumentMatchers.eq; |
|
|
|
import static org.mockito.BDDMockito.given; |
|
|
|
import static org.mockito.BDDMockito.mock; |
|
|
|
import static org.mockito.BDDMockito.never; |
|
|
|
import static org.mockito.BDDMockito.spy; |
|
|
|
import static org.mockito.BDDMockito.then; |
|
|
|
import static org.mockito.BDDMockito.times; |
|
|
|
import static org.mockito.BDDMockito.willAnswer; |
|
|
|
import static org.mockito.BDDMockito.willReturn; |
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
import static org.mockito.Mockito.never; |
|
|
|
import static org.mockito.Mockito.spy; |
|
|
|
|
|
|
|
@ExtendWith(MockitoExtension.class) |
|
|
|
public class TbRabbitMqNodeTest { |
|
|
|
|
|
|
|
private final String supportedPropertiesStr = String.join(", ", |
|
|
|
"BASIC", "TEXT_PLAIN", "MINIMAL_BASIC", "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN" |
|
|
|
); |
|
|
|
|
|
|
|
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b3d6f9dd-15cc-4e61-acc0-13197a090406")); |
|
|
|
private final ListeningExecutor executor = new TestDbCallbackExecutor(); |
|
|
|
|
|
|
|
@ -109,6 +114,25 @@ public class TbRabbitMqNodeTest { |
|
|
|
assertThat(config.getClientProperties()).isEqualTo(Collections.emptyMap()); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void verifyGetConnectionFactoryMethod() { |
|
|
|
ReflectionTestUtils.setField(node, "config", config); |
|
|
|
|
|
|
|
ConnectionFactory connectionFactory = node.getConnectionFactory(); |
|
|
|
assertThat(connectionFactory).isNotNull(); |
|
|
|
assertThat(connectionFactory.getHost()).isEqualTo(config.getHost()); |
|
|
|
assertThat(connectionFactory.getPort()).isEqualTo(config.getPort()); |
|
|
|
assertThat(connectionFactory.getVirtualHost()).isEqualTo(config.getVirtualHost()); |
|
|
|
assertThat(connectionFactory.getUsername()).isEqualTo(config.getUsername()); |
|
|
|
assertThat(connectionFactory.getPassword()).isEqualTo(config.getPassword()); |
|
|
|
assertThat(connectionFactory.isAutomaticRecoveryEnabled()).isEqualTo(config.isAutomaticRecoveryEnabled()); |
|
|
|
assertThat(connectionFactory.getConnectionTimeout()).isEqualTo(config.getConnectionTimeout()); |
|
|
|
assertThat(connectionFactory.getHandshakeTimeout()).isEqualTo(config.getHandshakeTimeout()); |
|
|
|
Map<String, Object> expectedClientProperties = new ConnectionFactory().getClientProperties(); |
|
|
|
expectedClientProperties.putAll(config.getClientProperties()); |
|
|
|
assertThat(connectionFactory.getClientProperties()).isEqualTo(expectedClientProperties); |
|
|
|
} |
|
|
|
|
|
|
|
@ParameterizedTest |
|
|
|
@MethodSource |
|
|
|
public void givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext( |
|
|
|
@ -138,10 +162,9 @@ public class TbRabbitMqNodeTest { |
|
|
|
|
|
|
|
private static Stream<Arguments> givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of("", "", null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), |
|
|
|
Arguments.of("topic_logs", "kern.critical", "", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), |
|
|
|
Arguments.of("${mdExchangeName}", "${mdRoutingKey}", "BASIC", |
|
|
|
new TbMsgMetaData(Map.of("mdExchangeName", "md_topic_logs","mdRoutingKey", "md.kern.critical")), |
|
|
|
new TbMsgMetaData(Map.of("mdExchangeName", "md_topic_logs", "mdRoutingKey", "md.kern.critical")), |
|
|
|
TbMsg.EMPTY_JSON_OBJECT), |
|
|
|
Arguments.of("$[msgExchangeName]", "$[msgRoutingKey]", "MINIMAL_PERSISTENT_BASIC", |
|
|
|
TbMsgMetaData.EMPTY, "{\"msgExchangeName\":\"msg_topic_logs\",\"msgRoutingKey\":\"msg.kern.critical\"}") |
|
|
|
@ -149,9 +172,27 @@ public class TbRabbitMqNodeTest { |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenForceAckIsFalseAndErrorOccursDuringPublishing_whenOnMsg_thenTellFailure() throws Exception { |
|
|
|
public void givenForceAckIsFalseAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndTellSuccess() throws Exception { |
|
|
|
given(ctxMock.isExternalNodeForceAck()).willReturn(false); |
|
|
|
mockOnInit(); |
|
|
|
given(ctxMock.getExternalCallExecutor()).willReturn(executor); |
|
|
|
|
|
|
|
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); |
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
then(ctxMock).should(never()).ack(any(TbMsg.class)); |
|
|
|
then(channelMock).should().basicPublish("", "", null, msg.getData().getBytes(StandardCharsets.UTF_8)); |
|
|
|
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
then(ctxMock).should().tellSuccess(actualMsg.capture()); |
|
|
|
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg); |
|
|
|
} |
|
|
|
|
|
|
|
@ParameterizedTest |
|
|
|
@ValueSource(booleans = {true, false}) |
|
|
|
public void givenForceAckAndErrorOccursDuringPublishing_whenOnMsg_thenVerifyTellFailure(boolean forceAck) throws Exception { |
|
|
|
given(ctxMock.isExternalNodeForceAck()).willReturn(forceAck); |
|
|
|
mockOnInit(); |
|
|
|
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); |
|
|
|
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); |
|
|
|
String errorMsg = "Something went wrong"; |
|
|
|
@ -163,10 +204,13 @@ public class TbRabbitMqNodeTest { |
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
then(ctxMock).should(never()).ack(any(TbMsg.class)); |
|
|
|
then(ctxMock).should(forceAck ? times(1) : never()).ack(any(TbMsg.class)); |
|
|
|
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
ArgumentCaptor<Throwable> throwable = ArgumentCaptor.forClass(Throwable.class); |
|
|
|
then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); |
|
|
|
Runnable verifyTellFailure = forceAck ? |
|
|
|
() -> then(ctxMock).should().enqueueForTellFailure(actualMsg.capture(), throwable.capture()) : |
|
|
|
() -> then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); |
|
|
|
verifyTellFailure.run(); |
|
|
|
metaData.putValue("error", RuntimeException.class + ": " + errorMsg); |
|
|
|
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); |
|
|
|
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); |
|
|
|
@ -196,7 +240,8 @@ public class TbRabbitMqNodeTest { |
|
|
|
public void givenUndefinedProperties_whenConvert_thenThrowsException(String name) { |
|
|
|
assertThatThrownBy(() -> TbRabbitMqNode.convert(name)) |
|
|
|
.isInstanceOf(TbNodeException.class) |
|
|
|
.hasMessage("Message Properties: '" + name + "' is undefined!"); |
|
|
|
.hasMessage("Undefined message properties type '" + name + |
|
|
|
"'! Only " + supportedPropertiesStr + " message properties are supported!"); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
|