Browse Source

MQTTS metrics

pull/15112/head
Sergey Matvienko 10 months ago
committed by Sergii Matviienko
parent
commit
45305e1884
  1. 22
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
  2. 8
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  3. 2
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  4. 18
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

22
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java

@ -88,20 +88,30 @@ public class MqttTransportContext extends TransportContext {
@Value("${transport.mqtt.proxy_enabled:false}")
private boolean proxyEnabled;
private final AtomicInteger connectionsCounter = new AtomicInteger();
private final AtomicInteger connectionsActiveCounterMQTT = new AtomicInteger();
private final AtomicInteger connectionsActiveCounterMQTTS = new AtomicInteger();
@PostConstruct
public void init() {
super.init();
transportService.createGaugeStats("openConnections", connectionsCounter);
transportService.createGaugeStats("connections_active", connectionsActiveCounterMQTT, "protocol", "MQTT");
transportService.createGaugeStats("connections_active", connectionsActiveCounterMQTTS, "protocol", "MQTTS");
}
public void channelRegistered() {
connectionsCounter.incrementAndGet();
public void channelRegistered(boolean isSSL) {
if (isSSL) {
connectionsActiveCounterMQTTS.incrementAndGet();
} else {
connectionsActiveCounterMQTT.incrementAndGet();
}
}
public void channelUnregistered() {
connectionsCounter.decrementAndGet();
public void channelUnregistered(boolean isSSL) {
if (isSSL) {
connectionsActiveCounterMQTTS.decrementAndGet();
} else {
connectionsActiveCounterMQTT.decrementAndGet();
}
}
public boolean checkAddress(InetSocketAddress address) {

8
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -180,16 +180,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.rpcAwaitingAck = new ConcurrentHashMap<>();
}
boolean isSSL() {
return sslHandler != null;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
context.channelRegistered();
context.channelRegistered(isSSL());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
context.channelUnregistered();
context.channelUnregistered(isSSL());
}
@Override

2
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -161,5 +161,5 @@ public interface TransportService {
boolean hasSession(SessionInfoProto sessionInfo);
void createGaugeStats(String openConnections, AtomicInteger connectionsCounter);
void createGaugeStats(String openConnections, AtomicInteger connectionsCounter, String... tags);
}

18
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -1257,9 +1257,21 @@ public class DefaultTransportService extends TransportActivityManager implements
}
@Override
public void createGaugeStats(String statsName, AtomicInteger number) {
statsFactory.createGauge(StatsType.TRANSPORT + "." + statsName, number);
statsMap.put(statsName, number);
public void createGaugeStats(String statsName, AtomicInteger number, String... tags) {
String key = "thingsboard" + "." + StatsType.TRANSPORT.getName() + "." + statsName;
statsFactory.createGauge(key, number, tags);
statsMap.put(statsName + TagsKey(tags), number);
}
String TagsKey(String... tags) {
if (tags == null || tags.length < 2) return "";
StringBuilder sb = new StringBuilder("[");
for (int i = 0; i < tags.length; i += 2) {
if (i > 0) sb.append(',');
sb.append(tags[i]).append('=').append(i + 1 < tags.length ? tags[i + 1] : "");
}
sb.append(']');
return sb.toString();
}
@Scheduled(fixedDelayString = "${transport.stats.print-interval-ms:60000}")

Loading…
Cancel
Save