Browse Source

Merge pull request #7725 from ViacheslavKlimov/feature/monitoring-service

Monitoring microservice
pull/8351/head
Andrew Shvayka 3 years ago
committed by GitHub
parent
commit
5b6db5d796
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 160
      monitoring/pom.xml
  2. 46
      monitoring/src/main/conf/logback.xml
  3. 22
      monitoring/src/main/conf/tb-monitoring.conf
  4. 40
      monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java
  5. 49
      monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java
  6. 194
      monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java
  7. 59
      monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java
  8. 34
      monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java
  9. 26
      monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java
  10. 35
      monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java
  11. 33
      monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringConfig.java
  12. 33
      monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringConfig.java
  13. 39
      monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringConfig.java
  14. 33
      monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringConfig.java
  15. 30
      monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java
  16. 67
      monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java
  17. 22
      monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java
  18. 28
      monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java
  19. 32
      monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java
  20. 27
      monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java
  21. 28
      monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataCmd.java
  22. 44
      monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java
  23. 28
      monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/LatestValueCmd.java
  24. 42
      monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java
  25. 22
      monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java
  26. 49
      monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java
  27. 31
      monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java
  28. 53
      monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java
  29. 24
      monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java
  30. 58
      monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java
  31. 144
      monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java
  32. 120
      monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportHealthChecker.java
  33. 143
      monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportMonitoringService.java
  34. 78
      monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/CoapTransportHealthChecker.java
  35. 67
      monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/HttpTransportHealthChecker.java
  36. 87
      monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/MqttTransportHealthChecker.java
  37. 41
      monitoring/src/main/java/org/thingsboard/monitoring/util/TbStopWatch.java
  38. 38
      monitoring/src/main/resources/logback.xml
  39. 107
      monitoring/src/main/resources/tb-monitoring.yml
  40. 30
      msa/monitoring/docker/Dockerfile
  41. 31
      msa/monitoring/docker/start-tb-monitoring.sh
  42. 192
      msa/monitoring/pom.xml
  43. 1
      msa/pom.xml
  44. 6
      pom.xml

160
monitoring/pom.xml

@ -0,0 +1,160 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016-2023 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.5.0-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>monitoring</artifactId>
<name>ThingsBoard Monitoring Service</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/..</main.dir>
<pkg.type>java</pkg.type>
<pkg.disabled>false</pkg.disabled>
<pkg.process-resources.phase>process-resources</pkg.process-resources.phase>
<pkg.package.phase>package</pkg.package.phase>
<pkg.name>tb-monitoring</pkg.name>
<pkg.copyInstallScripts>false</pkg.copyInstallScripts>
<pkg.win.dist>${project.build.directory}/windows</pkg.win.dist>
<pkg.implementationTitle>ThingsBoard Monitoring Service</pkg.implementationTitle>
<pkg.mainClass>org.thingsboard.monitoring.ThingsboardMonitoringApplication</pkg.mainClass>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>data</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>rest-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>scandium</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${pkg.name}-${project.version}</finalName>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.thingsboard</groupId>
<artifactId>gradle-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>jenkins</id>
<name>Jenkins Repository</name>
<url>https://repo.jenkins-ci.org/releases</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

46
monitoring/src/main/conf/logback.xml

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Copyright © 2016-2023 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE configuration>
<configuration scan="true" scanPeriod="30 seconds">
<appender name="fileLogAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${pkg.logFolder}/${pkg.name}.log</file>
<rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${pkg.logFolder}/${pkg.name}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory>
<totalSizeCap>3GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org" level="WARN"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.monitoring" level="DEBUG"/>
<logger name="org.thingsboard.monitoring.client" level="INFO"/>
<root level="INFO">
<appender-ref ref="fileLogAppender"/>
</root>
</configuration>

22
monitoring/src/main/conf/tb-monitoring.conf

@ -0,0 +1,22 @@
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/tb-monitoring/gc.log:time,uptime,level,tags:filecount=10,filesize=10M"
export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError"
export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark"
export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10"
export LOG_FILENAME=tb-monitoring.out
export LOADER_PATH=/usr/share/tb-monitoring/conf

40
monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@SpringBootApplication
@EnableScheduling
@Slf4j
public class ThingsboardMonitoringApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(ThingsboardMonitoringApplication.class)
.properties(Map.of("spring.config.name", "tb-monitoring"))
.run(args);
}
}

49
monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.client;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.thingsboard.rest.client.RestClient;
import java.time.Duration;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TbClient extends RestClient {
@Value("${monitoring.rest.username}")
private String username;
@Value("${monitoring.rest.password}")
private String password;
public TbClient(@Value("${monitoring.rest.base_url}") String baseUrl,
@Value("${monitoring.rest.request_timeout_ms}") int requestTimeoutMs) {
super(new RestTemplateBuilder()
.setConnectTimeout(Duration.ofMillis(requestTimeoutMs))
.setReadTimeout(Duration.ofMillis(requestTimeoutMs))
.build(), baseUrl);
}
public String logIn() {
login(username, password);
return getToken();
}
}

194
monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java

