From 0ea799d8e33e1d91a4f3fcf17ea034d82fe10ea0 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Mon, 14 May 2018 11:20:24 +0300 Subject: [PATCH] Add Netty MQTT Client module. --- netty-mqtt/.gitignore | 7 + netty-mqtt/pom.xml | 99 ++++ .../mqtt/ChannelClosedException.java | 43 ++ .../thingsboard/mqtt/MqttChannelHandler.java | 269 ++++++++++ .../java/org/thingsboard/mqtt/MqttClient.java | 205 ++++++++ .../thingsboard/mqtt/MqttClientCallback.java | 29 ++ .../thingsboard/mqtt/MqttClientConfig.java | 149 ++++++ .../org/thingsboard/mqtt/MqttClientImpl.java | 484 ++++++++++++++++++ .../thingsboard/mqtt/MqttConnectResult.java | 45 ++ .../org/thingsboard/mqtt/MqttHandler.java | 23 + .../mqtt/MqttIncomingQos2Publish.java | 48 ++ .../org/thingsboard/mqtt/MqttLastWill.java | 154 ++++++ .../thingsboard/mqtt/MqttPendingPublish.java | 101 ++++ .../mqtt/MqttPendingSubscribtion.java | 102 ++++ .../mqtt/MqttPendingUnsubscribtion.java | 55 ++ .../org/thingsboard/mqtt/MqttPingHandler.java | 98 ++++ .../thingsboard/mqtt/MqttSubscribtion.java | 84 +++ .../mqtt/RetransmissionHandler.java | 66 +++ pom.xml | 17 +- rule-engine/rule-engine-components/pom.xml | 8 +- .../rule/engine/mqtt/TbMqttNode.java | 6 +- .../credentials/AnonymousCredentials.java | 2 +- .../mqtt/credentials/BasicCredentials.java | 2 +- .../credentials/CertPemClientCredentials.java | 2 +- .../credentials/MqttClientCredentials.java | 2 +- 25 files changed, 2083 insertions(+), 17 deletions(-) create mode 100644 netty-mqtt/.gitignore create mode 100644 netty-mqtt/pom.xml create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java diff --git a/netty-mqtt/.gitignore b/netty-mqtt/.gitignore new file mode 100644 index 0000000000..4d2302b226 --- /dev/null +++ b/netty-mqtt/.gitignore @@ -0,0 +1,7 @@ +.idea/ +*.ipr +*.iws +*.ids +*.iml +logs +target diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml new file mode 100644 index 0000000000..4e9c84ffd5 --- /dev/null +++ b/netty-mqtt/pom.xml @@ -0,0 +1,99 @@ + + + 4.0.0 + + org.thingsboard + 1.4.1-SNAPSHOT + thingsboard + + org.thingsboard + netty-mqtt + 1.4.1-SNAPSHOT + jar + + Netty MQTT Client + https://thingsboard.io + + + UTF-8 + ${basedir}/.. + + + + + io.netty + netty-codec-mqtt + + + io.netty + netty-handler + + + com.google.code.findbugs + jsr305 + 3.0.1 + true + + + com.google.guava + guava + + + + + + jk-5-maven + jk-5's maven server + sftp://10.2.1.2/opt/maven + + + + + + + org.apache.maven.wagon + wagon-ssh + 2.6 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + true + + + + + + + \ No newline at end of file diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java new file mode 100644 index 0000000000..a76b082ba9 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +/** + * Created by Valerii Sosliuk on 12/26/2017. + */ +public class ChannelClosedException extends RuntimeException { + + private static final long serialVersionUID = 6266638352424706909L; + + public ChannelClosedException() { + } + + public ChannelClosedException(String message) { + super(message); + } + + public ChannelClosedException(String message, Throwable cause) { + super(message, cause); + } + + public ChannelClosedException(Throwable cause) { + super(cause); + } + + public ChannelClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java new file mode 100644 index 0000000000..ef5e7a531d --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -0,0 +1,269 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import com.google.common.collect.ImmutableSet; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.mqtt.*; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Promise; + +final class MqttChannelHandler extends SimpleChannelInboundHandler { + + private final MqttClientImpl client; + private final Promise connectFuture; + + MqttChannelHandler(MqttClientImpl client, Promise connectFuture) { + this.client = client; + this.connectFuture = connectFuture; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { + switch (msg.fixedHeader().messageType()) { + case CONNACK: + handleConack(ctx.channel(), (MqttConnAckMessage) msg); + break; + case SUBACK: + handleSubAck((MqttSubAckMessage) msg); + break; + case PUBLISH: + handlePublish(ctx.channel(), (MqttPublishMessage) msg); + break; + case UNSUBACK: + handleUnsuback((MqttUnsubAckMessage) msg); + break; + case PUBACK: + handlePuback((MqttPubAckMessage) msg); + break; + case PUBREC: + handlePubrec(ctx.channel(), msg); + break; + case PUBREL: + handlePubrel(ctx.channel(), msg); + break; + case PUBCOMP: + handlePubcomp(msg); + break; + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( + this.client.getClientConfig().getProtocolVersion().protocolName(), // Protocol Name + this.client.getClientConfig().getProtocolVersion().protocolLevel(), // Protocol Level + this.client.getClientConfig().getUsername() != null, // Has Username + this.client.getClientConfig().getPassword() != null, // Has Password + this.client.getClientConfig().getLastWill() != null // Will Retain + && this.client.getClientConfig().getLastWill().isRetain(), + this.client.getClientConfig().getLastWill() != null // Will QOS + ? this.client.getClientConfig().getLastWill().getQos().value() + : 0, + this.client.getClientConfig().getLastWill() != null, // Has Will + this.client.getClientConfig().isCleanSession(), // Clean Session + this.client.getClientConfig().getTimeoutSeconds() // Timeout + ); + MqttConnectPayload payload = new MqttConnectPayload( + this.client.getClientConfig().getClientId(), + this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getTopic() : null, + this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getMessage().getBytes(CharsetUtil.UTF_8) : null, + this.client.getClientConfig().getUsername(), + this.client.getClientConfig().getPassword() != null ? this.client.getClientConfig().getPassword().getBytes(CharsetUtil.UTF_8) : null + ); + ctx.channel().writeAndFlush(new MqttConnectMessage(fixedHeader, variableHeader, payload)); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + } + + private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { + for (MqttSubscribtion subscribtion : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { + if (subscribtion.matches(message.variableHeader().topicName())) { + if (subscribtion.isOnce() && subscribtion.isCalled()) { + continue; + } + message.payload().markReaderIndex(); + subscribtion.setCalled(true); + subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if (subscribtion.isOnce()) { + this.client.off(subscribtion.getTopic(), subscribtion.getHandler()); + } + message.payload().resetReaderIndex(); + } + } + /*Set subscribtions = ImmutableSet.copyOf(this.client.getSubscriptions().get(message.variableHeader().topicName())); + for (MqttSubscribtion subscribtion : subscribtions) { + if(subscribtion.isOnce() && subscribtion.isCalled()){ + continue; + } + message.payload().markReaderIndex(); + subscribtion.setCalled(true); + subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if(subscribtion.isOnce()){ + this.client.off(subscribtion.getTopic(), subscribtion.getHandler()); + } + message.payload().resetReaderIndex(); + }*/ + message.payload().release(); + } + + private void handleConack(Channel channel, MqttConnAckMessage message) { + switch (message.variableHeader().connectReturnCode()) { + case CONNECTION_ACCEPTED: + this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture())); + + this.client.getPendingSubscribtions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> { + channel.write(e.getValue().getSubscribeMessage()); + e.getValue().setSent(true); + }); + + this.client.getPendingPublishes().forEach((id, publish) -> { + if (publish.isSent()) return; + channel.write(publish.getMessage()); + publish.setSent(true); + if (publish.getQos() == MqttQoS.AT_MOST_ONCE) { + publish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 + this.client.getPendingPublishes().remove(publish.getMessageId()); + } + }); + channel.flush(); + break; + + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: + case CONNECTION_REFUSED_IDENTIFIER_REJECTED: + case CONNECTION_REFUSED_NOT_AUTHORIZED: + case CONNECTION_REFUSED_SERVER_UNAVAILABLE: + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: + this.connectFuture.setSuccess(new MqttConnectResult(false, message.variableHeader().connectReturnCode(), channel.closeFuture())); + channel.close(); + // Don't start reconnect logic here + break; + } + } + + private void handleSubAck(MqttSubAckMessage message) { + MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscribtions().remove(message.variableHeader().messageId()); + if (pendingSubscription == null) { + return; + } + pendingSubscription.onSubackReceived(); + for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) { + MqttSubscribtion subscribtion = new MqttSubscribtion(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce()); + this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion); + this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion); + } + this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic()); + + this.client.getServerSubscribtions().add(pendingSubscription.getTopic()); + + if (!pendingSubscription.getFuture().isDone()) { + pendingSubscription.getFuture().setSuccess(null); + } + } + + private void handlePublish(Channel channel, MqttPublishMessage message) { + switch (message.fixedHeader().qosLevel()) { + case AT_MOST_ONCE: + invokeHandlersForIncomingPublish(message); + break; + + case AT_LEAST_ONCE: + invokeHandlersForIncomingPublish(message); + if (message.variableHeader().messageId() != -1) { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + } + break; + + case EXACTLY_ONCE: + if (message.variableHeader().messageId() != -1) { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); + MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader); + + MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage); + this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish); + message.payload().retain(); + incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket); + + channel.writeAndFlush(pubrecMessage); + } + break; + } + } + + private void handleUnsuback(MqttUnsubAckMessage message) { + MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId()); + if (unsubscribtion == null) { + return; + } + unsubscribtion.onUnsubackReceived(); + this.client.getServerSubscribtions().remove(unsubscribtion.getTopic()); + unsubscribtion.getFuture().setSuccess(null); + this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId()); + } + + private void handlePuback(MqttPubAckMessage message) { + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(message.variableHeader().messageId()); + pendingPublish.getFuture().setSuccess(null); + pendingPublish.onPubackReceived(); + this.client.getPendingPublishes().remove(message.variableHeader().messageId()); + pendingPublish.getPayload().release(); + } + + private void handlePubrec(Channel channel, MqttMessage message) { + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + pendingPublish.onPubackReceived(); + + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); + MqttMessage pubrelMessage = new MqttMessage(fixedHeader, variableHeader); + channel.writeAndFlush(pubrelMessage); + + pendingPublish.setPubrelMessage(pubrelMessage); + pendingPublish.startPubrelRetransmissionTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket); + } + + private void handlePubrel(Channel channel, MqttMessage message) { + if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { + MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); + incomingQos2Publish.onPubrelReceived(); + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId()); + } + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + } + + private void handlePubcomp(MqttMessage message) { + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(variableHeader.messageId()); + pendingPublish.getFuture().setSuccess(null); + this.client.getPendingPublishes().remove(variableHeader.messageId()); + pendingPublish.getPayload().release(); + pendingPublish.onPubcompReceived(); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java new file mode 100644 index 0000000000..6563525783 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -0,0 +1,205 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.concurrent.Future; + +public interface MqttClient { + + /** + * Connect to the specified hostname/ip. By default uses port 1883. + * If you want to change the port number, see {@link #connect(String, int)} + * + * @param host The ip address or host to connect to + * @return A future which will be completed when the connection is opened and we received an CONNACK + */ + Future connect(String host); + + /** + * Connect to the specified hostname/ip using the specified port + * + * @param host The ip address or host to connect to + * @param port The tcp port to connect to + * @return A future which will be completed when the connection is opened and we received an CONNACK + */ + Future connect(String host, int port); + + /** + * + * @return boolean value indicating if channel is active + */ + boolean isConnected(); + + /** + * Attempt reconnect to the host that was attempted with {@link #connect(String, int)} method before + * + * @return A future which will be completed when the connection is opened and we received an CONNACK + * @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted + */ + Future reconnect(); + + /** + * Retrieve the netty {@link EventLoopGroup} we are using + * @return The netty {@link EventLoopGroup} we use for the connection + */ + EventLoopGroup getEventLoop(); + + /** + * By default we use the netty {@link NioEventLoopGroup}. + * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)} + * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)} + * + * @param eventLoop The new eventloop to use + */ + void setEventLoop(EventLoopGroup eventLoop); + + /** + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @return A future which will be completed when the server acknowledges our subscribe request + */ + Future on(String topic, MqttHandler handler); + + /** + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @param qos The qos to request to the server + * @return A future which will be completed when the server acknowledges our subscribe request + */ + Future on(String topic, MqttHandler handler, MqttQoS qos); + + /** + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @return A future which will be completed when the server acknowledges our subscribe request + */ + Future once(String topic, MqttHandler handler); + + /** + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @param qos The qos to request to the server + * @return A future which will be completed when the server acknowledges our subscribe request + */ + Future once(String topic, MqttHandler handler, MqttQoS qos); + + /** + * Remove the subscribtion for the given topic and handler + * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)} + * + * @param topic The topic to unsubscribe for + * @param handler The handler to unsubscribe + * @return A future which will be completed when the server acknowledges our unsubscribe request + */ + Future off(String topic, MqttHandler handler); + + /** + * Remove all subscribtions for the given topic. + * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)} + * + * @param topic The topic to unsubscribe for + * @return A future which will be completed when the server acknowledges our unsubscribe request + */ + Future off(String topic); + + /** + * Publish a message to the given payload + * @param topic The topic to publish to + * @param payload The payload to send + * @return A future which will be completed when the message is sent out of the MqttClient + */ + Future publish(String topic, ByteBuf payload); + + /** + * Publish a message to the given payload, using the given qos + * @param topic The topic to publish to + * @param payload The payload to send + * @param qos The qos to use while publishing + * @return A future which will be completed when the message is delivered to the server + */ + Future publish(String topic, ByteBuf payload, MqttQoS qos); + + /** + * Publish a message to the given payload, using optional retain + * @param topic The topic to publish to + * @param payload The payload to send + * @param retain true if you want to retain the message on the server, false otherwise + * @return A future which will be completed when the message is sent out of the MqttClient + */ + Future publish(String topic, ByteBuf payload, boolean retain); + + /** + * Publish a message to the given payload, using the given qos and optional retain + * @param topic The topic to publish to + * @param payload The payload to send + * @param qos The qos to use while publishing + * @param retain true if you want to retain the message on the server, false otherwise + * @return A future which will be completed when the message is delivered to the server + */ + Future publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain); + + /** + * Retrieve the MqttClient configuration + * @return The {@link MqttClientConfig} instance we use + */ + MqttClientConfig getClientConfig(); + + /** + * Construct the MqttClientImpl with default config + */ + static MqttClient create(){ + return new MqttClientImpl(); + } + + /** + * Construct the MqttClientImpl with additional config. + * This config can also be changed using the {@link #getClientConfig()} function + * + * @param config The config object to use while looking for settings + */ + static MqttClient create(MqttClientConfig config){ + return new MqttClientImpl(config); + } + + + /** + * Send disconnect and close channel + * + */ + void disconnect(); + + /** + * Sets the {@see #MqttClientCallback} object for this MqttClient + * @param callback The callback to be set + */ + void setCallback(MqttClientCallback callback); + +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java new file mode 100644 index 0000000000..d7f0a08409 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +/** + * Created by Valerii Sosliuk on 12/30/2017. + */ +public interface MqttClientCallback { + + /** + * This method is called when the connection to the server is lost. + * + * @param cause the reason behind the loss of connection. + */ + public void connectionLost(Throwable cause); +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java new file mode 100644 index 0000000000..a59d83b14b --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -0,0 +1,149 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.Channel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.mqtt.MqttVersion; +import io.netty.handler.ssl.SslContext; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Random; + +@SuppressWarnings({"WeakerAccess", "unused"}) +public final class MqttClientConfig { + + private final SslContext sslContext; + private final String randomClientId; + + private String clientId; + private int timeoutSeconds = 60; + private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; + @Nullable private String username = null; + @Nullable private String password = null; + private boolean cleanSession = true; + @Nullable private MqttLastWill lastWill; + private Class channelClass = NioSocketChannel.class; + + private boolean reconnect = true; + + public MqttClientConfig() { + this(null); + } + + public MqttClientConfig(SslContext sslContext) { + this.sslContext = sslContext; + Random random = new Random(); + String id = "netty-mqtt/"; + String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split(""); + for(int i = 0; i < 8; i++){ + id += options[random.nextInt(options.length)]; + } + this.clientId = id; + this.randomClientId = id; + } + + @Nonnull + public String getClientId() { + return clientId; + } + + public void setClientId(@Nullable String clientId) { + if(clientId == null){ + this.clientId = randomClientId; + }else{ + this.clientId = clientId; + } + } + + public int getTimeoutSeconds() { + return timeoutSeconds; + } + + public void setTimeoutSeconds(int timeoutSeconds) { + if(timeoutSeconds != -1 && timeoutSeconds <= 0){ + throw new IllegalArgumentException("timeoutSeconds must be > 0 or -1"); + } + this.timeoutSeconds = timeoutSeconds; + } + + public MqttVersion getProtocolVersion() { + return protocolVersion; + } + + public void setProtocolVersion(MqttVersion protocolVersion) { + if(protocolVersion == null){ + throw new NullPointerException("protocolVersion"); + } + this.protocolVersion = protocolVersion; + } + + @Nullable + public String getUsername() { + return username; + } + + public void setUsername(@Nullable String username) { + this.username = username; + } + + @Nullable + public String getPassword() { + return password; + } + + public void setPassword(@Nullable String password) { + this.password = password; + } + + public boolean isCleanSession() { + return cleanSession; + } + + public void setCleanSession(boolean cleanSession) { + this.cleanSession = cleanSession; + } + + @Nullable + public MqttLastWill getLastWill() { + return lastWill; + } + + public void setLastWill(@Nullable MqttLastWill lastWill) { + this.lastWill = lastWill; + } + + public Class getChannelClass() { + return channelClass; + } + + public void setChannelClass(Class channelClass) { + this.channelClass = channelClass; + } + + public SslContext getSslContext() { + return sslContext; + } + + public boolean isReconnect() { + return reconnect; + } + + public void setReconnect(boolean reconnect) { + this.reconnect = reconnect; + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java new file mode 100644 index 0000000000..391410525f --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -0,0 +1,484 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.mqtt.*; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times + */ +@SuppressWarnings({"WeakerAccess", "unused"}) +final class MqttClientImpl implements MqttClient { + + private final Set serverSubscribtions = new HashSet<>(); + private final IntObjectHashMap pendingServerUnsubscribes = new IntObjectHashMap<>(); + private final IntObjectHashMap qos2PendingIncomingPublishes = new IntObjectHashMap<>(); + private final IntObjectHashMap pendingPublishes = new IntObjectHashMap<>(); + private final HashMultimap subscriptions = HashMultimap.create(); + private final IntObjectHashMap pendingSubscribtions = new IntObjectHashMap<>(); + private final Set pendingSubscribeTopics = new HashSet<>(); + private final HashMultimap handlerToSubscribtion = HashMultimap.create(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + + private final MqttClientConfig clientConfig; + + private EventLoopGroup eventLoop; + + private Channel channel; + + private boolean disconnected = false; + private String host; + private int port; + private MqttClientCallback callback; + + + /** + * Construct the MqttClientImpl with default config + */ + public MqttClientImpl() { + this.clientConfig = new MqttClientConfig(); + } + + /** + * Construct the MqttClientImpl with additional config. + * This config can also be changed using the {@link #getClientConfig()} function + * + * @param clientConfig The config object to use while looking for settings + */ + public MqttClientImpl(MqttClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + /** + * Connect to the specified hostname/ip. By default uses port 1883. + * If you want to change the port number, see {@link #connect(String, int)} + * + * @param host The ip address or host to connect to + * @return A future which will be completed when the connection is opened and we received an CONNACK + */ + @Override + public Future connect(String host) { + return connect(host, 1883); + } + + /** + * Connect to the specified hostname/ip using the specified port + * + * @param host The ip address or host to connect to + * @param port The tcp port to connect to + * @return A future which will be completed when the connection is opened and we received an CONNACK + */ + @Override + public Future connect(String host, int port) { + if (this.eventLoop == null) { + this.eventLoop = new NioEventLoopGroup(); + } + this.host = host; + this.port = port; + + Promise connectFuture = new DefaultPromise<>(this.eventLoop.next()); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(this.eventLoop); + bootstrap.channel(clientConfig.getChannelClass()); + bootstrap.remoteAddress(host, port); + bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext())); + ChannelFuture future = bootstrap.connect(); + future.addListener((ChannelFutureListener) f -> { + if (f.isSuccess()) { + MqttClientImpl.this.channel = f.channel(); + } else if (clientConfig.isReconnect() && !disconnected) { + eventLoop.schedule((Runnable) () -> connect(host, port), 1L, TimeUnit.SECONDS); + } + }); + return connectFuture; + } + + @Override + public boolean isConnected() { + if (!disconnected) { + return channel == null ? false : channel.isActive(); + }; + return false; + } + + @Override + public Future reconnect() { + if (host == null) { + throw new IllegalStateException("Cannot reconnect. Call connect() first"); + } + return connect(host, port); + } + + /** + * Retrieve the netty {@link EventLoopGroup} we are using + * + * @return The netty {@link EventLoopGroup} we use for the connection + */ + @Override + public EventLoopGroup getEventLoop() { + return eventLoop; + } + + /** + * By default we use the netty {@link NioEventLoopGroup}. + * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)} + * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)} + * + * @param eventLoop The new eventloop to use + */ + @Override + public void setEventLoop(EventLoopGroup eventLoop) { + this.eventLoop = eventLoop; + } + + /** + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @return A future which will be completed when the server acknowledges our subscribe request + */ + @Override + public Future on(String topic, MqttHandler handler) { + return on(topic, handler, MqttQoS.AT_MOST_ONCE); + } + + /** + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @param qos The qos to request to the server + * @return A future which will be completed when the server acknowledges our subscribe request + */ + @Override + public Future on(String topic, MqttHandler handler, MqttQoS qos) { + return createSubscribtion(topic, handler, false, qos); + } + + /** + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @return A future which will be completed when the server acknowledges our subscribe request + */ + @Override + public Future once(String topic, MqttHandler handler) { + return once(topic, handler, MqttQoS.AT_MOST_ONCE); + } + + /** + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed + * + * @param topic The topic filter to subscribe to + * @param handler The handler to invoke when we receive a message + * @param qos The qos to request to the server + * @return A future which will be completed when the server acknowledges our subscribe request + */ + @Override + public Future once(String topic, MqttHandler handler, MqttQoS qos) { + return createSubscribtion(topic, handler, true, qos); + } + + /** + * Remove the subscribtion for the given topic and handler + * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)} + * + * @param topic The topic to unsubscribe for + * @param handler The handler to unsubscribe + * @return A future which will be completed when the server acknowledges our unsubscribe request + */ + @Override + public Future off(String topic, MqttHandler handler) { + Promise future = new DefaultPromise<>(this.eventLoop.next()); + for (MqttSubscribtion subscribtion : this.handlerToSubscribtion.get(handler)) { + this.subscriptions.remove(topic, subscribtion); + } + this.handlerToSubscribtion.removeAll(handler); + this.checkSubscribtions(topic, future); + return future; + } + + /** + * Remove all subscribtions for the given topic. + * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)} + * + * @param topic The topic to unsubscribe for + * @return A future which will be completed when the server acknowledges our unsubscribe request + */ + @Override + public Future off(String topic) { + Promise future = new DefaultPromise<>(this.eventLoop.next()); + ImmutableSet subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic)); + for (MqttSubscribtion subscribtion : subscribtions) { + for (MqttSubscribtion handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) { + this.subscriptions.remove(topic, handSub); + } + this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion); + } + this.checkSubscribtions(topic, future); + return future; + } + + /** + * Publish a message to the given payload + * + * @param topic The topic to publish to + * @param payload The payload to send + * @return A future which will be completed when the message is sent out of the MqttClient + */ + @Override + public Future publish(String topic, ByteBuf payload) { + return publish(topic, payload, MqttQoS.AT_MOST_ONCE, false); + } + + /** + * Publish a message to the given payload, using the given qos + * + * @param topic The topic to publish to + * @param payload The payload to send + * @param qos The qos to use while publishing + * @return A future which will be completed when the message is delivered to the server + */ + @Override + public Future publish(String topic, ByteBuf payload, MqttQoS qos) { + return publish(topic, payload, qos, false); + } + + /** + * Publish a message to the given payload, using optional retain + * + * @param topic The topic to publish to + * @param payload The payload to send + * @param retain true if you want to retain the message on the server, false otherwise + * @return A future which will be completed when the message is sent out of the MqttClient + */ + @Override + public Future publish(String topic, ByteBuf payload, boolean retain) { + return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain); + } + + /** + * Publish a message to the given payload, using the given qos and optional retain + * + * @param topic The topic to publish to + * @param payload The payload to send + * @param qos The qos to use while publishing + * @param retain true if you want to retain the message on the server, false otherwise + * @return A future which will be completed when the message is delivered to the server + */ + @Override + public Future publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) { + Promise future = new DefaultPromise<>(this.eventLoop.next()); + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); + MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos); + ChannelFuture channelFuture = this.sendAndFlushPacket(message); + + if (channelFuture != null) { + pendingPublish.setSent(channelFuture != null); + if (channelFuture.cause() != null) { + future.setFailure(channelFuture.cause()); + return future; + } + } + if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { + pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 + } else if (pendingPublish.isSent()) { + this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); + pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); + } + return future; + } + + /** + * Retrieve the MqttClient configuration + * + * @return The {@link MqttClientConfig} instance we use + */ + @Override + public MqttClientConfig getClientConfig() { + return clientConfig; + } + + @Override + public void disconnect() { + disconnected = true; + if (this.channel != null) { + MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); + this.sendAndFlushPacket(message).addListener(future1 -> channel.close()); + } + } + + @Override + public void setCallback(MqttClientCallback callback) { + this.callback = callback; + } + + + ///////////////////////////////////////////// PRIVATE API ///////////////////////////////////////////// + + ChannelFuture sendAndFlushPacket(Object message) { + if (this.channel == null) { + return null; + } + if (this.channel.isActive()) { + return this.channel.writeAndFlush(message); + } + ChannelClosedException e = new ChannelClosedException("Channel is closed"); + if (callback != null) { + callback.connectionLost(e); + } + return this.channel.newFailedFuture(e); + } + + private MqttMessageIdVariableHeader getNewMessageId() { + this.nextMessageId.compareAndSet(0xffff, 1); + return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement()); + } + + private Future createSubscribtion(String topic, MqttHandler handler, boolean once, MqttQoS qos) { + if (this.pendingSubscribeTopics.contains(topic)) { + Optional> subscribtionEntry = this.pendingSubscribtions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny(); + if (subscribtionEntry.isPresent()) { + subscribtionEntry.get().getValue().addHandler(handler, once); + return subscribtionEntry.get().getValue().getFuture(); + } + } + if (this.serverSubscribtions.contains(topic)) { + MqttSubscribtion subscribtion = new MqttSubscribtion(topic, handler, once); + this.subscriptions.put(topic, subscribtion); + this.handlerToSubscribtion.put(handler, subscribtion); + return this.channel.newSucceededFuture(); + } + + Promise future = new DefaultPromise<>(this.eventLoop.next()); + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttTopicSubscription subscription = new MqttTopicSubscription(topic, qos); + MqttMessageIdVariableHeader variableHeader = getNewMessageId(); + MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription)); + MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload); + + final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message); + pendingSubscribtion.addHandler(handler, once); + this.pendingSubscribtions.put(variableHeader.messageId(), pendingSubscribtion); + this.pendingSubscribeTopics.add(topic); + pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened + + pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket); + + return future; + } + + private void checkSubscribtions(String topic, Promise promise) { + if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscribtions.contains(topic)) { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = getNewMessageId(); + MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); + MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload); + + MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message); + this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion); + pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); + + this.sendAndFlushPacket(message); + } else { + promise.setSuccess(null); + } + } + + IntObjectHashMap getPendingSubscribtions() { + return pendingSubscribtions; + } + + HashMultimap getSubscriptions() { + return subscriptions; + } + + Set getPendingSubscribeTopics() { + return pendingSubscribeTopics; + } + + HashMultimap getHandlerToSubscribtion() { + return handlerToSubscribtion; + } + + Set getServerSubscribtions() { + return serverSubscribtions; + } + + IntObjectHashMap getPendingServerUnsubscribes() { + return pendingServerUnsubscribes; + } + + IntObjectHashMap getPendingPublishes() { + return pendingPublishes; + } + + IntObjectHashMap getQos2PendingIncomingPublishes() { + return qos2PendingIncomingPublishes; + } + + private class MqttChannelInitializer extends ChannelInitializer { + + private final Promise connectFuture; + private final String host; + private final int port; + private final SslContext sslContext; + + + public MqttChannelInitializer(Promise connectFuture, String host, int port, SslContext sslContext) { + this.connectFuture = connectFuture; + this.host = host; + this.port = port; + this.sslContext = sslContext; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + if (sslContext != null) { + ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port)); + } + + ch.pipeline().addLast("mqttDecoder", new MqttDecoder()); + ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE); + ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0)); + ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds())); + ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture)); + } + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java new file mode 100644 index 0000000000..5fa0e6da43 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.ChannelFuture; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; + +@SuppressWarnings({"WeakerAccess", "unused"}) +public final class MqttConnectResult { + + private final boolean success; + private final MqttConnectReturnCode returnCode; + private final ChannelFuture closeFuture; + + MqttConnectResult(boolean success, MqttConnectReturnCode returnCode, ChannelFuture closeFuture) { + this.success = success; + this.returnCode = returnCode; + this.closeFuture = closeFuture; + } + + public boolean isSuccess() { + return success; + } + + public MqttConnectReturnCode getReturnCode() { + return returnCode; + } + + public ChannelFuture getCloseFuture() { + return closeFuture; + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java new file mode 100644 index 0000000000..4d6d58cf1a --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.buffer.ByteBuf; + +public interface MqttHandler { + + void onMessage(String topic, ByteBuf payload); +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java new file mode 100644 index 0000000000..af84cde14f --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.EventLoop; +import io.netty.handler.codec.mqtt.*; + +import java.util.function.Consumer; + +final class MqttIncomingQos2Publish { + + private final MqttPublishMessage incomingPublish; + + private final RetransmissionHandler retransmissionHandler = new RetransmissionHandler<>(); + + MqttIncomingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) { + this.incomingPublish = incomingPublish; + + this.retransmissionHandler.setOriginalMessage(originalMessage); + } + + MqttPublishMessage getIncomingPublish() { + return incomingPublish; + } + + void startPubrecRetransmitTimer(EventLoop eventLoop, Consumer sendPacket) { + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> + sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader()))); + this.retransmissionHandler.start(eventLoop); + } + + void onPubrelReceived() { + this.retransmissionHandler.stop(); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java new file mode 100644 index 0000000000..1dadfcda8c --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java @@ -0,0 +1,154 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.handler.codec.mqtt.MqttQoS; + +@SuppressWarnings({"WeakerAccess", "unused", "SimplifiableIfStatement", "StringBufferReplaceableByString"}) +public final class MqttLastWill { + + private final String topic; + private final String message; + private final boolean retain; + private final MqttQoS qos; + + public MqttLastWill(String topic, String message, boolean retain, MqttQoS qos) { + if(topic == null){ + throw new NullPointerException("topic"); + } + if(message == null){ + throw new NullPointerException("message"); + } + if(qos == null){ + throw new NullPointerException("qos"); + } + this.topic = topic; + this.message = message; + this.retain = retain; + this.qos = qos; + } + + public String getTopic() { + return topic; + } + + public String getMessage() { + return message; + } + + public boolean isRetain() { + return retain; + } + + public MqttQoS getQos() { + return qos; + } + + public static MqttLastWill.Builder builder(){ + return new MqttLastWill.Builder(); + } + + public static final class Builder { + + private String topic; + private String message; + private boolean retain; + private MqttQoS qos; + + public String getTopic() { + return topic; + } + + public Builder setTopic(String topic) { + if(topic == null){ + throw new NullPointerException("topic"); + } + this.topic = topic; + return this; + } + + public String getMessage() { + return message; + } + + public Builder setMessage(String message) { + if(message == null){ + throw new NullPointerException("message"); + } + this.message = message; + return this; + } + + public boolean isRetain() { + return retain; + } + + public Builder setRetain(boolean retain) { + this.retain = retain; + return this; + } + + public MqttQoS getQos() { + return qos; + } + + public Builder setQos(MqttQoS qos) { + if(qos == null){ + throw new NullPointerException("qos"); + } + this.qos = qos; + return this; + } + + public MqttLastWill build(){ + return new MqttLastWill(topic, message, retain, qos); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MqttLastWill that = (MqttLastWill) o; + + if (retain != that.retain) return false; + if (!topic.equals(that.topic)) return false; + if (!message.equals(that.message)) return false; + return qos == that.qos; + + } + + @Override + public int hashCode() { + int result = topic.hashCode(); + result = 31 * result + message.hashCode(); + result = 31 * result + (retain ? 1 : 0); + result = 31 * result + qos.hashCode(); + return result; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("MqttLastWill{"); + sb.append("topic='").append(topic).append('\''); + sb.append(", message='").append(message).append('\''); + sb.append(", retain=").append(retain); + sb.append(", qos=").append(qos.name()); + sb.append('}'); + return sb.toString(); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java new file mode 100644 index 0000000000..c656e84fdf --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java @@ -0,0 +1,101 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.concurrent.Promise; + +import java.util.function.Consumer; + +final class MqttPendingPublish { + + private final int messageId; + private final Promise future; + private final ByteBuf payload; + private final MqttPublishMessage message; + private final MqttQoS qos; + + private final RetransmissionHandler publishRetransmissionHandler = new RetransmissionHandler<>(); + private final RetransmissionHandler pubrelRetransmissionHandler = new RetransmissionHandler<>(); + + private boolean sent = false; + + MqttPendingPublish(int messageId, Promise future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) { + this.messageId = messageId; + this.future = future; + this.payload = payload; + this.message = message; + this.qos = qos; + + this.publishRetransmissionHandler.setOriginalMessage(message); + } + + int getMessageId() { + return messageId; + } + + Promise getFuture() { + return future; + } + + ByteBuf getPayload() { + return payload; + } + + boolean isSent() { + return sent; + } + + void setSent(boolean sent) { + this.sent = sent; + } + + MqttPublishMessage getMessage() { + return message; + } + + MqttQoS getQos() { + return qos; + } + + void startPublishRetransmissionTimer(EventLoop eventLoop, Consumer sendPacket) { + this.publishRetransmissionHandler.setHandle(((fixedHeader, originalMessage) -> + sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload.retain())))); + this.publishRetransmissionHandler.start(eventLoop); + } + + void onPubackReceived() { + this.publishRetransmissionHandler.stop(); + } + + void setPubrelMessage(MqttMessage pubrelMessage) { + this.pubrelRetransmissionHandler.setOriginalMessage(pubrelMessage); + } + + void startPubrelRetransmissionTimer(EventLoop eventLoop, Consumer sendPacket) { + this.pubrelRetransmissionHandler.setHandle((fixedHeader, originalMessage) -> + sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader()))); + this.pubrelRetransmissionHandler.start(eventLoop); + } + + void onPubcompReceived() { + this.pubrelRetransmissionHandler.stop(); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java new file mode 100644 index 0000000000..782aef1e12 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java @@ -0,0 +1,102 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.EventLoop; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.util.concurrent.Promise; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.Consumer; + +final class MqttPendingSubscribtion { + + private final Promise future; + private final String topic; + private final Set handlers = new HashSet<>(); + private final MqttSubscribeMessage subscribeMessage; + + private final RetransmissionHandler retransmissionHandler = new RetransmissionHandler<>(); + + private boolean sent = false; + + MqttPendingSubscribtion(Promise future, String topic, MqttSubscribeMessage message) { + this.future = future; + this.topic = topic; + this.subscribeMessage = message; + + this.retransmissionHandler.setOriginalMessage(message); + } + + Promise getFuture() { + return future; + } + + String getTopic() { + return topic; + } + + boolean isSent() { + return sent; + } + + void setSent(boolean sent) { + this.sent = sent; + } + + MqttSubscribeMessage getSubscribeMessage() { + return subscribeMessage; + } + + void addHandler(MqttHandler handler, boolean once){ + this.handlers.add(new MqttPendingHandler(handler, once)); + } + + Set getHandlers() { + return handlers; + } + + void startRetransmitTimer(EventLoop eventLoop, Consumer sendPacket) { + if(this.sent){ //If the packet is sent, we can start the retransmit timer + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> + sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); + this.retransmissionHandler.start(eventLoop); + } + } + + void onSubackReceived(){ + this.retransmissionHandler.stop(); + } + + final class MqttPendingHandler { + private final MqttHandler handler; + private final boolean once; + + MqttPendingHandler(MqttHandler handler, boolean once) { + this.handler = handler; + this.once = once; + } + + MqttHandler getHandler() { + return handler; + } + + boolean isOnce() { + return once; + } + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java new file mode 100644 index 0000000000..a626a815e4 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.EventLoop; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import io.netty.util.concurrent.Promise; + +import java.util.function.Consumer; + +final class MqttPendingUnsubscribtion { + + private final Promise future; + private final String topic; + + private final RetransmissionHandler retransmissionHandler = new RetransmissionHandler<>(); + + MqttPendingUnsubscribtion(Promise future, String topic, MqttUnsubscribeMessage unsubscribeMessage) { + this.future = future; + this.topic = topic; + + this.retransmissionHandler.setOriginalMessage(unsubscribeMessage); + } + + Promise getFuture() { + return future; + } + + String getTopic() { + return topic; + } + + void startRetransmissionTimer(EventLoop eventLoop, Consumer sendPacket) { + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> + sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); + this.retransmissionHandler.start(eventLoop); + } + + void onUnsubackReceived(){ + this.retransmissionHandler.stop(); + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java new file mode 100644 index 0000000000..d0fd998d69 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java @@ -0,0 +1,98 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.ScheduledFuture; + +import java.util.concurrent.TimeUnit; + +final class MqttPingHandler extends ChannelInboundHandlerAdapter { + + private final int keepaliveSeconds; + + private ScheduledFuture pingRespTimeout; + + MqttPingHandler(int keepaliveSeconds) { + this.keepaliveSeconds = keepaliveSeconds; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof MqttMessage)) { + ctx.fireChannelRead(msg); + return; + } + MqttMessage message = (MqttMessage) msg; + if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){ + this.handlePingReq(ctx.channel()); + } else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){ + this.handlePingResp(); + }else{ + ctx.fireChannelRead(ReferenceCountUtil.retain(msg)); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + + if(evt instanceof IdleStateEvent){ + IdleStateEvent event = (IdleStateEvent) evt; + switch(event.state()){ + case READER_IDLE: + break; + case WRITER_IDLE: + this.sendPingReq(ctx.channel()); + break; + } + } + } + + private void sendPingReq(Channel channel){ + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0); + channel.writeAndFlush(new MqttMessage(fixedHeader)); + + if(this.pingRespTimeout != null){ + this.pingRespTimeout = channel.eventLoop().schedule(() -> { + MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); + channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE); + //TODO: what do when the connection is closed ? + }, this.keepaliveSeconds, TimeUnit.SECONDS); + } + } + + private void handlePingReq(Channel channel){ + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); + channel.writeAndFlush(new MqttMessage(fixedHeader)); + } + + private void handlePingResp(){ + if(this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()){ + this.pingRespTimeout.cancel(true); + this.pingRespTimeout = null; + } + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java new file mode 100644 index 0000000000..27f4cb969e --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import java.util.regex.Pattern; + +final class MqttSubscribtion { + + private final String topic; + private final Pattern topicRegex; + private final MqttHandler handler; + + private final boolean once; + + private boolean called; + + MqttSubscribtion(String topic, MqttHandler handler, boolean once) { + if(topic == null){ + throw new NullPointerException("topic"); + } + if(handler == null){ + throw new NullPointerException("handler"); + } + this.topic = topic; + this.handler = handler; + this.once = once; + this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$"); + } + + String getTopic() { + return topic; + } + + public MqttHandler getHandler() { + return handler; + } + + boolean isOnce() { + return once; + } + + boolean isCalled() { + return called; + } + + boolean matches(String topic){ + return this.topicRegex.matcher(topic).matches(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MqttSubscribtion that = (MqttSubscribtion) o; + + return once == that.once && topic.equals(that.topic) && handler.equals(that.handler); + } + + @Override + public int hashCode() { + int result = topic.hashCode(); + result = 31 * result + handler.hashCode(); + result = 31 * result + (once ? 1 : 0); + return result; + } + + void setCalled(boolean called) { + this.called = called; + } +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java new file mode 100644 index 0000000000..36e91e5210 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import io.netty.channel.EventLoop; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.util.concurrent.ScheduledFuture; + +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +final class RetransmissionHandler { + + private ScheduledFuture timer; + private int timeout = 10; + private BiConsumer handler; + private T originalMessage; + + void start(EventLoop eventLoop){ + if(eventLoop == null){ + throw new NullPointerException("eventLoop"); + } + if(this.handler == null){ + throw new NullPointerException("handler"); + } + this.timeout = 10; + this.startTimer(eventLoop); + } + + private void startTimer(EventLoop eventLoop){ + this.timer = eventLoop.schedule(() -> { + this.timeout += 5; + MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), true, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); + handler.accept(fixedHeader, originalMessage); + startTimer(eventLoop); + }, timeout, TimeUnit.SECONDS); + } + + void stop(){ + if(this.timer != null){ + this.timer.cancel(true); + } + } + + void setHandle(BiConsumer runnable) { + this.handler = runnable; + } + + void setOriginalMessage(T originalMessage) { + this.originalMessage = originalMessage; + } +} diff --git a/pom.xml b/pom.xml index 67d48b268b..297512fc9a 100755 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,6 @@ 2.5.3 1.2.1 9.4.1211 - 2.0.0TB org/thingsboard/server/gen/**/*, org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/* @@ -87,6 +86,7 @@ + netty-mqtt common rule-engine dao @@ -324,6 +324,11 @@ + + org.thingsboard + netty-mqtt + ${project.version} + org.thingsboard extensions-api @@ -568,6 +573,11 @@ netty-handler ${netty.version} + + io.netty + netty-codec-mqtt + ${netty.version} + com.datastax.cassandra cassandra-driver-core @@ -819,11 +829,6 @@ exe provided - - nl.jk5.netty-mqtt - netty-mqtt - ${netty-mqtt-client.version} - org.elasticsearch.client rest diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index 3352f5acec..0ef9f729e1 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -68,6 +68,10 @@ org.thingsboard.rule-engine rule-engine-api + + org.thingsboard + netty-mqtt + com.google.guava guava @@ -90,10 +94,6 @@ com.rabbitmq amqp-client - - nl.jk5.netty-mqtt - netty-mqtt - org.bouncycastle bcpkix-jdk15on diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 9694b0498b..ce54a73f4b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -24,9 +24,9 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; -import nl.jk5.mqtt.MqttClient; -import nl.jk5.mqtt.MqttClientConfig; -import nl.jk5.mqtt.MqttConnectResult; +import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttConnectResult; import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.*; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java index 9d5e8df9b5..53aea9604c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java @@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.mqtt.credentials; import io.netty.handler.ssl.SslContext; -import nl.jk5.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttClientConfig; import java.util.Optional; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java index cbbd7036a2..b3d86c6a07 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java @@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt.credentials; import io.netty.handler.ssl.SslContext; import lombok.Data; -import nl.jk5.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttClientConfig; import java.util.Optional; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java index c9fb4a32a9..a462839b1e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java @@ -22,7 +22,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import nl.jk5.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttClientConfig; import org.apache.commons.codec.binary.Base64; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openssl.PEMDecryptorProvider; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java index 5c4594ff44..0ab81a8457 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java @@ -19,7 +19,7 @@ package org.thingsboard.rule.engine.mqtt.credentials; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.netty.handler.ssl.SslContext; -import nl.jk5.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttClientConfig; import java.util.Optional;