From e7c4e7685196d805faebb0e28260a163c7c224d9 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Mon, 8 Nov 2021 12:59:42 +0200 Subject: [PATCH] Added removing for timers in pendingPublishes on channel is closed --- .../org/thingsboard/mqtt/MqttClientImpl.java | 32 +++++++++++-------- .../thingsboard/mqtt/MqttPendingPublish.java | 5 +++ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 827d9fa4cf..5fbdb883af 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; /** * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times @@ -160,6 +161,7 @@ final class MqttClientImpl implements MqttClient { subscriptions.clear(); pendingServerUnsubscribes.clear(); qos2PendingIncomingPublishes.clear(); + pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed()); pendingPublishes.clear(); pendingSubscribeTopics.clear(); handlerToSubscribtion.clear(); @@ -366,20 +368,24 @@ final class MqttClientImpl implements MqttClient { ChannelFuture channelFuture = this.sendAndFlushPacket(message); if (channelFuture != null) { - pendingPublish.setSent(true); - if (channelFuture.cause() != null) { - this.pendingPublishes.remove(pendingPublish.getMessageId()); - future.setFailure(channelFuture.cause()); - return future; - } - } - if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { - this.pendingPublishes.remove(pendingPublish.getMessageId()); - pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 - } else if (pendingPublish.isSent()) { - pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); + channelFuture.addListener(result -> { + pendingPublish.setSent(true); + if (result.cause() != null) { + pendingPublishes.remove(pendingPublish.getMessageId()); + future.setFailure(result.cause()); + } else { + if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { + pendingPublishes.remove(pendingPublish.getMessageId()); + pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 + } else if (pendingPublish.isSent()) { + pendingPublish.startPublishRetransmissionTimer(eventLoop.next(), MqttClientImpl.this::sendAndFlushPacket); + } else { + pendingPublishes.remove(pendingPublish.getMessageId()); + } + } + }); } else { - this.pendingPublishes.remove(pendingPublish.getMessageId()); + pendingPublishes.remove(pendingPublish.getMessageId()); } return future; } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java index cedbd4556d..f647569553 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java @@ -98,4 +98,9 @@ final class MqttPendingPublish { void onPubcompReceived() { this.pubrelRetransmissionHandler.stop(); } + + void onChannelClosed() { + this.publishRetransmissionHandler.stop(); + this.pubrelRetransmissionHandler.stop(); + } }