@ -0,0 +1,194 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.client;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.data.cmd.CmdsWrapper;
import org.thingsboard.monitoring.data.cmd.EntityDataCmd;
import org.thingsboard.monitoring.data.cmd.EntityDataUpdate;
import org.thingsboard.monitoring.data.cmd.LatestValueCmd;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityListFilter;
import javax.net.ssl.SSLParameters;
import java.net.URI;
import java.nio.channels.NotYetConnectedException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
public class WsClient extends WebSocketClient implements AutoCloseable {
public volatile JsonNode lastMsg;
private CountDownLatch reply;
private CountDownLatch update;
private final Lock updateLock = new ReentrantLock();
private long requestTimeoutMs;
public WsClient(URI serverUri, long requestTimeoutMs) {
super(serverUri);
this.requestTimeoutMs = requestTimeoutMs;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
}
@Override
public void onMessage(String s) {
if (s == null) {
return;
}
updateLock.lock();
try {
lastMsg = JacksonUtil.toJsonNode(s);
log.trace("Received new msg: {}", lastMsg.toPrettyString());
if (update != null) {
update.countDown();
}
if (reply != null) {
reply.countDown();
}
} finally {
updateLock.unlock();
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.debug("WebSocket client is closed");
}
@Override
public void onError(Exception e) {
log.error("WebSocket client error:", e);
}
public void registerWaitForUpdate() {
updateLock.lock();
try {
lastMsg = null;
update = new CountDownLatch(1);
} finally {
updateLock.unlock();
}
log.trace("Registered wait for update");
}
@Override
public void send(String text) throws NotYetConnectedException {
updateLock.lock();
try {
reply = new CountDownLatch(1);
} finally {
updateLock.unlock();
}
super.send(text);
}
public WsClient subscribeForTelemetry(List<UUID> devices, String key) {
EntityDataCmd cmd = new EntityDataCmd();
cmd.setCmdId(RandomUtils.nextInt(0, 1000));
EntityListFilter devicesFilter = new EntityListFilter();
devicesFilter.setEntityType(EntityType.DEVICE);
devicesFilter.setEntityList(devices.stream().map(UUID::toString).collect(Collectors.toList()));
EntityDataPageLink pageLink = new EntityDataPageLink(100,0, null, null);
EntityDataQuery devicesQuery = new EntityDataQuery(devicesFilter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
cmd.setQuery(devicesQuery);
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(List.of(new EntityKey(EntityKeyType.TIME_SERIES, key)));
cmd.setLatestCmd(latestCmd);
CmdsWrapper wrapper = new CmdsWrapper();
wrapper.setEntityDataCmds(List.of(cmd));
send(JacksonUtil.toString(wrapper));
return this;
}
public JsonNode waitForUpdate(long ms) {
log.trace("update latch count: {}", update.getCount());
try {
if (update.await(ms, TimeUnit.MILLISECONDS)) {
log.trace("Waited for update");
return getLastMsg();
}
} catch (InterruptedException e) {
log.debug("Failed to await reply", e);
}
log.trace("No update arrived within {} ms", ms);
return null;
}
public JsonNode waitForReply() {
try {
if (reply.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) {
log.trace("Waited for reply");
return getLastMsg();
}
} catch (InterruptedException e) {
log.debug("Failed to await reply", e);
}
log.trace("No reply arrived within {} ms", requestTimeoutMs);
throw new IllegalStateException("No WS reply arrived within " + requestTimeoutMs + " ms");
}
private JsonNode getLastMsg() {
if (lastMsg != null) {
JsonNode errorMsg = lastMsg.get("errorMsg");
if (errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) {
throw new RuntimeException("WS error from server: " + errorMsg.asText());
} else {
return lastMsg;
}
} else {
return null;
}
}
public Object getTelemetryUpdate(UUID deviceId, String key) {
JsonNode lastMsg = getLastMsg();
if (lastMsg == null || lastMsg.isNull()) return null;
EntityDataUpdate update = JacksonUtil.treeToValue(lastMsg, EntityDataUpdate.class);
return update.getLatest(deviceId, key);
}
@Override
protected void onSetSSLParameters(SSLParameters sslParameters) {
sslParameters.setEndpointIdentificationAlgorithm(null);
}
}

59
monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.client;
import lombok.RequiredArgsConstructor;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.data.Latencies;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.monitoring.util.TbStopWatch;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@Component
@RequiredArgsConstructor
public class WsClientFactory {
private final MonitoringReporter monitoringReporter;
private final TbStopWatch stopWatch;
@Value("${monitoring.ws.base_url}")
private String baseUrl;
@Value("${monitoring.ws.request_timeout_ms}")
private int requestTimeoutMs;
public WsClient createClient(String accessToken) throws Exception {
URI uri = new URI(baseUrl + "/api/ws/plugins/telemetry?token=" + accessToken);
stopWatch.start();
WsClient wsClient = new WsClient(uri, requestTimeoutMs);
if (baseUrl.startsWith("wss")) {
SSLContextBuilder builder = SSLContexts.custom();
builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
wsClient.setSocketFactory(builder.build().getSocketFactory());
}
boolean connected = wsClient.connectBlocking(requestTimeoutMs, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("Failed to establish WS session");
}
monitoringReporter.reportLatency(Latencies.WS_CONNECT, stopWatch.getTime());
return wsClient;
}
}

34
monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.util.UUID;
@Data
public class DeviceConfig {
private UUID id;
private DeviceCredentials credentials;
public void setId(String id) {
this.id = StringUtils.isNotEmpty(id) ? UUID.fromString(id) : null;
}
}

26
monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config;
import lombok.Data;
@Data
public class MonitoringTargetConfig {
private String baseUrl;
private DeviceConfig device;
}

35
monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.thingsboard.monitoring.transport.TransportHealthChecker;
import org.thingsboard.monitoring.transport.impl.CoapTransportHealthChecker;
import org.thingsboard.monitoring.transport.impl.HttpTransportHealthChecker;
import org.thingsboard.monitoring.transport.impl.MqttTransportHealthChecker;
@AllArgsConstructor
@Getter
public enum TransportType {
MQTT(MqttTransportHealthChecker.class),
COAP(CoapTransportHealthChecker.class),
HTTP(HttpTransportHealthChecker.class);
private final Class<? extends TransportHealthChecker<?>> serviceClass;
}

33
monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringConfig.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.service;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.config.TransportType;
@Component
@ConditionalOnProperty(name = "monitoring.transports.coap.enabled", havingValue = "true")
@ConfigurationProperties(prefix = "monitoring.transports.coap")
public class CoapTransportMonitoringConfig extends TransportMonitoringConfig {
@Override
public TransportType getTransportType() {
return TransportType.COAP;
}
}

33
monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringConfig.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.service;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.config.TransportType;
@Component
@ConditionalOnProperty(name = "monitoring.transports.http.enabled", havingValue = "true")
@ConfigurationProperties(prefix = "monitoring.transports.http")
public class HttpTransportMonitoringConfig extends TransportMonitoringConfig {
@Override
public TransportType getTransportType() {
return TransportType.HTTP;
}
}

39
monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringConfig.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.service;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.config.TransportType;
@Component
@ConditionalOnProperty(name = "monitoring.transports.mqtt.enabled", havingValue = "true")
@ConfigurationProperties(prefix = "monitoring.transports.mqtt")
@Data
@EqualsAndHashCode(callSuper = true)
public class MqttTransportMonitoringConfig extends TransportMonitoringConfig {
private Integer qos;
@Override
public TransportType getTransportType() {
return TransportType.MQTT;
}
}

33
monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringConfig.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.service;
import lombok.Data;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.TransportType;
import java.util.List;
@Data
public abstract class TransportMonitoringConfig {
private int requestTimeoutMs;
private List<MonitoringTargetConfig> targets;
public abstract TransportType getTransportType();
}

30
monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data;
import org.thingsboard.monitoring.config.TransportType;
public class Latencies {
public static final String WS_UPDATE = "wsUpdate";
public static final String WS_CONNECT = "wsConnect";
public static final String LOG_IN = "logIn";
public static String transportRequest(TransportType transportType) {
return String.format("%sTransportRequest", transportType.name().toLowerCase());
}
}

67
monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data;
import com.google.common.util.concurrent.AtomicDouble;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.atomic.AtomicInteger;
@RequiredArgsConstructor
public class Latency {
private final String key;
private final AtomicDouble latencySum = new AtomicDouble();
private final AtomicInteger counter = new AtomicInteger();
public synchronized void report(double latencyInMs) {
latencySum.addAndGet(latencyInMs);
counter.incrementAndGet();
}
public synchronized double getAvg() {
return latencySum.get() / counter.get();
}
public boolean isNotEmpty() {
return counter.get() > 0;
}
public synchronized void reset() {
latencySum.set(0.0);
counter.set(0);
}
public String getKey() {
return key;
}
public synchronized Latency snapshot() {
Latency snapshot = new Latency(key);
snapshot.latencySum.set(latencySum.get());
snapshot.counter.set(counter.get());
return snapshot;
}
@Override
public String toString() {
return "Latency{" +
"key='" + key + '\'' +
", avgLatency=" + getAvg() +
'}';
}
}

22
monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data;
public class MonitoredServiceKey {
public static final String GENERAL = "Monitoring";
}

28
monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data;
public class TransportFailureException extends RuntimeException {
public TransportFailureException(Throwable cause) {
super(cause.getMessage(), cause);
}
public TransportFailureException(String message) {
super(message);
}
}

32
monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data;
import lombok.Data;
import org.thingsboard.monitoring.config.TransportType;
@Data
public class TransportInfo {
private final TransportType transportType;
private final String url;
@Override
public String toString() {
return String.format("%s (%s)", transportType, url);
}
}

27
monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.cmd;
import lombok.Data;
import java.util.List;
@Data
public class CmdsWrapper {
private List<EntityDataCmd> entityDataCmds;
}

28
monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataCmd.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.cmd;
import lombok.Data;
import org.thingsboard.server.common.data.query.EntityDataQuery;
@Data
public class EntityDataCmd {
private int cmdId;
private EntityDataQuery query;
private LatestValueCmd latestCmd;
}

44
monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.cmd;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import java.util.List;
import java.util.UUID;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class EntityDataUpdate {
@JsonIgnoreProperties(ignoreUnknown = true)
private List<EntityData> update;
public String getLatest(UUID entityId, String key) {
if (update == null) return null;
return update.stream()
.filter(entityData -> entityData.getEntityId().getId().equals(entityId)).findFirst()
.map(EntityData::getLatest).map(latest -> latest.get(EntityKeyType.TIME_SERIES))
.map(latest -> latest.get(key)).map(TsValue::getValue)
.orElse(null);
}
}

28
monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/LatestValueCmd.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.cmd;
import lombok.Data;
import org.thingsboard.server.common.data.query.EntityKey;
import java.util.List;
@Data
public class LatestValueCmd {
private List<EntityKey> keys;
}

42
monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.notification;
import org.thingsboard.monitoring.data.Latency;
import java.util.Collection;
public class HighLatencyNotification implements Notification {
private final Collection<Latency> latencies;
private final int thresholdMs;
public HighLatencyNotification(Collection<Latency> latencies, int thresholdMs) {
this.latencies = latencies;
this.thresholdMs = thresholdMs;
}
@Override
public String getText() {
StringBuilder text = new StringBuilder();
text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n");
latencies.forEach(latency -> {
text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg()));
});
return text.toString();
}
}

