diff --git a/monitoring/pom.xml b/monitoring/pom.xml new file mode 100644 index 0000000000..429df521b6 --- /dev/null +++ b/monitoring/pom.xml @@ -0,0 +1,160 @@ + + + + 4.0.0 + + org.thingsboard + 3.5.0-SNAPSHOT + thingsboard + + + monitoring + ThingsBoard Monitoring Service + jar + + + UTF-8 + ${basedir}/.. + java + false + process-resources + package + tb-monitoring + false + ${project.build.directory}/windows + ThingsBoard Monitoring Service + org.thingsboard.monitoring.ThingsboardMonitoringApplication + + + + + org.thingsboard.common + data + + + org.thingsboard.common + util + + + org.thingsboard + rest-client + compile + + + org.springframework.boot + spring-boot-starter + + + + org.eclipse.californium + californium-core + + + org.eclipse.californium + scandium + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + org.apache.httpcomponents + httpclient + + + org.java-websocket + Java-WebSocket + compile + + + com.google.guava + guava + + + org.apache.commons + commons-lang3 + + + org.slf4j + slf4j-api + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + + + + ${pkg.name}-${project.version} + + + ${project.basedir}/src/main/resources + + + + + org.apache.maven.plugins + maven-resources-plugin + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.springframework.boot + spring-boot-maven-plugin + + + org.thingsboard + gradle-maven-plugin + + + org.apache.maven.plugins + maven-assembly-plugin + + + org.apache.maven.plugins + maven-install-plugin + + + + + + jenkins + Jenkins Repository + https://repo.jenkins-ci.org/releases + + false + + + + diff --git a/monitoring/src/main/conf/logback.xml b/monitoring/src/main/conf/logback.xml new file mode 100644 index 0000000000..58e49c7639 --- /dev/null +++ b/monitoring/src/main/conf/logback.xml @@ -0,0 +1,46 @@ + + + + + + + ${pkg.logFolder}/${pkg.name}.log + + ${pkg.logFolder}/${pkg.name}.%d{yyyy-MM-dd}.%i.log + 100MB + 30 + 3GB + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/monitoring/src/main/conf/tb-monitoring.conf b/monitoring/src/main/conf/tb-monitoring.conf new file mode 100644 index 0000000000..d1d729bd20 --- /dev/null +++ b/monitoring/src/main/conf/tb-monitoring.conf @@ -0,0 +1,22 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/tb-monitoring/gc.log:time,uptime,level,tags:filecount=10,filesize=10M" +export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError" +export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark" +export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10" +export LOG_FILENAME=tb-monitoring.out +export LOADER_PATH=/usr/share/tb-monitoring/conf diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java new file mode 100644 index 0000000000..3fa43a8175 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@SpringBootApplication +@EnableScheduling +@Slf4j +public class ThingsboardMonitoringApplication { + + public static void main(String[] args) { + new SpringApplicationBuilder(ThingsboardMonitoringApplication.class) + .properties(Map.of("spring.config.name", "tb-monitoring")) + .run(args); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java new file mode 100644 index 0000000000..9dcc58cf2f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.thingsboard.rest.client.RestClient; + +import java.time.Duration; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class TbClient extends RestClient { + + @Value("${monitoring.rest.username}") + private String username; + @Value("${monitoring.rest.password}") + private String password; + + public TbClient(@Value("${monitoring.rest.base_url}") String baseUrl, + @Value("${monitoring.rest.request_timeout_ms}") int requestTimeoutMs) { + super(new RestTemplateBuilder() + .setConnectTimeout(Duration.ofMillis(requestTimeoutMs)) + .setReadTimeout(Duration.ofMillis(requestTimeoutMs)) + .build(), baseUrl); + } + + public String logIn() { + login(username, password); + return getToken(); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java new file mode 100644 index 0000000000..20e7a900ff --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java @@ -0,0 +1,194 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.data.cmd.CmdsWrapper; +import org.thingsboard.monitoring.data.cmd.EntityDataCmd; +import org.thingsboard.monitoring.data.cmd.EntityDataUpdate; +import org.thingsboard.monitoring.data.cmd.LatestValueCmd; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityListFilter; + +import javax.net.ssl.SSLParameters; +import java.net.URI; +import java.nio.channels.NotYetConnectedException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +@Slf4j +public class WsClient extends WebSocketClient implements AutoCloseable { + + public volatile JsonNode lastMsg; + private CountDownLatch reply; + private CountDownLatch update; + + private final Lock updateLock = new ReentrantLock(); + + private long requestTimeoutMs; + + public WsClient(URI serverUri, long requestTimeoutMs) { + super(serverUri); + this.requestTimeoutMs = requestTimeoutMs; + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + + } + + @Override + public void onMessage(String s) { + if (s == null) { + return; + } + updateLock.lock(); + try { + lastMsg = JacksonUtil.toJsonNode(s); + log.trace("Received new msg: {}", lastMsg.toPrettyString()); + if (update != null) { + update.countDown(); + } + if (reply != null) { + reply.countDown(); + } + } finally { + updateLock.unlock(); + } + } + + @Override + public void onClose(int i, String s, boolean b) { + log.debug("WebSocket client is closed"); + } + + @Override + public void onError(Exception e) { + log.error("WebSocket client error:", e); + } + + public void registerWaitForUpdate() { + updateLock.lock(); + try { + lastMsg = null; + update = new CountDownLatch(1); + } finally { + updateLock.unlock(); + } + log.trace("Registered wait for update"); + } + + @Override + public void send(String text) throws NotYetConnectedException { + updateLock.lock(); + try { + reply = new CountDownLatch(1); + } finally { + updateLock.unlock(); + } + super.send(text); + } + + public WsClient subscribeForTelemetry(List devices, String key) { + EntityDataCmd cmd = new EntityDataCmd(); + cmd.setCmdId(RandomUtils.nextInt(0, 1000)); + + EntityListFilter devicesFilter = new EntityListFilter(); + devicesFilter.setEntityType(EntityType.DEVICE); + devicesFilter.setEntityList(devices.stream().map(UUID::toString).collect(Collectors.toList())); + EntityDataPageLink pageLink = new EntityDataPageLink(100,0, null, null); + EntityDataQuery devicesQuery = new EntityDataQuery(devicesFilter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + cmd.setQuery(devicesQuery); + + LatestValueCmd latestCmd = new LatestValueCmd(); + latestCmd.setKeys(List.of(new EntityKey(EntityKeyType.TIME_SERIES, key))); + cmd.setLatestCmd(latestCmd); + + CmdsWrapper wrapper = new CmdsWrapper(); + wrapper.setEntityDataCmds(List.of(cmd)); + send(JacksonUtil.toString(wrapper)); + return this; + } + + public JsonNode waitForUpdate(long ms) { + log.trace("update latch count: {}", update.getCount()); + try { + if (update.await(ms, TimeUnit.MILLISECONDS)) { + log.trace("Waited for update"); + return getLastMsg(); + } + } catch (InterruptedException e) { + log.debug("Failed to await reply", e); + } + log.trace("No update arrived within {} ms", ms); + return null; + } + + public JsonNode waitForReply() { + try { + if (reply.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { + log.trace("Waited for reply"); + return getLastMsg(); + } + } catch (InterruptedException e) { + log.debug("Failed to await reply", e); + } + log.trace("No reply arrived within {} ms", requestTimeoutMs); + throw new IllegalStateException("No WS reply arrived within " + requestTimeoutMs + " ms"); + } + + private JsonNode getLastMsg() { + if (lastMsg != null) { + JsonNode errorMsg = lastMsg.get("errorMsg"); + if (errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) { + throw new RuntimeException("WS error from server: " + errorMsg.asText()); + } else { + return lastMsg; + } + } else { + return null; + } + } + + public Object getTelemetryUpdate(UUID deviceId, String key) { + JsonNode lastMsg = getLastMsg(); + if (lastMsg == null || lastMsg.isNull()) return null; + EntityDataUpdate update = JacksonUtil.treeToValue(lastMsg, EntityDataUpdate.class); + return update.getLatest(deviceId, key); + } + + @Override + protected void onSetSSLParameters(SSLParameters sslParameters) { + sslParameters.setEndpointIdentificationAlgorithm(null); + } + +} \ No newline at end of file diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java new file mode 100644 index 0000000000..1bab803dc9 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import lombok.RequiredArgsConstructor; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.http.ssl.TrustStrategy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.monitoring.util.TbStopWatch; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +@Component +@RequiredArgsConstructor +public class WsClientFactory { + + private final MonitoringReporter monitoringReporter; + private final TbStopWatch stopWatch; + @Value("${monitoring.ws.base_url}") + private String baseUrl; + @Value("${monitoring.ws.request_timeout_ms}") + private int requestTimeoutMs; + + public WsClient createClient(String accessToken) throws Exception { + URI uri = new URI(baseUrl + "/api/ws/plugins/telemetry?token=" + accessToken); + stopWatch.start(); + WsClient wsClient = new WsClient(uri, requestTimeoutMs); + if (baseUrl.startsWith("wss")) { + SSLContextBuilder builder = SSLContexts.custom(); + builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); + wsClient.setSocketFactory(builder.build().getSocketFactory()); + } + boolean connected = wsClient.connectBlocking(requestTimeoutMs, TimeUnit.MILLISECONDS); + if (!connected) { + throw new IllegalStateException("Failed to establish WS session"); + } + monitoringReporter.reportLatency(Latencies.WS_CONNECT, stopWatch.getTime()); + return wsClient; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java new file mode 100644 index 0000000000..548ad6d08b --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.util.UUID; + +@Data +public class DeviceConfig { + + private UUID id; + private DeviceCredentials credentials; + + public void setId(String id) { + this.id = StringUtils.isNotEmpty(id) ? UUID.fromString(id) : null; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java new file mode 100644 index 0000000000..5f1ab49e91 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.Data; + +@Data +public class MonitoringTargetConfig { + + private String baseUrl; + private DeviceConfig device; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java new file mode 100644 index 0000000000..ed0a4ea331 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.monitoring.transport.TransportHealthChecker; +import org.thingsboard.monitoring.transport.impl.CoapTransportHealthChecker; +import org.thingsboard.monitoring.transport.impl.HttpTransportHealthChecker; +import org.thingsboard.monitoring.transport.impl.MqttTransportHealthChecker; + +@AllArgsConstructor +@Getter +public enum TransportType { + + MQTT(MqttTransportHealthChecker.class), + COAP(CoapTransportHealthChecker.class), + HTTP(HttpTransportHealthChecker.class); + + private final Class> serviceClass; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringConfig.java new file mode 100644 index 0000000000..905858b4eb --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringConfig.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.coap.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.coap") +public class CoapTransportMonitoringConfig extends TransportMonitoringConfig { + + @Override + public TransportType getTransportType() { + return TransportType.COAP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringConfig.java new file mode 100644 index 0000000000..3a3e8f612c --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringConfig.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.http.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.http") +public class HttpTransportMonitoringConfig extends TransportMonitoringConfig { + + @Override + public TransportType getTransportType() { + return TransportType.HTTP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringConfig.java new file mode 100644 index 0000000000..7fef5b95dc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringConfig.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.mqtt.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.mqtt") +@Data +@EqualsAndHashCode(callSuper = true) +public class MqttTransportMonitoringConfig extends TransportMonitoringConfig { + + private Integer qos; + + @Override + public TransportType getTransportType() { + return TransportType.MQTT; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringConfig.java new file mode 100644 index 0000000000..0712d1d919 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringConfig.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import lombok.Data; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; + +import java.util.List; + +@Data +public abstract class TransportMonitoringConfig { + + private int requestTimeoutMs; + + private List targets; + + public abstract TransportType getTransportType(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java new file mode 100644 index 0000000000..8141c34586 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +import org.thingsboard.monitoring.config.TransportType; + +public class Latencies { + + public static final String WS_UPDATE = "wsUpdate"; + public static final String WS_CONNECT = "wsConnect"; + public static final String LOG_IN = "logIn"; + + public static String transportRequest(TransportType transportType) { + return String.format("%sTransportRequest", transportType.name().toLowerCase()); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java new file mode 100644 index 0000000000..c64291e30f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +import com.google.common.util.concurrent.AtomicDouble; +import lombok.RequiredArgsConstructor; + +import java.util.concurrent.atomic.AtomicInteger; + +@RequiredArgsConstructor +public class Latency { + + private final String key; + private final AtomicDouble latencySum = new AtomicDouble(); + private final AtomicInteger counter = new AtomicInteger(); + + public synchronized void report(double latencyInMs) { + latencySum.addAndGet(latencyInMs); + counter.incrementAndGet(); + } + + public synchronized double getAvg() { + return latencySum.get() / counter.get(); + } + + public boolean isNotEmpty() { + return counter.get() > 0; + } + + public synchronized void reset() { + latencySum.set(0.0); + counter.set(0); + } + + public String getKey() { + return key; + } + + public synchronized Latency snapshot() { + Latency snapshot = new Latency(key); + snapshot.latencySum.set(latencySum.get()); + snapshot.counter.set(counter.get()); + return snapshot; + } + + @Override + public String toString() { + return "Latency{" + + "key='" + key + '\'' + + ", avgLatency=" + getAvg() + + '}'; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java new file mode 100644 index 0000000000..825162952f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +public class MonitoredServiceKey { + + public static final String GENERAL = "Monitoring"; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java new file mode 100644 index 0000000000..8157c4af5f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +public class TransportFailureException extends RuntimeException { + + public TransportFailureException(Throwable cause) { + super(cause.getMessage(), cause); + } + + public TransportFailureException(String message) { + super(message); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java new file mode 100644 index 0000000000..d7619ea32d --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +import lombok.Data; +import org.thingsboard.monitoring.config.TransportType; + +@Data +public class TransportInfo { + + private final TransportType transportType; + private final String url; + + @Override + public String toString() { + return String.format("%s (%s)", transportType, url); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java new file mode 100644 index 0000000000..34227150ea --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import lombok.Data; + +import java.util.List; + +@Data +public class CmdsWrapper { + + private List entityDataCmds; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataCmd.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataCmd.java new file mode 100644 index 0000000000..ccd6fa8f7c --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataCmd.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import lombok.Data; +import org.thingsboard.server.common.data.query.EntityDataQuery; + +@Data +public class EntityDataCmd { + + private int cmdId; + private EntityDataQuery query; + private LatestValueCmd latestCmd; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java new file mode 100644 index 0000000000..5f3a466217 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.TsValue; + +import java.util.List; +import java.util.UUID; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class EntityDataUpdate { + + @JsonIgnoreProperties(ignoreUnknown = true) + private List update; + + public String getLatest(UUID entityId, String key) { + if (update == null) return null; + + return update.stream() + .filter(entityData -> entityData.getEntityId().getId().equals(entityId)).findFirst() + .map(EntityData::getLatest).map(latest -> latest.get(EntityKeyType.TIME_SERIES)) + .map(latest -> latest.get(key)).map(TsValue::getValue) + .orElse(null); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/LatestValueCmd.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/LatestValueCmd.java new file mode 100644 index 0000000000..5c2cffcbb6 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/LatestValueCmd.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import lombok.Data; +import org.thingsboard.server.common.data.query.EntityKey; + +import java.util.List; + +@Data +public class LatestValueCmd { + + private List keys; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java new file mode 100644 index 0000000000..939cb7440f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import org.thingsboard.monitoring.data.Latency; + +import java.util.Collection; + +public class HighLatencyNotification implements Notification { + + private final Collection latencies; + private final int thresholdMs; + + public HighLatencyNotification(Collection latencies, int thresholdMs) { + this.latencies = latencies; + this.thresholdMs = thresholdMs; + } + + @Override + public String getText() { + StringBuilder text = new StringBuilder(); + text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); + latencies.forEach(latency -> { + text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg())); + }); + return text.toString(); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java new file mode 100644 index 0000000000..21e5994d84 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +public interface Notification { + + String getText(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java new file mode 100644 index 0000000000..4b78d96a63 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import lombok.Getter; +import org.apache.commons.lang3.exception.ExceptionUtils; + +@Getter +public class ServiceFailureNotification implements Notification { + + private final Object serviceKey; + private final Throwable error; + private final int failuresCount; + + public ServiceFailureNotification(Object serviceKey, Throwable error, int failuresCount) { + this.serviceKey = serviceKey; + this.error = error; + this.failuresCount = failuresCount; + } + + @Override + public String getText() { + String errorMsg = error.getMessage(); + if (errorMsg == null || errorMsg.equals("null")) { + Throwable cause = ExceptionUtils.getRootCause(error); + if (cause != null) { + errorMsg = cause.getMessage(); + } + } + if (errorMsg == null) { + errorMsg = error.getClass().getSimpleName(); + } + return String.format("[%s] Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java new file mode 100644 index 0000000000..44eaf093cc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +public class ServiceRecoveryNotification implements Notification { + + private final Object serviceKey; + + public ServiceRecoveryNotification(Object serviceKey) { + this.serviceKey = serviceKey; + } + + @Override + public String getText() { + return String.format("[%s] is OK", serviceKey); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java new file mode 100644 index 0000000000..f882105048 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.data.notification.Notification; +import org.thingsboard.monitoring.notification.channels.NotificationChannel; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +@Component +@RequiredArgsConstructor +@Slf4j +public class NotificationService { + + private final List notificationChannels; + private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(); + + public void sendNotification(Notification notification) { + forEachNotificationChannel(notificationChannel -> notificationChannel.sendNotification(notification)); + } + + private void forEachNotificationChannel(Consumer function) { + notificationChannels.forEach(notificationChannel -> { + notificationExecutor.submit(() -> { + try { + function.accept(notificationChannel); + } catch (Exception e) { + log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e); + } + }); + }); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java new file mode 100644 index 0000000000..6c1c23cc28 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification.channels; + +import org.thingsboard.monitoring.data.notification.Notification; + +public interface NotificationChannel { + + void sendNotification(Notification notification); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java new file mode 100644 index 0000000000..092ea0b8c6 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification.channels.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.monitoring.data.notification.Notification; +import org.thingsboard.monitoring.notification.channels.NotificationChannel; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Map; + +@Component +@ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true") +@Slf4j +public class SlackNotificationChannel implements NotificationChannel { + + @Value("${monitoring.notification_channels.slack.webhook_url}") + private String webhookUrl; + + private RestTemplate restTemplate; + + @PostConstruct + private void init() { + restTemplate = new RestTemplateBuilder() + .setConnectTimeout(Duration.ofSeconds(5)) + .setReadTimeout(Duration.ofSeconds(2)) + .build(); + } + + @Override + public void sendNotification(Notification notification) { + sendNotification(notification.getText()); + } + + private void sendNotification(String message) { + restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java new file mode 100644 index 0000000000..4649e31ac4 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -0,0 +1,144 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service; + +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.data.Latency; +import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.data.notification.HighLatencyNotification; +import org.thingsboard.monitoring.data.notification.ServiceFailureNotification; +import org.thingsboard.monitoring.data.notification.ServiceRecoveryNotification; +import org.thingsboard.monitoring.notification.NotificationService; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +@Component +@RequiredArgsConstructor +@Slf4j +public class MonitoringReporter { + + private final NotificationService notificationService; + + private final Map latencies = new ConcurrentHashMap<>(); + private final Map failuresCounters = new ConcurrentHashMap<>(); + + @Value("${monitoring.failures_threshold}") + private int failuresThreshold; + @Value("${monitoring.send_repeated_failure_notification}") + private boolean sendRepeatedFailureNotification; + + @Value("${monitoring.latency.enabled}") + private boolean latencyReportingEnabled; + @Value("${monitoring.latency.threshold_ms}") + private int latencyThresholdMs; + @Value("${monitoring.latency.reporting_asset_id}") + private String reportingAssetId; + + public void reportLatencies(TbClient tbClient) { + List latencies = this.latencies.values().stream() + .filter(Latency::isNotEmpty) + .map(latency -> { + Latency snapshot = latency.snapshot(); + latency.reset(); + return snapshot; + }) + .collect(Collectors.toList()); + if (latencies.isEmpty()) { + return; + } + log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms") + .collect(Collectors.joining("\n"))); + + if (!latencyReportingEnabled) return; + + if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) { + HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs); + notificationService.sendNotification(highLatencyNotification); + } + + try { + if (StringUtils.isBlank(reportingAssetId)) { + String assetName = "Monitoring"; + Asset monitoringAsset = tbClient.findAsset(assetName).orElseGet(() -> { + Asset asset = new Asset(); + asset.setType("Monitoring"); + asset.setName(assetName); + asset = tbClient.saveAsset(asset); + log.info("Created monitoring asset {}", asset.getId()); + return asset; + }); + reportingAssetId = monitoringAsset.getId().toString(); + } + + ObjectNode msg = JacksonUtil.newObjectNode(); + latencies.forEach(latency -> { + msg.set(latency.getKey(), new DoubleNode(latency.getAvg())); + }); + tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg); + } catch (Exception e) { + log.error("Failed to report latencies: {}", e.getMessage()); + } + } + + public void reportLatency(String key, long latencyInNanos) { + String latencyKey = key + "Latency"; + double latencyInMs = (double) latencyInNanos / 1000_000; + log.trace("Reporting latency [{}]: {} ms", key, latencyInMs); + latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs); + } + + public void serviceFailure(Object serviceKey, Throwable error) { + if (log.isDebugEnabled()) { + log.error("Error occurred", error); + } + int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet(); + ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount); + log.error(notification.getText()); + if (failuresCount == failuresThreshold || (sendRepeatedFailureNotification && failuresCount % failuresThreshold == 0)) { + notificationService.sendNotification(notification); + } + } + + public void serviceIsOk(Object serviceKey) { + ServiceRecoveryNotification notification = new ServiceRecoveryNotification(serviceKey); + if (!serviceKey.equals(MonitoredServiceKey.GENERAL)) { + log.info(notification.getText()); + } + AtomicInteger failuresCounter = failuresCounters.get(serviceKey); + if (failuresCounter != null) { + if (failuresCounter.get() >= failuresThreshold) { + notificationService.sendNotification(notification); + } + failuresCounter.set(0); + } + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportHealthChecker.java new file mode 100644 index 0000000000..9717b98f51 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportHealthChecker.java @@ -0,0 +1,120 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.transport; + +import com.fasterxml.jackson.databind.node.TextNode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.WsClient; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.TransportMonitoringConfig; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.data.TransportFailureException; +import org.thingsboard.monitoring.data.TransportInfo; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.monitoring.util.TbStopWatch; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.UUID; + +@Slf4j +public abstract class TransportHealthChecker { + + protected final C config; + protected final MonitoringTargetConfig target; + private TransportInfo transportInfo; + + @Autowired + private MonitoringReporter reporter; + @Autowired + private TbStopWatch stopWatch; + @Value("${monitoring.check_timeout_ms}") + private int resultCheckTimeoutMs; + + public static final String TEST_TELEMETRY_KEY = "testData"; + + protected TransportHealthChecker(C config, MonitoringTargetConfig target) { + this.config = config; + this.target = target; + } + + @PostConstruct + private void init() { + transportInfo = new TransportInfo(getTransportType(), target.getBaseUrl()); + } + + public final void check(WsClient wsClient) { + log.debug("[{}] Checking", transportInfo); + try { + wsClient.registerWaitForUpdate(); + + String testValue = UUID.randomUUID().toString(); + String testPayload = JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString(); + try { + initClientAndSendPayload(testPayload); + log.trace("[{}] Sent test payload ({})", transportInfo, testPayload); + } catch (Throwable e) { + throw new TransportFailureException(e); + } + + log.trace("[{}] Waiting for WS update", transportInfo); + checkWsUpdate(wsClient, testValue); + + reporter.serviceIsOk(transportInfo); + reporter.serviceIsOk(MonitoredServiceKey.GENERAL); + } catch (TransportFailureException transportFailureException) { + reporter.serviceFailure(transportInfo, transportFailureException); + } catch (Exception e) { + reporter.serviceFailure(MonitoredServiceKey.GENERAL, e); + } + } + + private void initClientAndSendPayload(String payload) throws Throwable { + initClient(); + stopWatch.start(); + sendTestPayload(payload); + reporter.reportLatency(Latencies.transportRequest(getTransportType()), stopWatch.getTime()); + } + + private void checkWsUpdate(WsClient wsClient, String testValue) { + stopWatch.start(); + wsClient.waitForUpdate(resultCheckTimeoutMs); + log.trace("[{}] Waited for WS update. Last WS msg: {}", transportInfo, wsClient.lastMsg); + Object update = wsClient.getTelemetryUpdate(target.getDevice().getId(), TEST_TELEMETRY_KEY); + if (update == null) { + throw new TransportFailureException("No WS update arrived within " + resultCheckTimeoutMs + " ms"); + } else if (!update.toString().equals(testValue)) { + throw new TransportFailureException("Was expecting value " + testValue + " but got " + update); + } + reporter.reportLatency(Latencies.WS_UPDATE, stopWatch.getTime()); + } + + + protected abstract void initClient() throws Exception; + + protected abstract void sendTestPayload(String payload) throws Exception; + + @PreDestroy + protected abstract void destroyClient() throws Exception; + + protected abstract TransportType getTransportType(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportMonitoringService.java new file mode 100644 index 0000000000..b3dd10db15 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/transport/TransportMonitoringService.java @@ -0,0 +1,143 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.transport; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationContext; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.client.WsClient; +import org.thingsboard.monitoring.client.WsClientFactory; +import org.thingsboard.monitoring.config.DeviceConfig; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.service.TransportMonitoringConfig; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.monitoring.util.TbStopWatch; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; +import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import javax.annotation.PostConstruct; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Service +@RequiredArgsConstructor +@Slf4j +public final class TransportMonitoringService { + + private final List configs; + private final List> transportHealthCheckers = new LinkedList<>(); + private final List devices = new LinkedList<>(); + + private final TbClient tbClient; + private final WsClientFactory wsClientFactory; + private final TbStopWatch stopWatch; + private final MonitoringReporter reporter; + private final ApplicationContext applicationContext; + private ScheduledExecutorService scheduler; + @Value("${monitoring.transports.monitoring_rate_ms}") + private int monitoringRateMs; + + @PostConstruct + private void init() { + configs.forEach(config -> { + config.getTargets().stream() + .filter(target -> StringUtils.isNotBlank(target.getBaseUrl())) + .peek(target -> checkMonitoringTarget(config, target, tbClient)) + .forEach(target -> { + TransportHealthChecker transportHealthChecker = applicationContext.getBean(config.getTransportType().getServiceClass(), config, target); + transportHealthCheckers.add(transportHealthChecker); + devices.add(target.getDevice().getId()); + }); + }); + scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor")); + } + + @EventListener(ApplicationReadyEvent.class) + public void startMonitoring() { + scheduler.scheduleWithFixedDelay(() -> { + try { + log.debug("Starting transports check"); + stopWatch.start(); + String accessToken = tbClient.logIn(); + reporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime()); + + try (WsClient wsClient = wsClientFactory.createClient(accessToken)) { + wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply(); + + for (TransportHealthChecker transportHealthChecker : transportHealthCheckers) { + transportHealthChecker.check(wsClient); + } + } + reporter.reportLatencies(tbClient); + log.debug("Finished transports check"); + } catch (Throwable error) { + try { + reporter.serviceFailure(MonitoredServiceKey.GENERAL, error); + } catch (Throwable reportError) { + log.error("Error occurred during service failure reporting", reportError); + } + } + }, 0, monitoringRateMs, TimeUnit.MILLISECONDS); + } + + private void checkMonitoringTarget(TransportMonitoringConfig config, MonitoringTargetConfig target, TbClient tbClient) { + DeviceConfig deviceConfig = target.getDevice(); + tbClient.logIn(); + + DeviceId deviceId; + if (deviceConfig == null || deviceConfig.getId() == null) { + String deviceName = String.format("[%s] Monitoring device (%s)", config.getTransportType(), target.getBaseUrl()); + Device device = tbClient.getTenantDevice(deviceName) + .orElseGet(() -> { + log.info("Creating new device '{}'", deviceName); + Device monitoringDevice = new Device(); + monitoringDevice.setName(deviceName); + monitoringDevice.setType("default"); + DeviceData deviceData = new DeviceData(); + deviceData.setConfiguration(new DefaultDeviceConfiguration()); + deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); + return tbClient.saveDevice(monitoringDevice); + }); + deviceId = device.getId(); + target.getDevice().setId(deviceId.toString()); + } else { + deviceId = new DeviceId(deviceConfig.getId()); + } + + log.info("Loading credentials for device {}", deviceId); + DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(deviceId) + .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + deviceId)); + target.getDevice().setCredentials(credentials); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/CoapTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/CoapTransportHealthChecker.java new file mode 100644 index 0000000000..3e36e62c58 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/CoapTransportHealthChecker.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.transport.impl; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.CoapTransportMonitoringConfig; +import org.thingsboard.monitoring.transport.TransportHealthChecker; + +import java.io.IOException; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class CoapTransportHealthChecker extends TransportHealthChecker { + + private CoapClient coapClient; + + protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + if (coapClient == null) { + String accessToken = target.getDevice().getCredentials().getCredentialsId(); + String uri = target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry"; + coapClient = new CoapClient(uri); + coapClient.setTimeout((long) config.getRequestTimeoutMs()); + log.debug("Initialized CoAP client for URI {}", uri); + } + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + CoapResponse response = coapClient.post(payload, MediaTypeRegistry.APPLICATION_JSON); + CoAP.ResponseCode code = response.getCode(); + if (code.codeClass != CoAP.CodeClass.SUCCESS_RESPONSE.value) { + throw new IOException("COAP client didn't receive success response from transport"); + } + } + + @Override + protected void destroyClient() throws Exception { + if (coapClient != null) { + coapClient.shutdown(); + coapClient = null; + log.info("Disconnected CoAP client"); + } + } + + @Override + protected TransportType getTransportType() { + return TransportType.COAP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/HttpTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/HttpTransportHealthChecker.java new file mode 100644 index 0000000000..834025b9e5 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/HttpTransportHealthChecker.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.transport.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.HttpTransportMonitoringConfig; +import org.thingsboard.monitoring.transport.TransportHealthChecker; + +import java.time.Duration; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class HttpTransportHealthChecker extends TransportHealthChecker { + + private RestTemplate restTemplate; + + protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + if (restTemplate == null) { + restTemplate = new RestTemplateBuilder() + .setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) + .setReadTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) + .build(); + log.debug("Initialized HTTP client"); + } + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + String accessToken = target.getDevice().getCredentials().getCredentialsId(); + restTemplate.postForObject(target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry", payload, String.class); + } + + @Override + protected void destroyClient() throws Exception {} + + @Override + protected TransportType getTransportType() { + return TransportType.HTTP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/MqttTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/MqttTransportHealthChecker.java new file mode 100644 index 0000000000..850cfe136b --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/transport/impl/MqttTransportHealthChecker.java @@ -0,0 +1,87 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.transport.impl; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.MqttTransportMonitoringConfig; +import org.thingsboard.monitoring.transport.TransportHealthChecker; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class MqttTransportHealthChecker extends TransportHealthChecker { + + private MqttClient mqttClient; + + private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; + + protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + if (mqttClient == null || !mqttClient.isConnected()) { + String clientId = MqttAsyncClient.generateClientId(); + String accessToken = target.getDevice().getCredentials().getCredentialsId(); + mqttClient = new MqttClient(target.getBaseUrl(), clientId, new MemoryPersistence()); + mqttClient.setTimeToWait(config.getRequestTimeoutMs()); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(accessToken); + options.setConnectionTimeout(config.getRequestTimeoutMs() / 1000); + IMqttToken result = mqttClient.connectWithResult(options); + if (result.getException() != null) { + throw result.getException(); + } + log.debug("Initialized MQTT client for URI {}", mqttClient.getServerURI()); + } + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + MqttMessage message = new MqttMessage(); + message.setPayload(payload.getBytes()); + message.setQos(config.getQos()); + mqttClient.publish(DEVICE_TELEMETRY_TOPIC, message); + } + + @Override + protected void destroyClient() throws Exception { + if (mqttClient != null) { + mqttClient.disconnect(); + mqttClient = null; + log.info("Disconnected MQTT client"); + } + } + + @Override + protected TransportType getTransportType() { + return TransportType.MQTT; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/util/TbStopWatch.java b/monitoring/src/main/java/org/thingsboard/monitoring/util/TbStopWatch.java new file mode 100644 index 0000000000..49e387999a --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/util/TbStopWatch.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.util; + +import org.apache.commons.lang3.time.StopWatch; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class TbStopWatch { + + private final StopWatch internal = new StopWatch(); + + public void start() { + internal.reset(); + internal.start(); + } + + public long getTime() { + internal.stop(); + long nanoTime = internal.getNanoTime(); + internal.reset(); + return nanoTime; + } + +} diff --git a/monitoring/src/main/resources/logback.xml b/monitoring/src/main/resources/logback.xml new file mode 100644 index 0000000000..a7ca84126f --- /dev/null +++ b/monitoring/src/main/resources/logback.xml @@ -0,0 +1,38 @@ + + + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml new file mode 100644 index 0000000000..403b29acc2 --- /dev/null +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -0,0 +1,107 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +monitoring: + # Monitored server domain + domain: "${DOMAIN:localhost}" + rest: + # Base REST API url, https://DOMAIN by default + base_url: '${REST_BASE_URL:https://${monitoring.domain}}' + # Authentication username + username: '${REST_AUTH_USERNAME:tenant@thingsboard.org}' + # Authentication password + password: '${REST_AUTH_PASSWORD:tenant}' + # REST request timeout in milliseconds + request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:5000}' + ws: + # WebSocket url, wss://DOMAIN by default + base_url: '${WS_BASE_URL:wss://${monitoring.domain}}' + # WebSocket request timeout + request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:3000}' + + # Maximum time between request to transport and WebSocket update + check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}' + + # Failures threshold for notifying + failures_threshold: '${FAILURES_THRESHOLD:2}' + # Whether to notify about next failures after first notification (will notify after each FAILURES_THRESHOLD failures) + send_repeated_failure_notification: '${SEND_REPEATED_FAILURE_NOTIFICATION:true}' + + transports: + # Transports check frequency in milliseconds + monitoring_rate_ms: '${TRANSPORTS_MONITORING_RATE_MS:10000}' + + mqtt: + # Enable MQTT checks + enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}' + # MQTT request timeout in milliseconds + request_timeout_ms: '${MQTT_REQUEST_TIMEOUT_MS:4000}' + # MQTT QoS + qos: '${MQTT_QOS_LEVEL:1}' + targets: + # MQTT base url, tcp://DOMAIN:1883 by default + - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' + device: + # MQTT device to push telemetry for. If not set - device will be found or created automatically + id: '${MQTT_TRANSPORT_TARGET_DEVICE_ID:}' + # To add more targets, use following environment variables: + # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[1].device.id, + # monitoring.transports.mqtt.targets[2].base_url, monitoring.transports.mqtt.targets[2].device.id, etc. + + coap: + # Enable CoAP checks + enabled: '${COAP_TRANSPORT_MONITORING_ENABLED:true}' + # CoAP request timeout in milliseconds + request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}' + targets: + # CoAP base url, coap://DOMAIN by default + - base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}' + # CoAP device to push telemetry for. If not set - device will be found or created automatically + device: + id: '${COAP_TRANSPORT_TARGET_DEVICE_ID:}' + # To add more targets, use following environment variables: + # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[1].device.id, + # monitoring.transports.coap.targets[2].base_url, monitoring.transports.coap.targets[2].device.id, etc. + + http: + # Enable HTTP checks + enabled: '${HTTP_TRANSPORT_MONITORING_ENABLED:true}' + # HTTP request timeout in milliseconds + request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}' + targets: + # HTTP base url, https://DOMAIN by default + - base_url: '${HTTP_TRANSPORT_BASE_URL:https://${monitoring.domain}}' + device: + # HTTP device to push telemetry for. If not set - device will be found or created automatically + id: '${HTTP_TRANSPORT_TARGET_DEVICE_ID:}' + # To add more targets, use following environment variables: + # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[1].device.id, + # monitoring.transports.http.targets[2].base_url, monitoring.transports.http.targets[2].device.id, etc. + + notification_channels: + slack: + # Enable notifying via Slack + enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}' + # Slack webhook url + webhook_url: '${SLACK_WEBHOOK_URL:}' + + latency: + # Enable latencies reporting + enabled: "${LATENCY_REPORTING_ENABLED:true}" + # Latency threshold for notifying + threshold_ms: '${LATENCY_THRESHOLD:2000}' + # ID of the asset to save latencies to. If not set and latencies reporting is enabled - asset will be found or created automatically + reporting_asset_id: '${LATENCY_REPORTING_ASSET_ID:}' diff --git a/msa/monitoring/docker/Dockerfile b/msa/monitoring/docker/Dockerfile new file mode 100644 index 0000000000..5da050eb91 --- /dev/null +++ b/msa/monitoring/docker/Dockerfile @@ -0,0 +1,30 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM thingsboard/openjdk11:bullseye-slim + +COPY start-tb-monitoring.sh ${pkg.name}.deb /tmp/ + +RUN chmod a+x /tmp/*.sh \ + && mv /tmp/start-tb-monitoring.sh /usr/bin && \ + (yes | dpkg -i /tmp/${pkg.name}.deb) && \ + rm /tmp/${pkg.name}.deb && \ + (systemctl --no-reload disable --now ${pkg.name}.service > /dev/null 2>&1 || :) && \ + chmod 555 ${pkg.installFolder}/bin/${pkg.name}.jar + +USER ${pkg.user} + +CMD ["start-tb-monitoring.sh"] diff --git a/msa/monitoring/docker/start-tb-monitoring.sh b/msa/monitoring/docker/start-tb-monitoring.sh new file mode 100755 index 0000000000..d5bcc52d37 --- /dev/null +++ b/msa/monitoring/docker/start-tb-monitoring.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +CONF_FOLDER=${pkg.installFolder}/conf +jarfile=${pkg.installFolder}/bin/${pkg.name}.jar +configfile=${pkg.name}.conf + +source "${CONF_FOLDER}/${configfile}" + +echo "Starting '${project.name}' ..." + +cd ${pkg.installFolder}/bin + +exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.monitoring.ThingsboardMonitoringApplication \ + -Dspring.jpa.hibernate.ddl-auto=none \ + -Dlogging.config=$CONF_FOLDER/logback.xml \ + org.springframework.boot.loader.PropertiesLauncher diff --git a/msa/monitoring/pom.xml b/msa/monitoring/pom.xml new file mode 100644 index 0000000000..09b492f759 --- /dev/null +++ b/msa/monitoring/pom.xml @@ -0,0 +1,192 @@ + + + + 4.0.0 + + org.thingsboard + 3.5.0-SNAPSHOT + msa + + + org.thingsboard.msa + monitoring + pom + + ThingsBoard Monitoring Microservice + + + UTF-8 + ${basedir}/../.. + tb-monitoring + tb-monitoring + /var/log/${pkg.name} + /usr/share/${pkg.name} + pre-integration-test + + + + + org.thingsboard + monitoring + ${project.version} + deb + deb + provided + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-tb-monitoring-deb + package + + copy + + + + + org.thingsboard + monitoring + deb + deb + ${pkg.name}.deb + ${project.build.directory} + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + copy-docker-config + process-resources + + copy-resources + + + ${project.build.directory} + + + docker + true + + + + + + + + com.spotify + dockerfile-maven-plugin + + + build-docker-image + pre-integration-test + + build + + + ${dockerfile.skip} + ${docker.repo}/${docker.name} + true + false + ${project.build.directory} + + + + tag-docker-image + pre-integration-test + + tag + + + ${dockerfile.skip} + ${docker.repo}/${docker.name} + ${project.version} + + + + + + + + + push-docker-image + + + push-docker-image + + + + + + com.spotify + dockerfile-maven-plugin + + + push-latest-docker-image + pre-integration-test + + push + + + latest + ${docker.repo}/${docker.name} + + + + push-version-docker-image + pre-integration-test + + push + + + ${project.version} + ${docker.repo}/${docker.name} + + + + + + + + + + + jenkins + Jenkins Repository + https://repo.jenkins-ci.org/releases + + false + + + + + \ No newline at end of file diff --git a/msa/pom.xml b/msa/pom.xml index ad40e574ec..6b76f974f1 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -47,6 +47,7 @@ tb-node transport js-executor + monitoring diff --git a/pom.xml b/pom.xml index fb1aafd6b6..52b470a1a4 100755 --- a/pom.xml +++ b/pom.xml @@ -164,6 +164,7 @@ application msa rest-client + monitoring @@ -1084,6 +1085,11 @@ jaxb-runtime ${jaxb-runtime.version} + + org.springframework.boot + spring-boot-starter + ${spring-boot.version} + org.springframework.boot spring-boot-starter-security