@ -17,11 +17,14 @@ package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.Unpooled ;
import io.netty.channel.EventLoopGroup ;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode ;
import io.netty.handler.codec.mqtt.MqttQoS ;
import io.netty.handler.ssl.SslContext ;
import io.netty.handler.ssl.SslContextBuilder ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.GenericFutureListener ;
import io.netty.util.concurrent.Promise ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.api.extension.ExtendWith ;
@ -33,9 +36,12 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension ;
import org.springframework.test.util.ReflectionTestUtils ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.ListeningExecutor ;
import org.thingsboard.mqtt.MqttClient ;
import org.thingsboard.mqtt.MqttClientConfig ;
import org.thingsboard.mqtt.MqttConnectResult ;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest ;
import org.thingsboard.rule.engine.TestDbCallbackExecutor ;
import org.thingsboard.rule.engine.api.TbContext ;
import org.thingsboard.rule.engine.api.TbNode ;
import org.thingsboard.rule.engine.api.TbNodeConfiguration ;
@ -43,11 +49,13 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils ;
import org.thingsboard.rule.engine.credentials.AnonymousCredentials ;
import org.thingsboard.rule.engine.credentials.BasicCredentials ;
import org.thingsboard.rule.engine.credentials.CertPemCredentials ;
import org.thingsboard.rule.engine.credentials.ClientCredentials ;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.RuleNodeId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.msg.TbMsgType ;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType ;
import org.thingsboard.server.common.data.rule.RuleNode ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
@ -55,26 +63,34 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.net.ssl.SSLException ;
import java.util.Map ;
import java.util.UUID ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.stream.Stream ;
import static com.amazonaws.util.StringUtils.UTF8 ;
import static org.assertj.core.api.Assertions.assertThat ;
import static org.assertj.core.api.Assertions.assertThatNoException ;
import static org.assertj.core.api.Assertions.assertThatThrownBy ;
import static org.mockito.ArgumentMatchers.any ;
import static org.mockito.ArgumentMatchers.anyBoolean ;
import static org.mockito.ArgumentMatchers.anyLong ;
import static org.mockito.ArgumentMatchers.eq ;
import static org.mockito.BDDMockito.given ;
import static org.mockito.BDDMockito.mock ;
import static org.mockito.BDDMockito.never ;
import static org.mockito.BDDMockito.spy ;
import static org.mockito.BDDMockito.then ;
import static org.mockito.BDDMockito.willAnswer ;
import static org.mockito.BDDMockito.willReturn ;
import static org.mockito.BDDMockito.willThrow ;
@ExtendWith ( MockitoExtension . class )
public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
private final TenantId TENANT_ID = TenantId . fromUUID ( UUID . fromString ( "d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f" ) ) ;
private final DeviceId DEVICE_ID = new DeviceId ( UUID . fromString ( "09115d92-d333-432a-868c-ccd6e89c9287" ) ) ;
private final RuleNodeId RULE_NODE_ID = new RuleNodeId ( UUID . fromString ( "11699e8f-c3f0-4366-9334-cbf75798314b" ) ) ;
private final ListeningExecutor executor = new TestDbCallbackExecutor ( ) ;
protected TbMqttNode mqttNode ;
protected TbMqttNodeConfiguration mqttNodeConfig ;
@ -83,6 +99,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
protected TbContext ctxMock ;
@Mock
protected MqttClient mqttClientMock ;
@Mock
protected EventLoopGroup eventLoopGroupMock ;
@Mock
protected Promise < MqttConnectResult > promiseMock ;
@Mock
protected MqttConnectResult resultMock ;
@BeforeEach
protected void setUp ( ) {
@ -129,9 +151,8 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
mqttNode . prepareMqttClientConfig ( mqttClientConfig ) ;
assertThat ( mqttClientConfig )
. hasFieldOrPropertyWithValue ( "username" , "test_username" )
. hasFieldOrPropertyWithValue ( "password" , "test_password" ) ;
assertThat ( mqttClientConfig . getUsername ( ) ) . isEqualTo ( "test_username" ) ;
assertThat ( mqttClientConfig . getPassword ( ) ) . isEqualTo ( "test_password" ) ;
}
@ParameterizedTest
@ -156,23 +177,62 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
}
@Test
public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException ( ) throws Exception {
String errorMsg = "Failed to connect to MQTT broker!" ;
willThrow ( new RuntimeException ( errorMsg ) ) . given ( mqttNode ) . initClient ( any ( ) ) ;
public void givenSuccessfulConnectResult_whenInit_thenOk ( ) throws ExecutionException , InterruptedException , TimeoutException {
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setAppendClientIdSuffix ( true ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willReturn ( resultMock ) ;
given ( resultMock . isSuccess ( ) ) . willReturn ( true ) ;
assertThatNoException ( ) . isThrownBy ( ( ) - > mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ) ;
}
@Test
public void givenFailedByTimeoutConnectResult_whenInit_thenThrowsException ( ) throws ExecutionException , InterruptedException , TimeoutException {
mqttNodeConfig . setHost ( "localhost" ) ;
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willThrow ( new TimeoutException ( "Failed to connect" ) ) ;
assertThatThrownBy ( ( ) - > mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) )
. isInstanceOf ( TbNodeException . class )
. hasMessage ( "java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883." )
. extracting ( e - > ( ( TbNodeException ) e ) . isUnrecoverable ( ) )
. isEqualTo ( false ) ;
}
@Test
public void givenFailedConnectResult_whenInit_thenThrowsException ( ) throws ExecutionException , InterruptedException , TimeoutException {
mqttNodeConfig . setHost ( "localhost" ) ;
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setAppendClientIdSuffix ( true ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willReturn ( resultMock ) ;
given ( resultMock . isSuccess ( ) ) . willReturn ( false ) ;
given ( resultMock . getReturnCode ( ) ) . willReturn ( MqttConnectReturnCode . CONNECTION_REFUSED_NOT_AUTHORIZED ) ;
var configuration = new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ;
assertThatThrownBy ( ( ) - > mqttNode . init ( ctxMock , configuration ) )
assertThatThrownBy ( ( ) - > mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) )
. isInstanceOf ( TbNodeException . class )
. hasMessage ( RuntimeException . class . getName ( ) + ": " + errorMsg ) ;
. hasMessage ( "java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883. Result code is: CONNECTION_REFUSED_NOT_AUTHORIZED" )
. extracting ( e - > ( ( TbNodeException ) e ) . isUnrecoverable ( ) )
. isEqualTo ( false ) ;
}
@ParameterizedTest
@MethodSource
public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess ( String topicPattern , TbMsgMetaData metaData , String data ) throws Exception {
public void givenForceAckIsTrueAnd TopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess ( String topicPattern , TbMsgMetaData metaData , String data ) {
mqttNodeConfig . setRetainedMessage ( true ) ;
mqttNodeConfig . setTopicPattern ( topicPattern ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , mqttClientMock ) ;
ReflectionTestUtils . setField ( mqttNode , "forceAck" , true ) ;
willReturn ( mqttClientMock ) . given ( mqttNode ) . initClient ( any ( ) ) ;
Future < Void > future = mock ( Future . class ) ;
given ( future . isSuccess ( ) ) . willReturn ( true ) ;
given ( mqttClientMock . publish ( any ( String . class ) , any ( ByteBuf . class ) , any ( MqttQoS . class ) , anyBoolean ( ) ) ) . willReturn ( future ) ;
@ -182,16 +242,18 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
return null ;
} ) . given ( future ) . addListener ( any ( ) ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
TbMsg msg = TbMsg . newMsg ( TbMsgType . POST_TELEMETRY_REQUEST , DEVICE_ID , metaData , data ) ;
mqttNode . onMsg ( ctxMock , msg ) ;
then ( ctxMock ) . should ( ) . ack ( msg ) ;
String expectedTopic = TbNodeUtils . processPattern ( mqttNodeConfig . getTopicPattern ( ) , msg ) ;
then ( mqttClientMock ) . should ( ) . publish ( expectedTopic , Unpooled . wrappedBuffer ( msg . getData ( ) . getBytes ( UTF8 ) ) , MqttQoS . AT_LEAST_ONCE , true ) ;
then ( ctxMock ) . should ( ) . tellSuccess ( msg ) ;
ArgumentCaptor < TbMsg > actualMsg = ArgumentCaptor . forClass ( TbMsg . class ) ;
then ( ctxMock ) . should ( ) . enqueueForTellNext ( actualMsg . capture ( ) , eq ( TbNodeConnectionType . SUCCESS ) ) ;
assertThat ( actualMsg . getValue ( ) ) . usingRecursiveComparison ( ) . ignoringFields ( "ctx" ) . isEqualTo ( msg ) ;
}
private static Stream < Arguments > givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess ( ) {
private static Stream < Arguments > givenForceAckIsTrueAnd TopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess ( ) {
return Stream . of (
Arguments . of ( "new-topic" , TbMsgMetaData . EMPTY , TbMsg . EMPTY_JSON_OBJECT ) ,
Arguments . of ( "${md-topic-name}" , new TbMsgMetaData ( Map . of ( "md-topic-name" , "md-new-topic" ) ) , TbMsg . EMPTY_JSON_OBJECT ) ,
@ -200,10 +262,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
}
@Test
public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure ( ) throws Exception {
public void givenForceAckIsFalse ParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure ( ) {
mqttNodeConfig . setParseToPlainText ( true ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , mqttClientMock ) ;
ReflectionTestUtils . setField ( mqttNode , "forceAck" , false ) ;
willReturn ( mqttClientMock ) . given ( mqttNode ) . initClient ( any ( ) ) ;
Future < Void > future = mock ( Future . class ) ;
given ( mqttClientMock . publish ( any ( String . class ) , any ( ByteBuf . class ) , any ( MqttQoS . class ) , anyBoolean ( ) ) ) . willReturn ( future ) ;
given ( future . isSuccess ( ) ) . willReturn ( false ) ;
@ -216,10 +280,10 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
return null ;
} ) . given ( future ) . addListener ( any ( ) ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
TbMsg msg = TbMsg . newMsg ( TbMsgType . POST_TELEMETRY_REQUEST , DEVICE_ID , TbMsgMetaData . EMPTY , "\"string\"" ) ;
mqttNode . onMsg ( ctxMock , msg ) ;
then ( ctxMock ) . should ( never ( ) ) . ack ( msg ) ;
String expectedData = JacksonUtil . toPlainText ( msg . getData ( ) ) ;
then ( mqttClientMock ) . should ( ) . publish ( mqttNodeConfig . getTopicPattern ( ) , Unpooled . wrappedBuffer ( expectedData . getBytes ( UTF8 ) ) , MqttQoS . AT_LEAST_ONCE , false ) ;
TbMsgMetaData metaData = new TbMsgMetaData ( ) ;
@ -231,6 +295,20 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat ( actualMsg ) . usingRecursiveComparison ( ) . ignoringFields ( "ctx" ) . isEqualTo ( expectedMsg ) ;
}
@Test
public void givenMqttClientIsNotNull_whenDestroy_thenDisconnect ( ) {
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , mqttClientMock ) ;
mqttNode . destroy ( ) ;
then ( mqttClientMock ) . should ( ) . disconnect ( ) ;
}
@Test
public void givenMqttClientIsNull_whenDestroy_thenShouldHaveNoInteractions ( ) {
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , null ) ;
mqttNode . destroy ( ) ;
then ( mqttClientMock ) . shouldHaveNoInteractions ( ) ;
}
private static Stream < Arguments > givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig ( ) {
return Stream . of (
// default config for version 0
@ -251,4 +329,13 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
protected TbNode getTestNode ( ) {
return mqttNode ;
}
protected void mockConnectClient ( TbMqttNode node ) {
given ( ctxMock . getTenantId ( ) ) . willReturn ( TENANT_ID ) ;
given ( ctxMock . getSelf ( ) ) . willReturn ( new RuleNode ( RULE_NODE_ID ) ) ;
given ( ctxMock . getExternalCallExecutor ( ) ) . willReturn ( executor ) ;
given ( ctxMock . getSharedEventLoop ( ) ) . willReturn ( eventLoopGroupMock ) ;
willReturn ( promiseMock ) . given ( node ) . connectMqttClient ( any ( ) ) ;
}
}