22
monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.notification;
public interface Notification {
String getText();
}

49
monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.notification;
import lombok.Getter;
import org.apache.commons.lang3.exception.ExceptionUtils;
@Getter
public class ServiceFailureNotification implements Notification {
private final Object serviceKey;
private final Throwable error;
private final int failuresCount;
public ServiceFailureNotification(Object serviceKey, Throwable error, int failuresCount) {
this.serviceKey = serviceKey;
this.error = error;
this.failuresCount = failuresCount;
}
@Override
public String getText() {
String errorMsg = error.getMessage();
if (errorMsg == null || errorMsg.equals("null")) {
Throwable cause = ExceptionUtils.getRootCause(error);
if (cause != null) {
errorMsg = cause.getMessage();
}
}
if (errorMsg == null) {
errorMsg = error.getClass().getSimpleName();
}
return String.format("[%s] Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount);
}
}

31
monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.data.notification;
public class ServiceRecoveryNotification implements Notification {
private final Object serviceKey;
public ServiceRecoveryNotification(Object serviceKey) {
this.serviceKey = serviceKey;
}
@Override
public String getText() {
return String.format("[%s] is OK", serviceKey);
}
}

53
monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java

@ -0,0 +1,53 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.notification;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.data.notification.Notification;
import org.thingsboard.monitoring.notification.channels.NotificationChannel;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final List<NotificationChannel> notificationChannels;
private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor();
public void sendNotification(Notification notification) {
forEachNotificationChannel(notificationChannel -> notificationChannel.sendNotification(notification));
}
private void forEachNotificationChannel(Consumer<NotificationChannel> function) {
notificationChannels.forEach(notificationChannel -> {
notificationExecutor.submit(() -> {
try {
function.accept(notificationChannel);
} catch (Exception e) {
log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e);
}
});
});
}
}

