From 00b5d36e0bf8d8295463018537dbbd5744902b7b Mon Sep 17 00:00:00 2001 From: blackstar-baba <535650957@qq.com> Date: Wed, 1 Apr 2020 14:39:51 +0800 Subject: [PATCH] fix bug: Under high concurrency, Mqtt client nextMessageId exceeds the 0xffff limit (#2564) The compareAndSet method and getAndIncrement method of the AtomicInteger class are atomic, but when these two methods are used at the same time, they are no longer atomic. --- .../main/java/org/thingsboard/mqtt/MqttClientImpl.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 169779b530..aef2cad684 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -407,8 +407,12 @@ final class MqttClientImpl implements MqttClient { } private MqttMessageIdVariableHeader getNewMessageId() { - this.nextMessageId.compareAndSet(0xffff, 1); - return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement()); + int messageId; + synchronized (this.nextMessageId) { + this.nextMessageId.compareAndSet(0xffff, 1); + messageId = this.nextMessageId.getAndIncrement(); + } + return MqttMessageIdVariableHeader.from(messageId); } private Future createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {