diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java index a1c40f0d24..0a97be3bd0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java @@ -66,6 +66,7 @@ public class CertPemCredentials implements ClientCredentials { return CredentialsType.CERT_PEM; } + @Override public SslContext initSslContext() { try { Security.addProvider(new BouncyCastleProvider()); @@ -120,7 +121,7 @@ public class CertPemCredentials implements ClientCredentials { return keyManagerFactory; } - private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception { + protected TrustManagerFactory createAndInitTrustManagerFactory() throws Exception { X509Certificate caCertHolder; caCertHolder = readCertFile(caCert); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index ba1d0caea2..2ecf770e19 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -20,6 +20,7 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.util.StringUtils; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; @@ -78,14 +79,14 @@ public class TbMqttNode implements TbNode { String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData()); this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE) .addListener(future -> { - if (future.isSuccess()) { - ctx.tellSuccess(msg); - } else { - TbMsg next = processException(ctx, msg, future.cause()); - ctx.tellFailure(next, future.cause()); - } - } - ); + if (future.isSuccess()) { + ctx.tellSuccess(msg); + } else { + TbMsg next = processException(ctx, msg, future.cause()); + ctx.tellFailure(next, future.cause()); + } + } + ); } private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) { @@ -108,12 +109,7 @@ public class TbMqttNode implements TbNode { } config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); - ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); - if (credentials.getType() == CredentialsType.BASIC) { - config.setUsername(((BasicCredentials) credentials).getUsername()); - config.setPassword(((BasicCredentials) credentials).getPassword()); - } - + prepareMqttClientConfig(config); MqttClient client = MqttClient.create(config, null); client.setEventLoop(ctx.getSharedEventLoop()); Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); @@ -135,14 +131,17 @@ public class TbMqttNode implements TbNode { return client; } - private SslContext getSslContext() throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); - SslContext sslContext = credentials.initSslContext(); - if (!this.mqttNodeConfiguration.isSsl() && - (credentials.getType() == CredentialsType.ANONYMOUS || credentials.getType() == CredentialsType.BASIC)) { - sslContext = null; + if (credentials.getType() == CredentialsType.BASIC) { + BasicCredentials basicCredentials = (BasicCredentials) credentials; + config.setUsername(basicCredentials.getUsername()); + config.setPassword(basicCredentials.getPassword()); } - return sslContext; + } + + private SslContext getSslContext() throws SSLException { + return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java index f938e6e603..5155acf831 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.credentials.CredentialsType; import javax.net.ssl.TrustManagerFactory; import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.security.KeyStore; import java.security.Security; import java.security.cert.CertificateFactory; @@ -63,29 +64,4 @@ public class AzureIotHubSasCredentials extends CertPemCredentials { return CredentialsType.SAS; } - private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception { - X509Certificate caCertHolder; - caCertHolder = readCertFile(caCert); - - KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType()); - caKeyStore.load(null, null); - caKeyStore.setCertificateEntry("caCert-cert", caCertHolder); - - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - trustManagerFactory.init(caKeyStore); - return trustManagerFactory; - } - - private X509Certificate readCertFile(String fileContent) throws Exception { - X509Certificate certificate = null; - if (fileContent != null && !fileContent.trim().isEmpty()) { - fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - .replaceAll("\\s", ""); - byte[] decoded = Base64.decodeBase64(fileContent); - CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); - certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded)); - } - return certificate; - } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 4ada1e0d39..9718b10f9c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -15,14 +15,18 @@ */ package org.thingsboard.rule.engine.mqtt.azure; +import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.credentials.BasicCredentials; import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.credentials.ClientCredentials; import org.thingsboard.rule.engine.credentials.CredentialsType; @@ -50,42 +54,24 @@ public class TbAzureIotHubNode extends TbMqttNode { mqttNodeConfiguration.setPort(8883); mqttNodeConfiguration.setCleanSession(true); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); - mqttNodeConfiguration.setCredentials(new ClientCredentials() { - @Override - public CredentialsType getType() { - return credentials.getType(); + if (CredentialsType.CERT_PEM == credentials.getType()) { + CertPemCredentials pemCredentials = (CertPemCredentials) credentials; + if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) { + pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); } - - @Override - public SslContext initSslContext() throws SSLException { - if (credentials instanceof AzureIotHubSasCredentials) { - AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials; - if (sasCredentials.getCaCert() == null || sasCredentials.getCaCert().isEmpty()) { - sasCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); - } - } else if (credentials instanceof CertPemCredentials) { - CertPemCredentials pemCredentials = (CertPemCredentials) credentials; - if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) { - pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); - } - } - return credentials.initSslContext(); - } - -// @Override -// public void configure(MqttClientConfig config) { -// config.setProtocolVersion(MqttVersion.MQTT_3_1_1); -// config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); -// if (credentials instanceof AzureIotHubSasCredentials) { -// AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials; -// config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), sasCredentials.getSasKey())); -// } -// } - }); - + } this.mqttClient = initClient(ctx); } catch (Exception e) { throw new TbNodeException(e); } } + + protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + config.setProtocolVersion(MqttVersion.MQTT_3_1_1); + config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); + ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); + if (CredentialsType.SAS == credentials.getType()) { + config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), ((AzureIotHubSasCredentials) credentials).getSasKey())); + } + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index a2921029bc..17dc6b5bce 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -35,6 +35,7 @@ import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory; import org.springframework.http.client.Netty4ClientHttpRequestFactory; +import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.util.StringUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -134,6 +135,9 @@ public class TbHttpClient { requestFactory.setReadTimeout(config.getReadTimeoutMs()); httpClient = new AsyncRestTemplate(requestFactory); } else if (config.isUseSimpleClientHttpFactory()) { + if (CredentialsType.CERT_PEM == config.getCredentials().getType()) { + throw new TbNodeException("Simple HTTP Factory does not support CERT PEM credentials!"); + } httpClient = new AsyncRestTemplate(); } else { this.eventLoopGroup = new NioEventLoopGroup(); @@ -231,7 +235,13 @@ public class TbHttpClient { private HttpHeaders prepareHeaders(TbMsgMetaData metaData) { HttpHeaders headers = new HttpHeaders(); config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData))); - getBasicAuthHeaderValue(config.getCredentials()).ifPresent(authString -> headers.add("Authorization", authString)); + ClientCredentials credentials = config.getCredentials(); + if (CredentialsType.BASIC == credentials.getType()) { + BasicCredentials basicCredentials = (BasicCredentials) credentials; + String authString = basicCredentials.getUsername() + ":" + basicCredentials.getPassword(); + String encodedAuthString = new String(Base64.encodeBase64(authString.getBytes(StandardCharsets.UTF_8))); + headers.add("Authorization", "Basic " + encodedAuthString); + } return headers; } @@ -266,13 +276,4 @@ public class TbHttpClient { } } - public static Optional getBasicAuthHeaderValue(ClientCredentials credentials) { - if (CredentialsType.BASIC == credentials.getType()) { - BasicCredentials basicCredentials = (BasicCredentials) credentials; - String authString = basicCredentials.getUsername() + ":" + basicCredentials.getPassword(); - String encodedAuthString = new String(Base64.encodeBase64(authString.getBytes(StandardCharsets.UTF_8))); - return Optional.of("Basic " + encodedAuthString); - } - return Optional.empty(); - } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/credentials/BasicCredentialsTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/credentials/BasicCredentialsTest.java deleted file mode 100644 index 9230e39dc4..0000000000 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/credentials/BasicCredentialsTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.rest.credentials; - -import org.junit.Assert; -import org.junit.Test; -import org.thingsboard.rule.engine.credentials.BasicCredentials; -import org.thingsboard.rule.engine.rest.TbHttpClient; - -public class BasicCredentialsTest { - @Test - public void getBasicAuthHeaderValueTest() { - BasicCredentials credentials = new BasicCredentials(); - credentials.setUsername("testUser"); - credentials.setPassword("testPwd"); - String actualHeaderValue = TbHttpClient.getBasicAuthHeaderValue(credentials).get(); - String expectedHeaderValue = "Basic dGVzdFVzZXI6dGVzdFB3ZA=="; - - Assert.assertEquals(expectedHeaderValue, actualHeaderValue); - } -}