24
monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.notification.channels;
import org.thingsboard.monitoring.data.notification.Notification;
public interface NotificationChannel {
void sendNotification(Notification notification);
}

58
monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.notification.channels.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.monitoring.data.notification.Notification;
import org.thingsboard.monitoring.notification.channels.NotificationChannel;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Map;
@Component
@ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true")
@Slf4j
public class SlackNotificationChannel implements NotificationChannel {
@Value("${monitoring.notification_channels.slack.webhook_url}")
private String webhookUrl;
private RestTemplate restTemplate;
@PostConstruct
private void init() {
restTemplate = new RestTemplateBuilder()
.setConnectTimeout(Duration.ofSeconds(5))
.setReadTimeout(Duration.ofSeconds(2))
.build();
}
@Override
public void sendNotification(Notification notification) {
sendNotification(notification.getText());
}
private void sendNotification(String message) {
restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class);
}
}

144
monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java

@ -0,0 +1,144 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.data.Latency;
import org.thingsboard.monitoring.data.MonitoredServiceKey;
import org.thingsboard.monitoring.data.notification.HighLatencyNotification;
import org.thingsboard.monitoring.data.notification.ServiceFailureNotification;
import org.thingsboard.monitoring.data.notification.ServiceRecoveryNotification;
import org.thingsboard.monitoring.notification.NotificationService;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
public class MonitoringReporter {
private final NotificationService notificationService;
private final Map<String, Latency> latencies = new ConcurrentHashMap<>();
private final Map<Object, AtomicInteger> failuresCounters = new ConcurrentHashMap<>();
@Value("${monitoring.failures_threshold}")
private int failuresThreshold;
@Value("${monitoring.send_repeated_failure_notification}")
private boolean sendRepeatedFailureNotification;
@Value("${monitoring.latency.enabled}")
private boolean latencyReportingEnabled;
@Value("${monitoring.latency.threshold_ms}")
private int latencyThresholdMs;
@Value("${monitoring.latency.reporting_asset_id}")
private String reportingAssetId;
public void reportLatencies(TbClient tbClient) {
List<Latency> latencies = this.latencies.values().stream()
.filter(Latency::isNotEmpty)
.map(latency -> {
Latency snapshot = latency.snapshot();
latency.reset();
return snapshot;
})
.collect(Collectors.toList());
if (latencies.isEmpty()) {
return;
}
log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms")
.collect(Collectors.joining("\n")));
if (!latencyReportingEnabled) return;
if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) {
HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs);
notificationService.sendNotification(highLatencyNotification);
}
try {
if (StringUtils.isBlank(reportingAssetId)) {
String assetName = "Monitoring";
Asset monitoringAsset = tbClient.findAsset(assetName).orElseGet(() -> {
Asset asset = new Asset();
asset.setType("Monitoring");
asset.setName(assetName);
asset = tbClient.saveAsset(asset);
log.info("Created monitoring asset {}", asset.getId());
return asset;
});
reportingAssetId = monitoringAsset.getId().toString();
}
ObjectNode msg = JacksonUtil.newObjectNode();
latencies.forEach(latency -> {
msg.set(latency.getKey(), new DoubleNode(latency.getAvg()));
});
tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg);
} catch (Exception e) {
log.error("Failed to report latencies: {}", e.getMessage());
}
}
public void reportLatency(String key, long latencyInNanos) {
String latencyKey = key + "Latency";
double latencyInMs = (double) latencyInNanos / 1000_000;
log.trace("Reporting latency [{}]: {} ms", key, latencyInMs);
latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs);
}
public void serviceFailure(Object serviceKey, Throwable error) {
if (log.isDebugEnabled()) {
log.error("Error occurred", error);
}
int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet();
ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount);
log.error(notification.getText());
if (failuresCount == failuresThreshold || (sendRepeatedFailureNotification && failuresCount % failuresThreshold == 0)) {
notificationService.sendNotification(notification);
}
}
public void serviceIsOk(Object serviceKey) {
ServiceRecoveryNotification notification = new ServiceRecoveryNotification(serviceKey);
if (!serviceKey.equals(MonitoredServiceKey.GENERAL)) {
log.info(notification.getText());
}
AtomicInteger failuresCounter = failuresCounters.get(serviceKey);
if (failuresCounter != null) {
if (failuresCounter.get() >= failuresThreshold) {
notificationService.sendNotification(notification);
}
failuresCounter.set(0);
}
}
}

