@ -36,12 +36,10 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension ;
import org.springframework.test.util.ReflectionTestUtils ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.ListeningExecutor ;
import org.thingsboard.mqtt.MqttClient ;
import org.thingsboard.mqtt.MqttClientConfig ;
import org.thingsboard.mqtt.MqttConnectResult ;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest ;
import org.thingsboard.rule.engine.TestDbCallbackExecutor ;
import org.thingsboard.rule.engine.api.TbContext ;
import org.thingsboard.rule.engine.api.TbNode ;
import org.thingsboard.rule.engine.api.TbNodeConfiguration ;
@ -50,7 +48,6 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.credentials.AnonymousCredentials ;
import org.thingsboard.rule.engine.credentials.BasicCredentials ;
import org.thingsboard.rule.engine.credentials.CertPemCredentials ;
import org.thingsboard.rule.engine.credentials.ClientCredentials ;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.RuleNodeId ;
import org.thingsboard.server.common.data.id.TenantId ;
@ -60,7 +57,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import javax.net.ssl.SSLException ;
import java.nio.charset.StandardCharsets ;
import java.util.Map ;
import java.util.UUID ;
import java.util.concurrent.ExecutionException ;
@ -68,12 +65,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException ;
import java.util.stream.Stream ;
import static com.amazonaws.util.StringUtils.UTF8 ;
import static org.assertj.core.api.Assertions.assertThat ;
import static org.assertj.core.api.Assertions.assertThatNoException ;
import static org.assertj.core.api.Assertions.assertThatThrownBy ;
import static org.mockito.ArgumentMatchers.any ;
import static org.mockito.ArgumentMatchers.anyBoolean ;
import static org.mockito.ArgumentMatchers.anyInt ;
import static org.mockito.ArgumentMatchers.anyLong ;
import static org.mockito.ArgumentMatchers.eq ;
import static org.mockito.BDDMockito.given ;
@ -90,7 +87,6 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
private final TenantId TENANT_ID = TenantId . fromUUID ( UUID . fromString ( "d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f" ) ) ;
private final DeviceId DEVICE_ID = new DeviceId ( UUID . fromString ( "09115d92-d333-432a-868c-ccd6e89c9287" ) ) ;
private final RuleNodeId RULE_NODE_ID = new RuleNodeId ( UUID . fromString ( "11699e8f-c3f0-4366-9334-cbf75798314b" ) ) ;
private final ListeningExecutor executor = new TestDbCallbackExecutor ( ) ;
protected TbMqttNode mqttNode ;
protected TbMqttNodeConfiguration mqttNodeConfig ;
@ -129,62 +125,64 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
@Test
public void verifyGetOwnerIdMethod ( ) {
String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b" ;
String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7" ;
RuleNode ruleNode = new RuleNode ( new RuleNodeId ( UUID . fromString ( ruleNodeIdStr ) ) ) ;
given ( ctxMock . getTenantId ( ) ) . willReturn ( TenantId . fromUUID ( UUID . fromString ( tenantIdStr ) ) ) ;
given ( ctxMock . getSelf ( ) ) . willReturn ( ruleNode ) ;
given ( ctxMock . getTenantId ( ) ) . willReturn ( TENANT_ID ) ;
given ( ctxMock . getSelf ( ) ) . willReturn ( new RuleNode ( RULE_NODE_ID ) ) ;
String actualOwnerIdStr = mqttNode . getOwnerId ( ctxMock ) ;
String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]" ;
String expectedOwnerIdStr = "Tenant[" + TENANT_ID . getId ( ) + "]RuleNode[" + RULE_NODE_ID . getId ( ) + "]" ;
assertThat ( actualOwnerIdStr ) . isEqualTo ( expectedOwnerIdStr ) ;
}
@Test
public void verifyPrepareMqttClientConfigMethodWithBasicCredentials ( ) throws SSL Exception {
public void verifyPrepareMqttClientConfigMethodWithBasicCredentials ( ) throws Exception {
BasicCredentials credentials = new BasicCredentials ( ) ;
credentials . setUsername ( "test_username" ) ;
credentials . setPassword ( "test_password" ) ;
mqttNodeConfig . setCredentials ( credentials ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
MqttClientConfig mqttClientConfig = new MqttClientConfig ( mqttNode . getSslContext ( ) ) ;
mockSuccessfulInit ( ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
MqttClientConfig mqttClientConfig = new MqttClientConfig ( mqttNode . getSslContext ( ) ) ;
mqttNode . prepareMqttClientConfig ( mqttClientConfig ) ;
assertThat ( mqttClientConfig . getUsername ( ) ) . isEqualTo ( "test_username" ) ;
assertThat ( mqttClientConfig . getPassword ( ) ) . isEqualTo ( "test_password" ) ;
}
@ParameterizedTest
@MethodSource
public void verifyGetSslContextMethod ( boolean ssl , ClientCredentials credentials , SslContext expectedSslContext ) throws SSLException {
mqttNodeConfig . setSsl ( ssl ) ;
mqttNodeConfig . setCredentials ( credentials ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
@Test
public void givenSslIsTrueAndCredentials_whenGetSslContext_thenVerifySslContext ( ) throws Exception {
mqttNodeConfig . setSsl ( true ) ;
mqttNodeConfig . setCredentials ( new BasicCredentials ( ) ) ;
mockSuccessfulInit ( ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
SslContext actualSslContext = mqttNode . getSslContext ( ) ;
assertThat ( actualSslContext )
. usingRecursiveComparison ( )
. ignoringFields ( "ctx" , "ctxLock" , "sessionContext.context.ctx" , "sessionContext.context.ctxLock" )
. isEqualTo ( expectedSslContext ) ;
. isEqualTo ( SslContextBuilder . forClient ( ) . build ( ) ) ;
}
private static Stream < Arguments > verifyGetSslContextMethod ( ) throws SSLException {
return Stream . of (
Arguments . of ( true , new BasicCredentials ( ) , SslContextBuilder . forClient ( ) . build ( ) ) ,
Arguments . of ( false , new AnonymousCredentials ( ) , null )
) ;
@Test
public void givenSslIsFalse_whenGetSslContext_thenVerifySslContextIsNull ( ) throws Exception {
mqttNodeConfig . setSsl ( false ) ;
mockSuccessfulInit ( ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
SslContext actualSslContext = mqttNode . getSslContext ( ) ;
assertThat ( actualSslContext ) . isNull ( ) ;
}
@Test
public void givenSuccessfulConnectResult_whenInit_thenOk ( ) throws ExecutionException , InterruptedException , TimeoutException {
public void givenSuccessfulConnectResult_whenInit_thenOk ( ) throws Exception {
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setAppendClientIdSuffix ( true ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willReturn ( resultMock ) ;
given ( resultMock . isSuccess ( ) ) . willReturn ( true ) ;
mockSuccessfulInit ( ) ;
assertThatNoException ( ) . isThrownBy ( ( ) - > mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ) ;
}
@ -195,7 +193,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
mockConnectClient ( ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willThrow ( new TimeoutException ( "Failed to connect" ) ) ;
assertThatThrownBy ( ( ) - > mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) )
@ -206,13 +204,13 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
}
@Test
public void givenFailedConnectResult_whenInit_thenThrowsException ( ) throws ExecutionException , InterruptedException , TimeoutEx ception {
public void givenFailedConnectResult_whenInit_thenThrowsException ( ) throws Exception {
mqttNodeConfig . setHost ( "localhost" ) ;
mqttNodeConfig . setClientId ( "bfrbTESTmfkr23" ) ;
mqttNodeConfig . setAppendClientIdSuffix ( true ) ;
mqttNodeConfig . setCredentials ( new CertPemCredentials ( ) ) ;
mockConnectClient ( mqttNode ) ;
mockConnectClient ( ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willReturn ( resultMock ) ;
given ( resultMock . isSuccess ( ) ) . willReturn ( false ) ;
given ( resultMock . getReturnCode ( ) ) . willReturn ( MqttConnectReturnCode . CONNECTION_REFUSED_NOT_AUTHORIZED ) ;
@ -226,12 +224,15 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
@ParameterizedTest
@MethodSource
public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess ( String topicPattern , TbMsgMetaData metaData , String data ) {
public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess (
String topicPattern , TbMsgMetaData metaData , String data
) throws Exception {
mqttNodeConfig . setRetainedMessage ( true ) ;
mqttNodeConfig . setTopicPattern ( topicPattern ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , mqttClientMock ) ;
ReflectionTestUtils . setField ( mqttNode , "forceAck" , true ) ;
given ( ctxMock . isExternalNodeForceAck ( ) ) . willReturn ( true ) ;
mockSuccessfulInit ( ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
Future < Void > future = mock ( Future . class ) ;
given ( future . isSuccess ( ) ) . willReturn ( true ) ;
@ -247,7 +248,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
then ( ctxMock ) . should ( ) . ack ( msg ) ;
String expectedTopic = TbNodeUtils . processPattern ( mqttNodeConfig . getTopicPattern ( ) , msg ) ;
then ( mqttClientMock ) . should ( ) . publish ( expectedTopic , Unpooled . wrappedBuffer ( msg . getData ( ) . getBytes ( UTF8 ) ) , MqttQoS . AT_LEAST_ONCE , true ) ;
then ( mqttClientMock ) . should ( ) . publish ( expectedTopic , Unpooled . wrappedBuffer ( msg . getData ( ) . getBytes ( StandardCharsets . UTF_ 8 ) ) , MqttQoS . AT_LEAST_ONCE , true ) ;
ArgumentCaptor < TbMsg > actualMsg = ArgumentCaptor . forClass ( TbMsg . class ) ;
then ( ctxMock ) . should ( ) . enqueueForTellNext ( actualMsg . capture ( ) , eq ( TbNodeConnectionType . SUCCESS ) ) ;
assertThat ( actualMsg . getValue ( ) ) . usingRecursiveComparison ( ) . ignoringFields ( "ctx" ) . isEqualTo ( msg ) ;
@ -262,11 +263,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
}
@Test
public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure ( ) {
public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure ( ) throws Exception {
mqttNodeConfig . setParseToPlainText ( true ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttNodeConfiguration" , mqttNodeConfig ) ;
ReflectionTestUtils . setField ( mqttNode , "mqttClient" , mqttClientMock ) ;
ReflectionTestUtils . setField ( mqttNode , "forceAck" , false ) ;
given ( ctxMock . isExternalNodeForceAck ( ) ) . willReturn ( false ) ;
mockSuccessfulInit ( ) ;
mqttNode . init ( ctxMock , new TbNodeConfiguration ( JacksonUtil . valueToTree ( mqttNodeConfig ) ) ) ;
Future < Void > future = mock ( Future . class ) ;
given ( mqttClientMock . publish ( any ( String . class ) , any ( ByteBuf . class ) , any ( MqttQoS . class ) , anyBoolean ( ) ) ) . willReturn ( future ) ;
@ -285,7 +287,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
then ( ctxMock ) . should ( never ( ) ) . ack ( msg ) ;
String expectedData = JacksonUtil . toPlainText ( msg . getData ( ) ) ;
then ( mqttClientMock ) . should ( ) . publish ( mqttNodeConfig . getTopicPattern ( ) , Unpooled . wrappedBuffer ( expectedData . getBytes ( UTF8 ) ) , MqttQoS . AT_LEAST_ONCE , false ) ;
then ( mqttClientMock ) . should ( ) . publish ( mqttNodeConfig . getTopicPattern ( ) , Unpooled . wrappedBuffer ( expectedData . getBytes ( StandardCharsets . UTF_ 8 ) ) , MqttQoS . AT_LEAST_ONCE , false ) ;
TbMsgMetaData metaData = new TbMsgMetaData ( ) ;
metaData . putValue ( "error" , RuntimeException . class + ": " + errorMsg ) ;
TbMsg expectedMsg = TbMsg . transformMsgMetadata ( msg , metaData ) ;
@ -330,12 +332,18 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
return mqttNode ;
}
protected void mockConnectClient ( TbMqttNode node ) {
private void mockConnectClient ( ) {
given ( ctxMock . getTenantId ( ) ) . willReturn ( TENANT_ID ) ;
given ( ctxMock . getSelf ( ) ) . willReturn ( new RuleNode ( RULE_NODE_ID ) ) ;
given ( ctxMock . getExternalCallExecutor ( ) ) . willReturn ( executor ) ;
given ( ctxMock . getSharedEventLoop ( ) ) . willReturn ( eventLoopGroupMock ) ;
willReturn ( promiseMock ) . given ( node ) . connectMqttClient ( any ( ) ) ;
willReturn ( mqttClientMock ) . given ( mqttNode ) . getMqttClient ( any ( ) , any ( ) ) ;
given ( mqttClientMock . connect ( any ( ) , anyInt ( ) ) ) . willReturn ( promiseMock ) ;
}
private void mockSuccessfulInit ( ) throws Exception {
mockConnectClient ( ) ;
given ( promiseMock . get ( anyLong ( ) , any ( TimeUnit . class ) ) ) . willReturn ( resultMock ) ;
given ( resultMock . isSuccess ( ) ) . willReturn ( true ) ;
}
}