|
|
|
@ -55,7 +55,6 @@ import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
import java.util.function.BiConsumer; |
|
|
|
|
|
|
|
/** |
|
|
|
* Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times |
|
|
|
@ -70,7 +69,7 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create(); |
|
|
|
private final ConcurrentMap<Integer, MqttPendingSubscription> pendingSubscriptions = new ConcurrentHashMap<>(); |
|
|
|
private final Set<String> pendingSubscribeTopics = new HashSet<>(); |
|
|
|
private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create(); |
|
|
|
private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscription = HashMultimap.create(); |
|
|
|
private final AtomicInteger nextMessageId = new AtomicInteger(1); |
|
|
|
|
|
|
|
private final MqttClientConfig clientConfig; |
|
|
|
@ -166,7 +165,7 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed()); |
|
|
|
pendingPublishes.clear(); |
|
|
|
pendingSubscribeTopics.clear(); |
|
|
|
handlerToSubscribtion.clear(); |
|
|
|
handlerToSubscription.clear(); |
|
|
|
scheduleConnectIfRequired(host, port, true); |
|
|
|
}); |
|
|
|
} else { |
|
|
|
@ -283,11 +282,11 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
@Override |
|
|
|
public Future<Void> off(String topic, MqttHandler handler) { |
|
|
|
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); |
|
|
|
for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) { |
|
|
|
for (MqttSubscription subscription : this.handlerToSubscription.get(handler)) { |
|
|
|
this.subscriptions.remove(topic, subscription); |
|
|
|
} |
|
|
|
this.handlerToSubscribtion.removeAll(handler); |
|
|
|
this.checkSubscribtions(topic, future); |
|
|
|
this.handlerToSubscription.removeAll(handler); |
|
|
|
this.checkSubscriptions(topic, future); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@ -303,12 +302,12 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); |
|
|
|
ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic)); |
|
|
|
for (MqttSubscription subscription : subscriptions) { |
|
|
|
for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) { |
|
|
|
for (MqttSubscription handSub : this.handlerToSubscription.get(subscription.getHandler())) { |
|
|
|
this.subscriptions.remove(topic, handSub); |
|
|
|
} |
|
|
|
this.handlerToSubscribtion.remove(subscription.getHandler(), subscription); |
|
|
|
this.handlerToSubscription.remove(subscription.getHandler(), subscription); |
|
|
|
} |
|
|
|
this.checkSubscribtions(topic, future); |
|
|
|
this.checkSubscriptions(topic, future); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@ -461,7 +460,7 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
if (this.serverSubscriptions.contains(topic)) { |
|
|
|
MqttSubscription subscription = new MqttSubscription(topic, handler, once); |
|
|
|
this.subscriptions.put(topic, subscription); |
|
|
|
this.handlerToSubscribtion.put(handler, subscription); |
|
|
|
this.handlerToSubscription.put(handler, subscription); |
|
|
|
return this.channel.newSucceededFuture(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -484,7 +483,7 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
private void checkSubscribtions(String topic, Promise<Void> promise) { |
|
|
|
private void checkSubscriptions(String topic, Promise<Void> promise) { |
|
|
|
if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) { |
|
|
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); |
|
|
|
MqttMessageIdVariableHeader variableHeader = getNewMessageId(); |
|
|
|
@ -514,8 +513,8 @@ final class MqttClientImpl implements MqttClient { |
|
|
|
return pendingSubscribeTopics; |
|
|
|
} |
|
|
|
|
|
|
|
HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscribtion() { |
|
|
|
return handlerToSubscribtion; |
|
|
|
HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscription() { |
|
|
|
return handlerToSubscription; |
|
|
|
} |
|
|
|
|
|
|
|
Set<String> getServerSubscriptions() { |
|
|
|
|