120
monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportHealthChecker.java

@ -0,0 +1,120 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.transport;
import com.fasterxml.jackson.databind.node.TextNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.WsClient;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.TransportType;
import org.thingsboard.monitoring.config.service.TransportMonitoringConfig;
import org.thingsboard.monitoring.data.Latencies;
import org.thingsboard.monitoring.data.MonitoredServiceKey;
import org.thingsboard.monitoring.data.TransportFailureException;
import org.thingsboard.monitoring.data.TransportInfo;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.monitoring.util.TbStopWatch;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
@Slf4j
public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> {
protected final C config;
protected final MonitoringTargetConfig target;
private TransportInfo transportInfo;
@Autowired
private MonitoringReporter reporter;
@Autowired
private TbStopWatch stopWatch;
@Value("${monitoring.check_timeout_ms}")
private int resultCheckTimeoutMs;
public static final String TEST_TELEMETRY_KEY = "testData";
protected TransportHealthChecker(C config, MonitoringTargetConfig target) {
this.config = config;
this.target = target;
}
@PostConstruct
private void init() {
transportInfo = new TransportInfo(getTransportType(), target.getBaseUrl());
}
public final void check(WsClient wsClient) {
log.debug("[{}] Checking", transportInfo);
try {
wsClient.registerWaitForUpdate();
String testValue = UUID.randomUUID().toString();
String testPayload = JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString();
try {
initClientAndSendPayload(testPayload);
log.trace("[{}] Sent test payload ({})", transportInfo, testPayload);
} catch (Throwable e) {
throw new TransportFailureException(e);
}
log.trace("[{}] Waiting for WS update", transportInfo);
checkWsUpdate(wsClient, testValue);
reporter.serviceIsOk(transportInfo);
reporter.serviceIsOk(MonitoredServiceKey.GENERAL);
} catch (TransportFailureException transportFailureException) {
reporter.serviceFailure(transportInfo, transportFailureException);
} catch (Exception e) {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
}
}
private void initClientAndSendPayload(String payload) throws Throwable {
initClient();
stopWatch.start();
sendTestPayload(payload);
reporter.reportLatency(Latencies.transportRequest(getTransportType()), stopWatch.getTime());
}
private void checkWsUpdate(WsClient wsClient, String testValue) {
stopWatch.start();
wsClient.waitForUpdate(resultCheckTimeoutMs);
log.trace("[{}] Waited for WS update. Last WS msg: {}", transportInfo, wsClient.lastMsg);
Object update = wsClient.getTelemetryUpdate(target.getDevice().getId(), TEST_TELEMETRY_KEY);
if (update == null) {
throw new TransportFailureException("No WS update arrived within " + resultCheckTimeoutMs + " ms");
} else if (!update.toString().equals(testValue)) {
throw new TransportFailureException("Was expecting value " + testValue + " but got " + update);
}
reporter.reportLatency(Latencies.WS_UPDATE, stopWatch.getTime());
}
protected abstract void initClient() throws Exception;
protected abstract void sendTestPayload(String payload) throws Exception;
@PreDestroy
protected abstract void destroyClient() throws Exception;
protected abstract TransportType getTransportType();
}

143
monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportMonitoringService.java

