Browse Source

Refactoring of HTTP and MQTT client credentials

pull/3985/head
Andrii Shvaika 5 years ago
parent
commit
90334db157
  1. 3
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java
  2. 39
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
  3. 26
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java
  4. 50
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
  5. 21
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java
  6. 34
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/credentials/BasicCredentialsTest.java

3
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java

@ -66,6 +66,7 @@ public class CertPemCredentials implements ClientCredentials {
return CredentialsType.CERT_PEM;
}
@Override
public SslContext initSslContext() {
try {
Security.addProvider(new BouncyCastleProvider());
@ -120,7 +121,7 @@ public class CertPemCredentials implements ClientCredentials {
return keyManagerFactory;
}
private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
protected TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
X509Certificate caCertHolder;
caCertHolder = readCertFile(caCert);

39
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java

@ -20,6 +20,7 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.util.StringUtils;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
@ -78,14 +79,14 @@ public class TbMqttNode implements TbNode {
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
.addListener(future -> {
if (future.isSuccess()) {
ctx.tellSuccess(msg);
} else {
TbMsg next = processException(ctx, msg, future.cause());
ctx.tellFailure(next, future.cause());
}
}
);
if (future.isSuccess()) {
ctx.tellSuccess(msg);
} else {
TbMsg next = processException(ctx, msg, future.cause());
ctx.tellFailure(next, future.cause());
}
}
);
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
@ -108,12 +109,7 @@ public class TbMqttNode implements TbNode {
}
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials();
if (credentials.getType() == CredentialsType.BASIC) {
config.setUsername(((BasicCredentials) credentials).getUsername());
config.setPassword(((BasicCredentials) credentials).getPassword());
}
prepareMqttClientConfig(config);
MqttClient client = MqttClient.create(config, null);
client.setEventLoop(ctx.getSharedEventLoop());
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
@ -135,14 +131,17 @@ public class TbMqttNode implements TbNode {
return client;
}
private SslContext getSslContext() throws SSLException {
protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials();
SslContext sslContext = credentials.initSslContext();
if (!this.mqttNodeConfiguration.isSsl() &&
(credentials.getType() == CredentialsType.ANONYMOUS || credentials.getType() == CredentialsType.BASIC)) {
sslContext = null;
if (credentials.getType() == CredentialsType.BASIC) {
BasicCredentials basicCredentials = (BasicCredentials) credentials;
config.setUsername(basicCredentials.getUsername());
config.setPassword(basicCredentials.getPassword());
}
return sslContext;
}
private SslContext getSslContext() throws SSLException {
return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
}
}

26
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java

@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.credentials.CredentialsType;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
@ -63,29 +64,4 @@ public class AzureIotHubSasCredentials extends CertPemCredentials {
return CredentialsType.SAS;
}
private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
X509Certificate caCertHolder;
caCertHolder = readCertFile(caCert);
KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
caKeyStore.load(null, null);
caKeyStore.setCertificateEntry("caCert-cert", caCertHolder);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(caKeyStore);
return trustManagerFactory;
}
private X509Certificate readCertFile(String fileContent) throws Exception {
X509Certificate certificate = null;
if (fileContent != null && !fileContent.trim().isEmpty()) {
fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.replaceAll("\\s", "");
byte[] decoded = Base64.decodeBase64(fileContent);
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded));
}
return certificate;
}
}

50
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java

@ -15,14 +15,18 @@
*/
package org.thingsboard.rule.engine.mqtt.azure;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.AzureIotHubUtil;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.credentials.BasicCredentials;
import org.thingsboard.rule.engine.credentials.CertPemCredentials;
import org.thingsboard.rule.engine.credentials.ClientCredentials;
import org.thingsboard.rule.engine.credentials.CredentialsType;
@ -50,42 +54,24 @@ public class TbAzureIotHubNode extends TbMqttNode {
mqttNodeConfiguration.setPort(8883);
mqttNodeConfiguration.setCleanSession(true);
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
mqttNodeConfiguration.setCredentials(new ClientCredentials() {
@Override
public CredentialsType getType() {
return credentials.getType();
if (CredentialsType.CERT_PEM == credentials.getType()) {
CertPemCredentials pemCredentials = (CertPemCredentials) credentials;
if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) {
pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
}
@Override
public SslContext initSslContext() throws SSLException {
if (credentials instanceof AzureIotHubSasCredentials) {
AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
if (sasCredentials.getCaCert() == null || sasCredentials.getCaCert().isEmpty()) {
sasCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
}
} else if (credentials instanceof CertPemCredentials) {
CertPemCredentials pemCredentials = (CertPemCredentials) credentials;
if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) {
pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
}
}
return credentials.initSslContext();
}
// @Override
// public void configure(MqttClientConfig config) {
// config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
// config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
// if (credentials instanceof AzureIotHubSasCredentials) {
// AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
// config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), sasCredentials.getSasKey()));
// }
// }
});
}
this.mqttClient = initClient(ctx);
} catch (Exception e) {
throw new TbNodeException(e);
}
}
protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
if (CredentialsType.SAS == credentials.getType()) {
config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), ((AzureIotHubSasCredentials) credentials).getSasKey()));
}
}
}

21
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java

@ -35,6 +35,7 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -134,6 +135,9 @@ public class TbHttpClient {
requestFactory.setReadTimeout(config.getReadTimeoutMs());
httpClient = new AsyncRestTemplate(requestFactory);
} else if (config.isUseSimpleClientHttpFactory()) {
if (CredentialsType.CERT_PEM == config.getCredentials().getType()) {
throw new TbNodeException("Simple HTTP Factory does not support CERT PEM credentials!");
}
httpClient = new AsyncRestTemplate();
} else {
this.eventLoopGroup = new NioEventLoopGroup();
@ -231,7 +235,13 @@ public class TbHttpClient {
private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
HttpHeaders headers = new HttpHeaders();
config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData)));
getBasicAuthHeaderValue(config.getCredentials()).ifPresent(authString -> headers.add("Authorization", authString));
ClientCredentials credentials = config.getCredentials();
if (CredentialsType.BASIC == credentials.getType()) {
BasicCredentials basicCredentials = (BasicCredentials) credentials;
String authString = basicCredentials.getUsername() + ":" + basicCredentials.getPassword();
String encodedAuthString = new String(Base64.encodeBase64(authString.getBytes(StandardCharsets.UTF_8)));
headers.add("Authorization", "Basic " + encodedAuthString);
}
return headers;
}
@ -266,13 +276,4 @@ public class TbHttpClient {
}
}
public static Optional<String> getBasicAuthHeaderValue(ClientCredentials credentials) {
if (CredentialsType.BASIC == credentials.getType()) {
BasicCredentials basicCredentials = (BasicCredentials) credentials;
String authString = basicCredentials.getUsername() + ":" + basicCredentials.getPassword();
String encodedAuthString = new String(Base64.encodeBase64(authString.getBytes(StandardCharsets.UTF_8)));
return Optional.of("Basic " + encodedAuthString);
}
return Optional.empty();
}
}

34
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/credentials/BasicCredentialsTest.java

@ -1,34 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest.credentials;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.rule.engine.credentials.BasicCredentials;
import org.thingsboard.rule.engine.rest.TbHttpClient;
public class BasicCredentialsTest {
@Test
public void getBasicAuthHeaderValueTest() {
BasicCredentials credentials = new BasicCredentials();
credentials.setUsername("testUser");
credentials.setPassword("testPwd");
String actualHeaderValue = TbHttpClient.getBasicAuthHeaderValue(credentials).get();
String expectedHeaderValue = "Basic dGVzdFVzZXI6dGVzdFB3ZA==";
Assert.assertEquals(expectedHeaderValue, actualHeaderValue);
}
}
Loading…
Cancel
Save