diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 3d05e999e0..8a60168154 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -88,20 +88,30 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.proxy_enabled:false}") private boolean proxyEnabled; - private final AtomicInteger connectionsCounter = new AtomicInteger(); + private final AtomicInteger connectionsActiveCounterMQTT = new AtomicInteger(); + private final AtomicInteger connectionsActiveCounterMQTTS = new AtomicInteger(); @PostConstruct public void init() { super.init(); - transportService.createGaugeStats("openConnections", connectionsCounter); + transportService.createGaugeStats("connections_active", connectionsActiveCounterMQTT, "protocol", "MQTT"); + transportService.createGaugeStats("connections_active", connectionsActiveCounterMQTTS, "protocol", "MQTTS"); } - public void channelRegistered() { - connectionsCounter.incrementAndGet(); + public void channelRegistered(boolean isSSL) { + if (isSSL) { + connectionsActiveCounterMQTTS.incrementAndGet(); + } else { + connectionsActiveCounterMQTT.incrementAndGet(); + } } - public void channelUnregistered() { - connectionsCounter.decrementAndGet(); + public void channelUnregistered(boolean isSSL) { + if (isSSL) { + connectionsActiveCounterMQTTS.decrementAndGet(); + } else { + connectionsActiveCounterMQTT.decrementAndGet(); + } } public boolean checkAddress(InetSocketAddress address) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index f753579292..bf623d67d8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -180,16 +180,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.rpcAwaitingAck = new ConcurrentHashMap<>(); } + boolean isSSL() { + return sslHandler != null; + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); - context.channelRegistered(); + context.channelRegistered(isSSL()); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); - context.channelUnregistered(); + context.channelUnregistered(isSSL()); } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 5c7d552855..612f331757 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -161,5 +161,5 @@ public interface TransportService { boolean hasSession(SessionInfoProto sessionInfo); - void createGaugeStats(String openConnections, AtomicInteger connectionsCounter); + void createGaugeStats(String openConnections, AtomicInteger connectionsCounter, String... tags); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 260957c990..47e341547e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1257,9 +1257,21 @@ public class DefaultTransportService extends TransportActivityManager implements } @Override - public void createGaugeStats(String statsName, AtomicInteger number) { - statsFactory.createGauge(StatsType.TRANSPORT + "." + statsName, number); - statsMap.put(statsName, number); + public void createGaugeStats(String statsName, AtomicInteger number, String... tags) { + String key = "thingsboard" + "." + StatsType.TRANSPORT.getName() + "." + statsName; + statsFactory.createGauge(key, number, tags); + statsMap.put(statsName + TagsKey(tags), number); + } + + String TagsKey(String... tags) { + if (tags == null || tags.length < 2) return ""; + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < tags.length; i += 2) { + if (i > 0) sb.append(','); + sb.append(tags[i]).append('=').append(i + 1 < tags.length ? tags[i + 1] : ""); + } + sb.append(']'); + return sb.toString(); } @Scheduled(fixedDelayString = "${transport.stats.print-interval-ms:60000}")