@ -0,0 +1,143 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.transport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.client.WsClient;
import org.thingsboard.monitoring.client.WsClientFactory;
import org.thingsboard.monitoring.config.DeviceConfig;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.service.TransportMonitoringConfig;
import org.thingsboard.monitoring.data.Latencies;
import org.thingsboard.monitoring.data.MonitoredServiceKey;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.monitoring.util.TbStopWatch;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import javax.annotation.PostConstruct;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
@Slf4j
public final class TransportMonitoringService {
private final List<TransportMonitoringConfig> configs;
private final List<TransportHealthChecker<?>> transportHealthCheckers = new LinkedList<>();
private final List<UUID> devices = new LinkedList<>();
private final TbClient tbClient;
private final WsClientFactory wsClientFactory;
private final TbStopWatch stopWatch;
private final MonitoringReporter reporter;
private final ApplicationContext applicationContext;
private ScheduledExecutorService scheduler;
@Value("${monitoring.transports.monitoring_rate_ms}")
private int monitoringRateMs;
@PostConstruct
private void init() {
configs.forEach(config -> {
config.getTargets().stream()
.filter(target -> StringUtils.isNotBlank(target.getBaseUrl()))
.peek(target -> checkMonitoringTarget(config, target, tbClient))
.forEach(target -> {
TransportHealthChecker<?> transportHealthChecker = applicationContext.getBean(config.getTransportType().getServiceClass(), config, target);
transportHealthCheckers.add(transportHealthChecker);
devices.add(target.getDevice().getId());
});
});
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor"));
}
@EventListener(ApplicationReadyEvent.class)
public void startMonitoring() {
scheduler.scheduleWithFixedDelay(() -> {
try {
log.debug("Starting transports check");
stopWatch.start();
String accessToken = tbClient.logIn();
reporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime());
try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply();
for (TransportHealthChecker<?> transportHealthChecker : transportHealthCheckers) {
transportHealthChecker.check(wsClient);
}
}
reporter.reportLatencies(tbClient);
log.debug("Finished transports check");
} catch (Throwable error) {
try {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, error);
} catch (Throwable reportError) {
log.error("Error occurred during service failure reporting", reportError);
}
}
}, 0, monitoringRateMs, TimeUnit.MILLISECONDS);
}
private void checkMonitoringTarget(TransportMonitoringConfig config, MonitoringTargetConfig target, TbClient tbClient) {
DeviceConfig deviceConfig = target.getDevice();
tbClient.logIn();
DeviceId deviceId;
if (deviceConfig == null || deviceConfig.getId() == null) {
String deviceName = String.format("[%s] Monitoring device (%s)", config.getTransportType(), target.getBaseUrl());
Device device = tbClient.getTenantDevice(deviceName)
.orElseGet(() -> {
log.info("Creating new device '{}'", deviceName);
Device monitoringDevice = new Device();
monitoringDevice.setName(deviceName);
monitoringDevice.setType("default");
DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration());
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
return tbClient.saveDevice(monitoringDevice);
});
deviceId = device.getId();
target.getDevice().setId(deviceId.toString());
} else {
deviceId = new DeviceId(deviceConfig.getId());
}
log.info("Loading credentials for device {}", deviceId);
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(deviceId)
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + deviceId));
target.getDevice().setCredentials(credentials);
}
}

78
monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/CoapTransportHealthChecker.java

@ -0,0 +1,78 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.transport.impl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.TransportType;
import org.thingsboard.monitoring.config.service.CoapTransportMonitoringConfig;
import org.thingsboard.monitoring.transport.TransportHealthChecker;
import java.io.IOException;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class CoapTransportHealthChecker extends TransportHealthChecker<CoapTransportMonitoringConfig> {
private CoapClient coapClient;
protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, MonitoringTargetConfig target) {
super(config, target);
}
@Override
protected void initClient() throws Exception {
if (coapClient == null) {
String accessToken = target.getDevice().getCredentials().getCredentialsId();
String uri = target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry";
coapClient = new CoapClient(uri);
coapClient.setTimeout((long) config.getRequestTimeoutMs());
log.debug("Initialized CoAP client for URI {}", uri);
}
}
@Override
protected void sendTestPayload(String payload) throws Exception {
CoapResponse response = coapClient.post(payload, MediaTypeRegistry.APPLICATION_JSON);
CoAP.ResponseCode code = response.getCode();
if (code.codeClass != CoAP.CodeClass.SUCCESS_RESPONSE.value) {
throw new IOException("COAP client didn't receive success response from transport");
}
}
@Override
protected void destroyClient() throws Exception {
if (coapClient != null) {
coapClient.shutdown();
coapClient = null;
log.info("Disconnected CoAP client");
}
}
@Override
protected TransportType getTransportType() {
return TransportType.COAP;
}
}

67
monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/HttpTransportHealthChecker.java

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.transport.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.TransportType;
import org.thingsboard.monitoring.config.service.HttpTransportMonitoringConfig;
import org.thingsboard.monitoring.transport.TransportHealthChecker;
import java.time.Duration;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTransportMonitoringConfig> {
private RestTemplate restTemplate;
protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, MonitoringTargetConfig target) {
super(config, target);
}
@Override
protected void initClient() throws Exception {
if (restTemplate == null) {
restTemplate = new RestTemplateBuilder()
.setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs()))
.setReadTimeout(Duration.ofMillis(config.getRequestTimeoutMs()))
.build();
log.debug("Initialized HTTP client");
}
}
@Override
protected void sendTestPayload(String payload) throws Exception {
String accessToken = target.getDevice().getCredentials().getCredentialsId();
restTemplate.postForObject(target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry", payload, String.class);
}
@Override
protected void destroyClient() throws Exception {}
@Override
protected TransportType getTransportType() {
return TransportType.HTTP;
}
}

