From dcdba9e75b82627fcff5eea5bfafea76508176b4 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 8 Aug 2024 17:03:28 +0300 Subject: [PATCH] changed access modifiers and added aqdditional tests --- .../rule/engine/rabbitmq/TbRabbitMqNode.java | 13 ++-- .../engine/rabbitmq/TbRabbitMqNodeTest.java | 63 ++++++++++++++++--- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java index 19bbbd10ec..3e26a076ec 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java @@ -51,6 +51,10 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; ) public class TbRabbitMqNode extends TbAbstractExternalNode { + private static final String supportedPropertiesStr = String.join(", ", + "BASIC", "TEXT_PLAIN", "MINIMAL_BASIC", "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN" + ); + private static final Charset UTF8 = StandardCharsets.UTF_8; private static final String ERROR = "error"; @@ -81,7 +85,7 @@ public class TbRabbitMqNode extends TbAbstractExternalNode { t -> tellFailure(ctx, processException(tbMsg, t), t)); } - protected ConnectionFactory getConnectionFactory() { + ConnectionFactory getConnectionFactory() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(this.config.getHost()); factory.setPort(this.config.getPort()); @@ -91,7 +95,7 @@ public class TbRabbitMqNode extends TbAbstractExternalNode { factory.setAutomaticRecoveryEnabled(this.config.isAutomaticRecoveryEnabled()); factory.setConnectionTimeout(this.config.getConnectionTimeout()); factory.setHandshakeTimeout(this.config.getHandshakeTimeout()); - this.config.getClientProperties().forEach((k,v) -> factory.getClientProperties().put(k,v)); + this.config.getClientProperties().forEach((k, v) -> factory.getClientProperties().put(k, v)); return factory; } @@ -137,7 +141,7 @@ public class TbRabbitMqNode extends TbAbstractExternalNode { } } - protected static AMQP.BasicProperties convert(String name) throws TbNodeException { + static AMQP.BasicProperties convert(String name) throws TbNodeException { switch (name) { case "BASIC": return MessageProperties.BASIC; @@ -152,7 +156,8 @@ public class TbRabbitMqNode extends TbAbstractExternalNode { case "PERSISTENT_TEXT_PLAIN": return MessageProperties.PERSISTENT_TEXT_PLAIN; default: - throw new TbNodeException("Message Properties: '" + name + "' is undefined!"); + throw new TbNodeException("Undefined message properties '" + name + + "'! Only " + supportedPropertiesStr + " message properties are supported!"); } } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java index 468f1008b4..649d28a51a 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java @@ -62,16 +62,21 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.times; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; @ExtendWith(MockitoExtension.class) public class TbRabbitMqNodeTest { + private final String supportedPropertiesStr = String.join(", ", + "BASIC", "TEXT_PLAIN", "MINIMAL_BASIC", "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN" + ); + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b3d6f9dd-15cc-4e61-acc0-13197a090406")); private final ListeningExecutor executor = new TestDbCallbackExecutor(); @@ -109,6 +114,25 @@ public class TbRabbitMqNodeTest { assertThat(config.getClientProperties()).isEqualTo(Collections.emptyMap()); } + @Test + public void verifyGetConnectionFactoryMethod() { + ReflectionTestUtils.setField(node, "config", config); + + ConnectionFactory connectionFactory = node.getConnectionFactory(); + assertThat(connectionFactory).isNotNull(); + assertThat(connectionFactory.getHost()).isEqualTo(config.getHost()); + assertThat(connectionFactory.getPort()).isEqualTo(config.getPort()); + assertThat(connectionFactory.getVirtualHost()).isEqualTo(config.getVirtualHost()); + assertThat(connectionFactory.getUsername()).isEqualTo(config.getUsername()); + assertThat(connectionFactory.getPassword()).isEqualTo(config.getPassword()); + assertThat(connectionFactory.isAutomaticRecoveryEnabled()).isEqualTo(config.isAutomaticRecoveryEnabled()); + assertThat(connectionFactory.getConnectionTimeout()).isEqualTo(config.getConnectionTimeout()); + assertThat(connectionFactory.getHandshakeTimeout()).isEqualTo(config.getHandshakeTimeout()); + Map expectedClientProperties = new ConnectionFactory().getClientProperties(); + expectedClientProperties.putAll(config.getClientProperties()); + assertThat(connectionFactory.getClientProperties()).isEqualTo(expectedClientProperties); + } + @ParameterizedTest @MethodSource public void givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext( @@ -138,10 +162,9 @@ public class TbRabbitMqNodeTest { private static Stream givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext() { return Stream.of( - Arguments.of("", "", null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), Arguments.of("topic_logs", "kern.critical", "", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), Arguments.of("${mdExchangeName}", "${mdRoutingKey}", "BASIC", - new TbMsgMetaData(Map.of("mdExchangeName", "md_topic_logs","mdRoutingKey", "md.kern.critical")), + new TbMsgMetaData(Map.of("mdExchangeName", "md_topic_logs", "mdRoutingKey", "md.kern.critical")), TbMsg.EMPTY_JSON_OBJECT), Arguments.of("$[msgExchangeName]", "$[msgRoutingKey]", "MINIMAL_PERSISTENT_BASIC", TbMsgMetaData.EMPTY, "{\"msgExchangeName\":\"msg_topic_logs\",\"msgRoutingKey\":\"msg.kern.critical\"}") @@ -149,9 +172,27 @@ public class TbRabbitMqNodeTest { } @Test - public void givenForceAckIsFalseAndErrorOccursDuringPublishing_whenOnMsg_thenTellFailure() throws Exception { + public void givenForceAckIsFalseAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndTellSuccess() throws Exception { given(ctxMock.isExternalNodeForceAck()).willReturn(false); mockOnInit(); + given(ctxMock.getExternalCallExecutor()).willReturn(executor); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(ctxMock).should(never()).ack(any(TbMsg.class)); + then(channelMock).should().basicPublish("", "", null, msg.getData().getBytes(StandardCharsets.UTF_8)); + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsg.capture()); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void givenForceAckAndErrorOccursDuringPublishing_whenOnMsg_thenVerifyTellFailure(boolean forceAck) throws Exception { + given(ctxMock.isExternalNodeForceAck()).willReturn(forceAck); + mockOnInit(); ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); String errorMsg = "Something went wrong"; @@ -163,10 +204,13 @@ public class TbRabbitMqNodeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT); node.onMsg(ctxMock, msg); - then(ctxMock).should(never()).ack(any(TbMsg.class)); + then(ctxMock).should(forceAck ? times(1) : never()).ack(any(TbMsg.class)); ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor throwable = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); + Runnable verifyTellFailure = forceAck ? + () -> then(ctxMock).should().enqueueForTellFailure(actualMsg.capture(), throwable.capture()) : + () -> then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); + verifyTellFailure.run(); metaData.putValue("error", RuntimeException.class + ": " + errorMsg); TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); @@ -196,7 +240,8 @@ public class TbRabbitMqNodeTest { public void givenUndefinedProperties_whenConvert_thenThrowsException(String name) { assertThatThrownBy(() -> TbRabbitMqNode.convert(name)) .isInstanceOf(TbNodeException.class) - .hasMessage("Message Properties: '" + name + "' is undefined!"); + .hasMessage("Undefined message properties type '" + name + + "'! Only " + supportedPropertiesStr + " message properties are supported!"); } @Test