87
monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/MqttTransportHealthChecker.java

@ -0,0 +1,87 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.transport.impl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
import org.thingsboard.monitoring.config.TransportType;
import org.thingsboard.monitoring.config.service.MqttTransportMonitoringConfig;
import org.thingsboard.monitoring.transport.TransportHealthChecker;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTransportMonitoringConfig> {
private MqttClient mqttClient;
private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry";
protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, MonitoringTargetConfig target) {
super(config, target);
}
@Override
protected void initClient() throws Exception {
if (mqttClient == null || !mqttClient.isConnected()) {
String clientId = MqttAsyncClient.generateClientId();
String accessToken = target.getDevice().getCredentials().getCredentialsId();
mqttClient = new MqttClient(target.getBaseUrl(), clientId, new MemoryPersistence());
mqttClient.setTimeToWait(config.getRequestTimeoutMs());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(accessToken);
options.setConnectionTimeout(config.getRequestTimeoutMs() / 1000);
IMqttToken result = mqttClient.connectWithResult(options);
if (result.getException() != null) {
throw result.getException();
}
log.debug("Initialized MQTT client for URI {}", mqttClient.getServerURI());
}
}
@Override
protected void sendTestPayload(String payload) throws Exception {
MqttMessage message = new MqttMessage();
message.setPayload(payload.getBytes());
message.setQos(config.getQos());
mqttClient.publish(DEVICE_TELEMETRY_TOPIC, message);
}
@Override
protected void destroyClient() throws Exception {
if (mqttClient != null) {
mqttClient.disconnect();
mqttClient = null;
log.info("Disconnected MQTT client");
}
}
@Override
protected TransportType getTransportType() {
return TransportType.MQTT;
}
}

41
monitoring/src/main/java/org/thingsboard/monitoring/util/TbStopWatch.java

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.util;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TbStopWatch {
private final StopWatch internal = new StopWatch();
public void start() {
internal.reset();
internal.start();
}
public long getTime() {
internal.stop();
long nanoTime = internal.getNanoTime();
internal.reset();
return nanoTime;
}
}

38
monitoring/src/main/resources/logback.xml

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Copyright © 2016-2023 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE configuration>
<configuration scan="true" scanPeriod="10 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org" level="WARN"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.monitoring" level="INFO"/>
<logger name="org.thingsboard.monitoring.client" level="WARN"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

107
monitoring/src/main/resources/tb-monitoring.yml

@ -0,0 +1,107 @@
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
monitoring:
# Monitored server domain
domain: "${DOMAIN:localhost}"
rest:
# Base REST API url, https://DOMAIN by default
base_url: '${REST_BASE_URL:https://${monitoring.domain}}'
# Authentication username
username: '${REST_AUTH_USERNAME:tenant@thingsboard.org}'
# Authentication password
password: '${REST_AUTH_PASSWORD:tenant}'
# REST request timeout in milliseconds
request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:5000}'
ws:
# WebSocket url, wss://DOMAIN by default
base_url: '${WS_BASE_URL:wss://${monitoring.domain}}'
# WebSocket request timeout
request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:3000}'
# Maximum time between request to transport and WebSocket update
check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}'
# Failures threshold for notifying
failures_threshold: '${FAILURES_THRESHOLD:2}'
# Whether to notify about next failures after first notification (will notify after each FAILURES_THRESHOLD failures)
send_repeated_failure_notification: '${SEND_REPEATED_FAILURE_NOTIFICATION:true}'
transports:
# Transports check frequency in milliseconds
monitoring_rate_ms: '${TRANSPORTS_MONITORING_RATE_MS:10000}'
mqtt:
# Enable MQTT checks
enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}'
# MQTT request timeout in milliseconds
request_timeout_ms: '${MQTT_REQUEST_TIMEOUT_MS:4000}'
# MQTT QoS
qos: '${MQTT_QOS_LEVEL:1}'
targets:
# MQTT base url, tcp://DOMAIN:1883 by default
- base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}'
device:
# MQTT device to push telemetry for. If not set - device will be found or created automatically
id: '${MQTT_TRANSPORT_TARGET_DEVICE_ID:}'
# To add more targets, use following environment variables:
# monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[1].device.id,
# monitoring.transports.mqtt.targets[2].base_url, monitoring.transports.mqtt.targets[2].device.id, etc.
coap:
# Enable CoAP checks
enabled: '${COAP_TRANSPORT_MONITORING_ENABLED:true}'
# CoAP request timeout in milliseconds
request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}'
targets:
# CoAP base url, coap://DOMAIN by default
- base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}'
# CoAP device to push telemetry for. If not set - device will be found or created automatically
device:
id: '${COAP_TRANSPORT_TARGET_DEVICE_ID:}'
# To add more targets, use following environment variables:
# monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[1].device.id,
# monitoring.transports.coap.targets[2].base_url, monitoring.transports.coap.targets[2].device.id, etc.
http:
# Enable HTTP checks
enabled: '${HTTP_TRANSPORT_MONITORING_ENABLED:true}'
# HTTP request timeout in milliseconds
request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}'
targets:
# HTTP base url, https://DOMAIN by default
- base_url: '${HTTP_TRANSPORT_BASE_URL:https://${monitoring.domain}}'
device:
# HTTP device to push telemetry for. If not set - device will be found or created automatically
id: '${HTTP_TRANSPORT_TARGET_DEVICE_ID:}'
# To add more targets, use following environment variables:
# monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[1].device.id,
# monitoring.transports.http.targets[2].base_url, monitoring.transports.http.targets[2].device.id, etc.
notification_channels:
slack:
# Enable notifying via Slack
enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}'
# Slack webhook url
webhook_url: '${SLACK_WEBHOOK_URL:}'
latency:
# Enable latencies reporting
enabled: "${LATENCY_REPORTING_ENABLED:true}"
# Latency threshold for notifying
threshold_ms: '${LATENCY_THRESHOLD:2000}'
# ID of the asset to save latencies to. If not set and latencies reporting is enabled - asset will be found or created automatically
reporting_asset_id: '${LATENCY_REPORTING_ASSET_ID:}'

30
msa/monitoring/docker/Dockerfile

@ -0,0 +1,30 @@
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM thingsboard/openjdk11:bullseye-slim
COPY start-tb-monitoring.sh ${pkg.name}.deb /tmp/
RUN chmod a+x /tmp/*.sh \
&& mv /tmp/start-tb-monitoring.sh /usr/bin && \
(yes | dpkg -i /tmp/${pkg.name}.deb) && \
rm /tmp/${pkg.name}.deb && \
(systemctl --no-reload disable --now ${pkg.name}.service > /dev/null 2>&1 || :) && \
chmod 555 ${pkg.installFolder}/bin/${pkg.name}.jar
USER ${pkg.user}
CMD ["start-tb-monitoring.sh"]

31
msa/monitoring/docker/start-tb-monitoring.sh

@ -0,0 +1,31 @@
#!/bin/bash
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
CONF_FOLDER=${pkg.installFolder}/conf
jarfile=${pkg.installFolder}/bin/${pkg.name}.jar
configfile=${pkg.name}.conf
source "${CONF_FOLDER}/${configfile}"
echo "Starting '${project.name}' ..."
cd ${pkg.installFolder}/bin
exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.monitoring.ThingsboardMonitoringApplication \
-Dspring.jpa.hibernate.ddl-auto=none \
-Dlogging.config=$CONF_FOLDER/logback.xml \
org.springframework.boot.loader.PropertiesLauncher

192
msa/monitoring/pom.xml

@ -0,0 +1,192 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016-2023 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.5.0-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>
<artifactId>monitoring</artifactId>
<packaging>pom</packaging>
<name>ThingsBoard Monitoring Microservice</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
<pkg.name>tb-monitoring</pkg.name>
<docker.name>tb-monitoring</docker.name>
<pkg.logFolder>/var/log/${pkg.name}</pkg.logFolder>
<pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder>
<docker.push-arm-amd-image.phase>pre-integration-test</docker.push-arm-amd-image.phase>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>monitoring</artifactId>
<version>${project.version}</version>
<classifier>deb</classifier>
<type>deb</type>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-tb-monitoring-deb</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.thingsboard</groupId>
<artifactId>monitoring</artifactId>
<classifier>deb</classifier>
<type>deb</type>
<destFileName>${pkg.name}.deb</destFileName>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-docker-config</id>
<phase>process-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<resources>
<resource>
<directory>docker</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<executions>
<execution>
<id>build-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
</goals>
<configuration>
<skip>${dockerfile.skip}</skip>
<repository>${docker.repo}/${docker.name}</repository>
<verbose>true</verbose>
<googleContainerRegistryEnabled>false</googleContainerRegistryEnabled>
<contextDirectory>${project.build.directory}</contextDirectory>
</configuration>
</execution>
<execution>
<id>tag-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>tag</goal>
</goals>
<configuration>
<skip>${dockerfile.skip}</skip>
<repository>${docker.repo}/${docker.name}</repository>
<tag>${project.version}</tag>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>push-docker-image</id>
<activation>
<property>
<name>push-docker-image</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<executions>
<execution>
<id>push-latest-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>push</goal>
</goals>
<configuration>
<tag>latest</tag>
<repository>${docker.repo}/${docker.name}</repository>
</configuration>
</execution>
<execution>
<id>push-version-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>push</goal>
</goals>
<configuration>
<tag>${project.version}</tag>
<repository>${docker.repo}/${docker.name}</repository>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<repositories>
<repository>
<id>jenkins</id>
<name>Jenkins Repository</name>
<url>https://repo.jenkins-ci.org/releases</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

1
msa/pom.xml

@ -47,6 +47,7 @@
<module>tb-node</module>
<module>transport</module>
<module>js-executor</module>
<module>monitoring</module>
</modules>
<profiles>

6
pom.xml

@ -164,6 +164,7 @@
<module>application</module>
<module>msa</module>
<module>rest-client</module>
<module>monitoring</module>
</modules>
<profiles>
@ -1084,6 +1085,11 @@
<artifactId>jaxb-runtime</artifactId>
<version>${jaxb-runtime.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>

Loading…
Cancel
Save