diff --git a/monitoring/pom.xml b/monitoring/pom.xml index 59fdd5f1df..5c045df996 100644 --- a/monitoring/pom.xml +++ b/monitoring/pom.xml @@ -24,15 +24,24 @@ 3.4.2-SNAPSHOT thingsboard + monitoring 3.4.2-SNAPSHOT - Monitoring service + ThingsBoard Monitoring Service jar UTF-8 ${basedir}/.. - monitoring-service + java + false + process-resources + package + tb-monitoring + false + ${project.build.directory}/windows + ThingsBoard Monitoring Service + org.thingsboard.monitoring.ThingsboardMonitoringApplication @@ -47,7 +56,7 @@ org.thingsboard rest-client - provided + compile org.springframework.boot @@ -87,6 +96,21 @@ Java-WebSocket compile + + org.seleniumhq.selenium + selenium-java + 4.6.0 + + + com.google.guava + guava + 31.1-jre + + + io.github.bonigarcia + webdrivermanager + 5.3.1 + org.apache.commons commons-lang3 @@ -119,14 +143,42 @@ org.apache.maven.plugins - maven-compiler-plugin + maven-resources-plugin + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-jar-plugin org.springframework.boot spring-boot-maven-plugin - + + org.thingsboard + gradle-maven-plugin + + + org.apache.maven.plugins + maven-assembly-plugin + + + org.apache.maven.plugins + maven-install-plugin + - + + + jenkins + Jenkins Repository + https://repo.jenkins-ci.org/releases + + false + + + diff --git a/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java b/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java deleted file mode 100644 index 8c49ee8b27..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import lombok.Data; - -@Data -public class LatencyMsg { - - private long restClientLoginLatency; - private long deviceCredLatency; - private long getDeviceLatency; - - private long wsSubInitLatency; - - private long mqttConnectLatency; - private long mqttSendLatency; - private long mqttTotalLatency; - - private long httpSendLatency; - private long httpTotalLatency; - - private long mqttErrors; - private long mqttReconnects; - - - public boolean hasLongLatency(long threshold) { - return restClientLoginLatency > threshold - || deviceCredLatency > threshold - || getDeviceLatency > threshold - || wsSubInitLatency > threshold - || mqttConnectLatency > threshold - || mqttSendLatency > threshold - || mqttTotalLatency > threshold - || httpSendLatency > threshold - || httpTotalLatency > threshold; - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java b/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java deleted file mode 100644 index 17d196a5aa..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.SSLContexts; -import org.apache.http.ssl.TrustStrategy; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpRequest; -import org.springframework.http.client.support.HttpRequestWrapper; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import org.springframework.web.client.RestTemplate; -import org.thingsboard.rest.client.RestClient; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.security.DeviceCredentials; - -import java.net.URI; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; - -@Component -@Slf4j -public class SaasApi { - - private static final long wsResponseWaitSec = 300; - - - @Value("${saas.host}") - private String host; - - private AtomicLong counter = new AtomicLong(); - private SampleMqttClient mqttClient; - private AtomicLong mqttErrors = new AtomicLong(); - private AtomicLong mqttReconnects = new AtomicLong(); - - public void checkMqtt(Device device, DeviceCredentials deviceCredentials, RestClient restClient, LatencyMsg latency) { - try { - WsClient wsClient = subscribeToWebSocket(device.getId(), "tsSubCmds", restClient, latency); - - long msgTs = System.currentTimeMillis(); - long submittedValue = counter.incrementAndGet(); - sendMqtt(deviceCredentials, submittedValue, msgTs, latency); - - WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(wsResponseWaitSec); - if(actualLatestTelemetry == null) { - latency.setMqttTotalLatency(-1*wsResponseWaitSec); - } else { - validateWsResponse(actualLatestTelemetry, msgTs, submittedValue); - long responseReadyTs = System.currentTimeMillis(); - latency.setMqttTotalLatency(responseReadyTs - msgTs); - - } - wsClient.closeBlocking(); - latency.setMqttErrors(mqttErrors.getAndSet(0L)); - latency.setMqttReconnects(mqttReconnects.getAndSet(0L)); - } catch (Exception ex) { - throw new IllegalStateException("Could not check mqtt: " + ex.getMessage(), ex); - } - } - - public void checkHttp(Device device, DeviceCredentials deviceCredentials, RestClient restClient, LatencyMsg latency) { - try { - WsClient wsClient = subscribeToWebSocket(device.getId(), "tsSubCmds", restClient, latency); - long msgTs = System.currentTimeMillis(); - long submittedValue = counter.incrementAndGet(); - sendHttp(restClient, deviceCredentials, submittedValue, msgTs, latency); - WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(wsResponseWaitSec); - if(actualLatestTelemetry == null) { - latency.setHttpTotalLatency(-1*wsResponseWaitSec); - } else { - validateWsResponse(actualLatestTelemetry, msgTs, submittedValue); - long responseReadyTs = System.currentTimeMillis(); - latency.setHttpTotalLatency(responseReadyTs - msgTs); - } - wsClient.closeBlocking(); - } catch (Exception ex) { - throw new IllegalStateException("Could not check http: " + ex.getMessage(), ex); - } - } - - - private void validateWsResponse(WsTelemetryResponse response, long expectedTs, long expectedVal) { - try { - List values = response.getDataValuesByKey("checkKey"); - if (CollectionUtils.isEmpty(values)) { - throw new IllegalStateException("Ws response - no data"); - } - long actualTs = Long.parseLong(values.get(0).toString()); - long actualVal = Long.parseLong(values.get(1).toString()); - - if (actualTs != expectedTs) { - throw new IllegalStateException("Ws response - Ts not matched. Actual: " + actualTs + " Expected: " + expectedTs + " Delta: " + (expectedTs - actualTs)); - } - - if (actualVal != expectedVal) { - throw new IllegalStateException("Ws response - Value not matched. Actual: " + actualVal + " Expected: " + expectedVal + " Delta: " + (expectedVal - actualVal)); - } - } catch (Exception ex) { - throw new IllegalStateException("Could not validate WS response: " + response, ex); - } - } - - private void sendHttp(RestClient restClient, DeviceCredentials deviceCredentials, long value, long msgTs, LatencyMsg latency) { - try { - long start = System.currentTimeMillis(); - - RestTemplate restTemplate = new RestTemplate(); - restTemplate.setInterceptors(Collections.singletonList((httpRequest, bytes, clientHttpRequestExecution) -> { - HttpRequest wrapper = new HttpRequestWrapper(httpRequest); - wrapper.getHeaders().set("X-Authorization", "Bearer " + restClient.getToken()); - return clientHttpRequestExecution.execute(wrapper, bytes); - })); - - String payload = createPayload(msgTs, value).toString(); - - restTemplate.postForEntity("https://" + host + "/api/v1/" + deviceCredentials.getCredentialsId() + "/telemetry", payload, String.class); - - latency.setHttpSendLatency(System.currentTimeMillis() - start); - log.info("HTTP msg submitted"); - } catch (Exception ex) { - throw new IllegalStateException("Could not send http: " + ex.getMessage(), ex); - } - } - - private void sendMqtt(DeviceCredentials deviceCredentials, long value, long msgTs, LatencyMsg latency) { - try { - long start = System.currentTimeMillis(); - SampleMqttClient mqttClient = getMqttClient(deviceCredentials, latency); - - JsonObject payload = createPayload(msgTs, value); - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.readTree(payload.toString()); - mqttClient.publishTelemetry(objectMapper.readTree(payload.toString())); - latency.setMqttSendLatency(System.currentTimeMillis() - start); - log.info("Mqtt msg submitted"); - } catch (Exception ex) { - throw new IllegalStateException("Could not send mqtt: " + ex.getMessage(), ex); - } - } - - private WsClient subscribeToWebSocket(DeviceId deviceId, String property, RestClient restClient, LatencyMsg latency) { - try { - long start = System.currentTimeMillis(); - WsClient wsClient = new WsClient(new URI("wss://" + host + "/api/ws/plugins/telemetry?token=" + restClient.getToken())); - SSLContextBuilder builder = SSLContexts.custom(); - builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); - wsClient.setSocketFactory(builder.build().getSocketFactory()); - wsClient.connectBlocking(); - - JsonObject cmdsObject = new JsonObject(); - cmdsObject.addProperty("entityType", EntityType.DEVICE.name()); - cmdsObject.addProperty("entityId", deviceId.toString()); - cmdsObject.addProperty("scope", "LATEST_TELEMETRY"); - cmdsObject.addProperty("cmdId", new Random().nextInt(100)); - - JsonArray cmd = new JsonArray(); - cmd.add(cmdsObject); - JsonObject wsRequest = new JsonObject(); - wsRequest.add(property, cmd); - wsClient.send(wsRequest.toString()); - wsClient.waitForFirstReply(); - latency.setWsSubInitLatency(System.currentTimeMillis() - start); - log.info("Ws subscription created"); - return wsClient; - } catch (Exception ex) { - throw new IllegalStateException("Could not subscribe to WS: " + ex.getMessage(), ex); - } - } - - private SampleMqttClient getMqttClient(DeviceCredentials deviceCredentials, LatencyMsg latency) { - try { - long start = System.currentTimeMillis(); - if (mqttClient == null || !mqttClient.nativeClient.isConnected()) { - if (mqttClient != null) { - mqttReconnects.incrementAndGet(); - try { - mqttClient.disconnect(); - } catch (Exception ex) { - log.error("fail disconnect mqtt", ex); - } - } - String uri = "tcp://" + host + ":1883"; - String deviceTmpName = "health check device"; - String token = deviceCredentials.getCredentialsId(); - mqttClient = new SampleMqttClient(uri, deviceTmpName, token, mqttErrors); - boolean connected = mqttClient.connect(); - if (!connected) { - throw new IllegalStateException("Could not connect mqtt nativeClient"); - } - } - latency.setMqttConnectLatency(System.currentTimeMillis() - start); - return mqttClient; - } catch (Exception ex) { - throw new IllegalStateException("Could not create mqtt nativeClient: " + ex.getMessage(), ex); - } - } - - private static JsonObject createPayload(long ts, long val) { - JsonObject values = createPayload(val); - JsonObject payload = new JsonObject(); - payload.addProperty("ts", ts); - payload.add("values", values); - return payload; - } - - private static JsonObject createPayload(long val) { - JsonObject values = new JsonObject(); - values.addProperty("checkKey", val); - - return values; - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java b/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java deleted file mode 100644 index 39f249aa4a..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import com.google.common.collect.Lists; -import com.google.gson.JsonObject; -import lombok.extern.slf4j.Slf4j; -import org.crawler.license.service.NotifyService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpRequest; -import org.springframework.http.client.support.HttpRequestWrapper; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; -import org.thingsboard.rest.client.RestClient; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.security.DeviceCredentials; - -import javax.annotation.PostConstruct; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -@Component -@Slf4j -public class SaasHealthChecker { - - // private String url = ""; - private String username = "paromskiy@gmail.com"; - private String pass = "123test123"; - private String deviceName = "healthcheckDevice"; - private String dashboardId = ""; - - @Value("${saas.host}") - private String host; - - @Value("${saas.check.interval.sec}") - private long refreshIntervalSec; - - @Value("${saas.notifyThreshold.ms}") - private long notifyThreshold; - - @Autowired - private NotifyService notifyService; - - @Autowired - private SaasApi saasApi; - - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private AtomicLong counter = new AtomicLong(); - private long logStatsIntervalMs = TimeUnit.HOURS.toMillis(12); - private long lastLogStatsTime = System.currentTimeMillis(); - private long lastLatencyNotification = 0; - private boolean inFailedState = false; - - private List latestLatencies = Lists.newArrayList(); - private long failsCount = 0; - - @PostConstruct - public void init() throws URISyntaxException { - check(); - executor.scheduleWithFixedDelay(() -> { - try { - log.info("Start saas healthcheck"); - check(); - counter.incrementAndGet(); - log.info("Finish saas healthcheck"); -// if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) == 1) { -// notifyService.sendInSlackSaasHealth("I AM ALIVE. Checks performed: " + counter.get()); -// } - if(System.currentTimeMillis() - lastLogStatsTime > logStatsIntervalMs) { - lastLogStatsTime = System.currentTimeMillis(); - logIntervalStats(); - notifyService.sendInSlackSaasHealth("I AM ALIVE. Checks performed: " + counter.get()); - } - } catch (Exception ex) { - log.error("Scheduled error", ex); - } - - }, refreshIntervalSec, refreshIntervalSec, TimeUnit.SECONDS); - } - - private void check() throws URISyntaxException { - try { - // load dashboard - LatencyMsg latency = new LatencyMsg(); - RestClient restClient = buildClient(latency); - Device device = getDevice(restClient, latency); - DeviceCredentials deviceCred = getDeviceCred(restClient, device, latency); - - - saasApi.checkMqtt(device, deviceCred, restClient, latency); - saasApi.checkHttp(device, deviceCred, restClient, latency); - - - processLatencyObj(latency, restClient, deviceCred); - latestLatencies.add(latency); - if (inFailedState) { - inFailedState = false; - notifyService.sendInSlackSaasHealth("service restored"); - } - } catch (Exception ex) { - failsCount++; - log.error("Error while check Saas health", ex); - if (!inFailedState || System.currentTimeMillis() - lastLatencyNotification > TimeUnit.MINUTES.toHours(15)) { - String msg = "########################## \n ERROR \n " + ex.getMessage(); - lastLatencyNotification = System.currentTimeMillis(); - inFailedState = true; - notifyService.sendInSlackSaasHealth(msg); - } - } - } - - private void processLatencyObj(LatencyMsg latency, RestClient restClient, DeviceCredentials credentials) throws URISyntaxException { - System.out.println(); - System.out.println(latency); - System.out.println(); - - - JsonObject values = new JsonObject(); - values.addProperty("restClientLoginLatency", latency.getRestClientLoginLatency()); - values.addProperty("deviceCredLatency", latency.getDeviceCredLatency()); - values.addProperty("getDeviceLatency", latency.getGetDeviceLatency()); - values.addProperty("wsSubInitLatency", latency.getWsSubInitLatency()); - values.addProperty("mqttConnectLatency", latency.getMqttConnectLatency()); - values.addProperty("mqttSendLatency", latency.getMqttSendLatency()); - values.addProperty("mqttTotalLatency", latency.getMqttTotalLatency()); - values.addProperty("httpSendLatency", latency.getHttpSendLatency()); - values.addProperty("httpTotalLatency", latency.getHttpTotalLatency()); - values.addProperty("mqttErrors", latency.getMqttErrors()); - values.addProperty("mqttReconnects", latency.getMqttReconnects()); - - JsonObject payload = new JsonObject(); - payload.addProperty("ts", System.currentTimeMillis()); - payload.add("values", values); - - RestTemplate restTemplate = new RestTemplate(); - restTemplate.setInterceptors(Collections.singletonList((httpRequest, bytes, clientHttpRequestExecution) -> { - HttpRequest wrapper = new HttpRequestWrapper(httpRequest); - wrapper.getHeaders().set("X-Authorization", "Bearer " + restClient.getToken()); - return clientHttpRequestExecution.execute(wrapper, bytes); - })); - - - restTemplate.postForEntity("https://" + host + "/api/v1/" + credentials.getCredentialsId() + "/telemetry", payload.toString(), String.class); - - if (latency.hasLongLatency(notifyThreshold) && System.currentTimeMillis() - lastLatencyNotification > TimeUnit.MINUTES.toHours(60)) { - lastLatencyNotification = System.currentTimeMillis(); - notifyService.sendInSlackSaasHealth("Some SaaS latencies are upper threshold [" + notifyThreshold + "ms] \n " + latency); - } - } - - private Device getDevice(RestClient restClient, LatencyMsg latencyMsg) { - long start = System.currentTimeMillis(); - Optional tenantDevice = restClient.getTenantDevice(deviceName); - Device device = tenantDevice.orElseThrow(() -> new IllegalStateException("Device [" + deviceName + "] was not found")); - latencyMsg.setGetDeviceLatency(System.currentTimeMillis() - start); - log.info("Device loaded"); - return device; - } - - private DeviceCredentials getDeviceCred(RestClient client, Device device, LatencyMsg latencyMsg) { - long start = System.currentTimeMillis(); - Optional credentialsOptional = client.getDeviceCredentialsByDeviceId(device.getId()); - DeviceCredentials credentials = credentialsOptional.orElseThrow(() -> new IllegalStateException("Could not load device credentials")); - latencyMsg.setDeviceCredLatency(System.currentTimeMillis() - start); - log.info("Device cred loaded"); - return credentials; - } - - private RestClient buildClient(LatencyMsg latencyMsg) { - try { - long start = System.currentTimeMillis(); - RestClient client = new RestClient("https://" + host); - client.login(username, pass); - latencyMsg.setRestClientLoginLatency(System.currentTimeMillis() - start); - log.info("Rest client ready"); - return client; - } catch (Exception ex) { - throw new IllegalStateException("Could not login: " + ex.getMessage(), ex); - } - } - - private void logIntervalStats() throws URISyntaxException { - List httpTotal = latestLatencies.stream().map(LatencyMsg::getHttpTotalLatency).collect(Collectors.toList()); - List login = latestLatencies.stream().map(LatencyMsg::getRestClientLoginLatency).collect(Collectors.toList()); - List getDevice = latestLatencies.stream().map(LatencyMsg::getDeviceCredLatency).collect(Collectors.toList()); - long mqttErrorsCnt = latestLatencies.stream().mapToLong(LatencyMsg::getMqttErrors).sum(); - long mqttReconectCnt = latestLatencies.stream().mapToLong(LatencyMsg::getMqttReconnects).sum(); - String msg = "[Interval stats] Fails count: " + failsCount + " ok count: " + latestLatencies.size() + "\n"; - if (latestLatencies.size() > 0) { - msg += "\t http send -receive dalay: " + toPercentile(httpTotal) + "\n"; - msg += "\t http login: " + toPercentile(login) + "\n"; - msg += "\t get device: " + toPercentile(getDevice) + "\n"; - msg += "\t mqtt errors: " + mqttErrorsCnt + "\n"; - msg += "\t mqtt reconnects: " + mqttReconectCnt + "\n"; - } - failsCount = 0; - latestLatencies.clear(); - - log.info("Interval stats: {}" , msg); - notifyService.sendInSlackSaasHealth(msg); - } - - private String toPercentile(List values) { - return "75%: " + percentile(values, 75) + "ms; \t90%:" + percentile(values, 90) + "ms; \t95%:" + percentile(values, 95) + "ms; \tmin: " + percentile(values, 1) + "ms; \tmax: " + percentile(values, 99.99); - } - - private static Long percentile(List values, double percentile) { - Collections.sort(values); - int index = (int) Math.ceil(percentile / 100.0 * values.size()); - return (long) ((Math.round(values.get(index - 1) * 100)) / 100d); - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java b/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java deleted file mode 100644 index e0dbc398c1..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttActionListener; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicLong; - -@Slf4j -public class SampleMqttClient { - - public static final ObjectMapper MAPPER = new ObjectMapper(); - - @Getter - private final String deviceToken; - @Getter - private final String deviceName; - @Getter - private final String clientId; - private final MqttClientPersistence persistence; - public final MqttAsyncClient nativeClient; - public final AtomicLong failCount; - - - - public SampleMqttClient(String uri, String deviceName, String deviceToken, AtomicLong failCount) throws Exception { - this.clientId = MqttAsyncClient.generateClientId(); - this.deviceToken = deviceToken; - this.deviceName = deviceName; - this.failCount = failCount; - this.persistence = new MemoryPersistence(); - this.nativeClient = new MqttAsyncClient(uri, clientId, persistence); - } - - public boolean connect() throws Exception { - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(deviceToken); - try { - nativeClient.connect(options, null, new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken iMqttToken) { - log.info("[{}] connected to Thingsboard!", deviceName); - } - - @Override - public void onFailure(IMqttToken iMqttToken, Throwable e) { - failCount.incrementAndGet(); - log.error("[{}] failed to connect to Thingsboard!", deviceName, e); - } - }).waitForCompletion(); - } catch (MqttException e) { - log.error("Failed to connect to the server", e); - } - return nativeClient.isConnected(); - } - - public void disconnect() throws Exception { - nativeClient.disconnect().waitForCompletion(); - } - - public void publishAttributes(JsonNode data) throws Exception { - publish("v1/devices/me/attributes", data, true); - } - - public void publishTelemetry(JsonNode data) throws Exception { - publish("v1/devices/me/telemetry", data, false); - } - - private void publish(String topic, JsonNode data, boolean sync) throws Exception { - MqttMessage msg = new MqttMessage(MAPPER.writeValueAsString(data).getBytes(StandardCharsets.UTF_8)); - IMqttDeliveryToken deliveryToken = nativeClient.publish(topic, msg, null, new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken asyncActionToken) { - log.trace("Data updated!"); - } - - @Override - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - log.error("[{}] Data update failed!", deviceName, exception); - } - }); - if (sync) { - deliveryToken.waitForCompletion(); - } - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/WsClient.java b/monitoring/src/main/java/org/thingsboard/aba/WsClient.java deleted file mode 100644 index fdbedc2eee..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/WsClient.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; - -import javax.net.ssl.SSLParameters; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -@Slf4j -public class WsClient extends WebSocketClient { - - private static final ObjectMapper mapper = new ObjectMapper(); - private WsTelemetryResponse message; - - private volatile boolean firstReplyReceived; - private CountDownLatch firstReply = new CountDownLatch(1); - private CountDownLatch latch = new CountDownLatch(1); - - WsClient(URI serverUri) { - super(serverUri); - } - - @Override - public void onOpen(ServerHandshake serverHandshake) { - } - - @Override - public void onMessage(String message) { - log.info("Ws on message {}", message); - if (!firstReplyReceived) { - firstReplyReceived = true; - firstReply.countDown(); - } else { - try { - WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class); - response.setArriveTs(System.currentTimeMillis()); - if (!response.getData().isEmpty()) { - this.message = response; - latch.countDown(); - } - } catch (IOException e) { - log.error("ws message can't be read"); - } - } - } - - @Override - public void onClose(int code, String reason, boolean remote) { - log.info("ws is closed, due to [{}]", reason); - } - - @Override - public void onError(Exception ex) { - ex.printStackTrace(); - } - - - public WsTelemetryResponse getLastMessage(long timeoutSec) { - try { - latch.await(timeoutSec, TimeUnit.SECONDS); - log.info("Ws response received"); - return this.message; - } catch (InterruptedException e) { - log.error("Timeout, ws message wasn't received"); - } - return null; - } - - void waitForFirstReply() { - try { - firstReply.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.error("Timeout, ws message wasn't received"); - throw new RuntimeException(e); - } - } - - @Override - protected void onSetSSLParameters(SSLParameters sslParameters) { - sslParameters.setEndpointIdentificationAlgorithm(null); - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java b/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java deleted file mode 100644 index a122a8c2aa..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.aba; - -import lombok.Data; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -@Data -public class WsTelemetryResponse { - - private int subscriptionId; - private int errorCode; - private String errorMsg; - private Map>> data; - private Map latestValues; - private long arriveTs; - - public List getDataValuesByKey(String key) { - return data.entrySet().stream() - .filter(e -> e.getKey().equals(key)) - .flatMap(e -> e.getValue().stream().flatMap(Collection::stream)) - .collect(Collectors.toList()); - } -} diff --git a/monitoring/src/main/java/org/thingsboard/aba/application.yml b/monitoring/src/main/java/org/thingsboard/aba/application.yml deleted file mode 100644 index 0e8ccbf6cc..0000000000 --- a/monitoring/src/main/java/org/thingsboard/aba/application.yml +++ /dev/null @@ -1,28 +0,0 @@ -# -# Copyright © 2016-2022 The Thingsboard Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -server.address: "${HTTP_BIND_ADDRESS:0.0.0.0}" -server.port: "${HTTP_BIND_PORT:9712}" - -server.compression.enabled: true -server.compression.mime-types: "text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json" -server.compression.min-response-size: 1024 - - -saas.host: "${SAAS_CHECK_HOST:thingsboard.cloud}" -saas.check.interval.sec: "${SAAS_CHECK_INTERVAL_SEC:60}" -saas.slack.token: "${SAAS_SLACK_TOKEN:T2QD8CLUS/B02TP4E0Y02/RenftHjCFYdVlOU8OiPGOhJV}" -saas.notifyThreshold.ms: "${SAAS_NOTIFY_THRESHOLD_MS:28000}" diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java index b177614aba..cbfba4da51 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java @@ -15,7 +15,9 @@ */ package org.thingsboard.monitoring; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -23,8 +25,19 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.DeviceConfig; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; import org.thingsboard.monitoring.config.service.TransportMonitoringServiceConfig; +import org.thingsboard.monitoring.service.TransportMonitoringService; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; +import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -33,6 +46,7 @@ import java.util.concurrent.ScheduledExecutorService; @SpringBootApplication @EnableScheduling +@Slf4j public class ThingsboardMonitoringApplication { public static void main(String[] args) { @@ -42,22 +56,55 @@ public class ThingsboardMonitoringApplication { } @Bean - public ApplicationRunner initMonitoringServices(List configs, ApplicationContext context) { + public ApplicationRunner initAndStartMonitoringServices(List configs, TbClient tbClient, ApplicationContext context) { return args -> { + List> monitoringServices = new LinkedList<>(); configs.forEach(config -> { config.getTargets().stream() .filter(target -> StringUtils.isNotBlank(target.getBaseUrl())) + .peek(target -> checkMonitoringTarget(config, target, tbClient)) .forEach(target -> { - context.getBean(config.getTransportType().getMonitoringServiceClass(), config, target); + TransportMonitoringService monitoringService = context.getBean(config.getTransportType().getMonitoringServiceClass(), config, target); + monitoringServices.add(monitoringService); }); }); + monitoringServices.forEach(TransportMonitoringService::startMonitoring); }; } + private void checkMonitoringTarget(TransportMonitoringServiceConfig config, MonitoringTargetConfig target, TbClient tbClient) { + DeviceConfig deviceConfig = target.getDevice(); + tbClient.logIn(); + + DeviceId deviceId; + if (deviceConfig == null || deviceConfig.getId() == null) { + String deviceName = String.format("[%s] Monitoring device (%s)", config.getTransportType(), target.getBaseUrl()); + Device device = tbClient.getTenantDevice(deviceName) + .orElseGet(() -> { + log.info("Creating new device '{}'", deviceName); + Device monitoringDevice = new Device(); + monitoringDevice.setName(deviceName); + monitoringDevice.setType("default"); + DeviceData deviceData = new DeviceData(); + deviceData.setConfiguration(new DefaultDeviceConfiguration()); + deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); + return tbClient.saveDevice(monitoringDevice); + }); + deviceId = device.getId(); + target.getDevice().setId(deviceId.toString()); + } else { + deviceId = new DeviceId(deviceConfig.getId()); + } + + log.debug("Loading credentials for device {}", deviceId); + DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(deviceId) + .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + deviceId)); + target.getDevice().setCredentials(credentials); + } + @Bean - public ScheduledExecutorService monitoringExecutor(List configs) { - int targetsCount = configs.stream().mapToInt(config -> config.getTargets().size()).sum(); - return Executors.newScheduledThreadPool(targetsCount, ThingsBoardThreadFactory.forName("monitoring-executor")); + public ScheduledExecutorService monitoringExecutor(@Value("${monitoring.monitoring_thread_pool_size}") int threadPoolSize) { + return Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("monitoring-executor")); } @Bean diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java deleted file mode 100644 index e608bbc28a..0000000000 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.monitoring.client; - -import lombok.Getter; -import lombok.Setter; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.eclipse.californium.core.network.CoapEndpoint; -import org.eclipse.californium.core.network.config.NetworkConfig; -import org.eclipse.californium.core.observe.ObservationStore; -import org.eclipse.californium.scandium.DTLSConnector; -import org.eclipse.californium.scandium.config.DtlsConnectorConfig; -import org.eclipse.leshan.client.californium.LeshanClient; -import org.eclipse.leshan.client.californium.LeshanClientBuilder; -import org.eclipse.leshan.client.engine.DefaultRegistrationEngineFactory; -import org.eclipse.leshan.client.object.Security; -import org.eclipse.leshan.client.object.Server; -import org.eclipse.leshan.client.resource.BaseInstanceEnabler; -import org.eclipse.leshan.client.resource.DummyInstanceEnabler; -import org.eclipse.leshan.client.resource.ObjectsInitializer; -import org.eclipse.leshan.client.servers.ServerIdentity; -import org.eclipse.leshan.core.californium.EndpointFactory; -import org.eclipse.leshan.core.model.InvalidDDFFileException; -import org.eclipse.leshan.core.model.LwM2mModel; -import org.eclipse.leshan.core.model.ObjectLoader; -import org.eclipse.leshan.core.model.ObjectModel; -import org.eclipse.leshan.core.model.StaticModel; -import org.eclipse.leshan.core.node.codec.DefaultLwM2mDecoder; -import org.eclipse.leshan.core.node.codec.DefaultLwM2mEncoder; -import org.eclipse.leshan.core.response.ReadResponse; - -import javax.security.auth.Destroyable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import static org.eclipse.leshan.client.object.Security.noSec; -import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL; -import static org.eclipse.leshan.core.LwM2mId.DEVICE; -import static org.eclipse.leshan.core.LwM2mId.SECURITY; -import static org.eclipse.leshan.core.LwM2mId.SERVER; - -@Slf4j -public class Lwm2mClient extends BaseInstanceEnabler implements Destroyable { - - @Getter - @Setter - private String name; - - @Getter - @Setter - private LeshanClient leshanClient; - - private List models; - private Security security; - private NetworkConfig coapConfig; - - private static final List supportedResources = Collections.singletonList(2); - - private String data = UUID.randomUUID().toString(); - - private String serverUri; - private String endpoint; - - public Lwm2mClient(String serverUri, String endpoint) { - this.serverUri = serverUri; - this.endpoint = endpoint; - } - - public Lwm2mClient() { - } - - public void initClient() throws InvalidDDFFileException, IOException { - String[] resources = new String[]{"0.xml", "1.xml", "2.xml", "3.xml"}; - models = new ArrayList<>(); - for (String resourceName : resources) { - models.addAll(ObjectLoader.loadDdfFile(getClass().getClassLoader().getResourceAsStream("lwm2m/" + resourceName), resourceName)); - } - - security = noSec(serverUri, 123); - coapConfig = new NetworkConfig().setString("COAP_PORT", StringUtils.substringAfterLast(serverUri, ":")); - - - setName(endpoint); - - LeshanClient leshanClient; - - LwM2mModel model = new StaticModel(models); - ObjectsInitializer initializer = new ObjectsInitializer(model); - initializer.setInstancesForObject(SECURITY, security); - initializer.setInstancesForObject(SERVER, new Server(123, 300)); - initializer.setInstancesForObject(DEVICE, this); - initializer.setClassForObject(ACCESS_CONTROL, DummyInstanceEnabler.class); - DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder(); - dtlsConfig.setRecommendedCipherSuitesOnly(true); - dtlsConfig.setClientOnly(); - - DefaultRegistrationEngineFactory engineFactory = new DefaultRegistrationEngineFactory(); - engineFactory.setReconnectOnUpdate(false); - engineFactory.setResumeOnConnect(true); - - EndpointFactory endpointFactory = new EndpointFactory() { - - @Override - public CoapEndpoint createUnsecuredEndpoint(InetSocketAddress address, NetworkConfig coapConfig, - ObservationStore store) { - CoapEndpoint.Builder builder = new CoapEndpoint.Builder(); - builder.setInetSocketAddress(address); - builder.setNetworkConfig(coapConfig); - return builder.build(); - } - - @Override - public CoapEndpoint createSecuredEndpoint(DtlsConnectorConfig dtlsConfig, NetworkConfig coapConfig, - ObservationStore store) { - CoapEndpoint.Builder builder = new CoapEndpoint.Builder(); - DtlsConnectorConfig.Builder dtlsConfigBuilder = new DtlsConnectorConfig.Builder(dtlsConfig); - builder.setConnector(new DTLSConnector(dtlsConfigBuilder.build())); - builder.setNetworkConfig(coapConfig); - return builder.build(); - } - }; - - LeshanClientBuilder builder = new LeshanClientBuilder(endpoint); - builder.setObjects(initializer.createAll()); - builder.setCoapConfig(coapConfig); - builder.setDtlsConfig(dtlsConfig); - builder.setRegistrationEngineFactory(engineFactory); - builder.setEndpointFactory(endpointFactory); - builder.setDecoder(new DefaultLwM2mDecoder(false)); - builder.setEncoder(new DefaultLwM2mEncoder(false)); - leshanClient = builder.build(); - - setLeshanClient(leshanClient); - - leshanClient.start(); - } - - @Override - public ReadResponse read(ServerIdentity identity, int resourceId) { - if (resourceId == 2) { - return ReadResponse.success(resourceId, data); - } - return ReadResponse.notFound(); - } - - @Override - public List getAvailableResourceIds(ObjectModel model) { - return supportedResources; - } - - @SneakyThrows - public void send(String data) { - this.data = data; - fireResourcesChange(2); - } - - @Override - public void destroy() { - if (leshanClient != null) { - leshanClient.destroy(true); - } - } -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java new file mode 100644 index 0000000000..ea5ac53c63 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.thingsboard.rest.client.RestClient; + +import java.time.Duration; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class TbClient extends RestClient { + + @Value("${monitoring.auth.username}") + private String username; + @Value("${monitoring.auth.password}") + private String password; + + public TbClient(@Value("${monitoring.auth.base_url}") String baseUrl, + @Value("${monitoring.rest_request_timeout_ms}") int requestTimeoutMs) { + super(new RestTemplateBuilder() + .setConnectTimeout(Duration.ofMillis(requestTimeoutMs)) + .setReadTimeout(Duration.ofMillis(requestTimeoutMs)) + .build(), baseUrl); + } + + public String logIn() { + login(username, password); + return getToken(); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java deleted file mode 100644 index 1a723e2db8..0000000000 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.thingsboard.monitoring.client; - -import org.springframework.beans.factory.annotation.Lookup; -import org.springframework.stereotype.Component; - -@Component -public class TbClientFactory { - - @Lookup - public TbRestClient createClient() { - return null; - } - -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java deleted file mode 100644 index 17df7d7801..0000000000 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.thingsboard.monitoring.client; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.boot.web.client.RestTemplateBuilder; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; -import org.thingsboard.monitoring.config.TransportType; -import org.thingsboard.monitoring.data.TransportInfo; -import org.thingsboard.rest.client.RestClient; -import org.thingsboard.server.common.data.Device; - -import java.time.Duration; - -@Component -@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -public class TbRestClient extends RestClient { - - @Value("${monitoring.auth.username}") - private String username; - @Value("${monitoring.auth.password}") - private String password; - - public TbRestClient(@Value("${monitoring.auth.base_url}") String baseUrl) { - super(new RestTemplateBuilder() - .setConnectTimeout(Duration.ofSeconds(5)) - .setReadTimeout(Duration.ofSeconds(2)) - .build(), baseUrl); - } - - public String logIn() { - login(username, password); - return getToken(); - } - - public Device createDeviceForMonitoringIfNotExists(TransportInfo transportInfo, String deviceName) { -// getTenantDevice(name) -// .orElseGet(() -> { -// Device device = -// }) - } - -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java index d8e2cbdfec..1c33da0338 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java @@ -15,8 +15,10 @@ */ package org.thingsboard.monitoring.client; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.thingsboard.common.util.JacksonUtil; @@ -24,6 +26,7 @@ import org.thingsboard.monitoring.data.cmd.CmdsWrapper; import org.thingsboard.monitoring.data.cmd.TimeseriesSubscriptionCmd; import org.thingsboard.monitoring.data.cmd.TimeseriesUpdate; +import javax.net.ssl.SSLParameters; import java.net.URI; import java.nio.channels.NotYetConnectedException; import java.util.List; @@ -92,10 +95,10 @@ public class WsClient extends WebSocketClient { send(JacksonUtil.toString(wrapper)); } - public String waitForUpdate(long ms) { + public JsonNode waitForUpdate(long ms) { try { if (update.await(ms, TimeUnit.MILLISECONDS)) { - return lastMsg; + return getLastMsg(); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); @@ -103,10 +106,10 @@ public class WsClient extends WebSocketClient { return null; } - public String waitForReply(int ms) { + public JsonNode waitForReply(int ms) { try { if (reply.await(ms, TimeUnit.MILLISECONDS)) { - return lastMsg; + return getLastMsg(); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); @@ -114,10 +117,30 @@ public class WsClient extends WebSocketClient { return null; } + public JsonNode getLastMsg() { + JsonNode msg = JacksonUtil.toJsonNode(lastMsg); + if (msg != null) { + JsonNode errorMsg = msg.get("errorMsg"); + if (errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) { + throw new RuntimeException("WS error from server: " + errorMsg.asText()); + } else { + return msg; + } + } else { + return null; + } + } + public Object getTelemetryKeyUpdate(String key) { - if (lastMsg == null) return null; - TimeseriesUpdate update = JacksonUtil.fromString(lastMsg, TimeseriesUpdate.class); + JsonNode lastMsg = getLastMsg(); + if (lastMsg == null || lastMsg.isNull()) return null; + TimeseriesUpdate update = JacksonUtil.treeToValue(lastMsg, TimeseriesUpdate.class); return update.getLatest(key); } + @Override + protected void onSetSSLParameters(SSLParameters sslParameters) { + sslParameters.setEndpointIdentificationAlgorithm(null); + } + } \ No newline at end of file diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java new file mode 100644 index 0000000000..1ed9fb8a24 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClientFactory.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import lombok.RequiredArgsConstructor; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.http.ssl.TrustStrategy; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.WsConfig; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.monitoring.util.TbStopWatch; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +@Component +@RequiredArgsConstructor +public class WsClientFactory { + + private final WsConfig wsConfig; + private final MonitoringReporter monitoringReporter; + private final TbStopWatch stopWatch; + + public WsClient createClient(String accessToken) throws Exception { + URI uri = new URI(wsConfig.getBaseUrl() + "/api/ws/plugins/telemetry?token=" + accessToken); + stopWatch.start(); + WsClient wsClient = new WsClient(uri); + if (wsConfig.getBaseUrl().startsWith("wss")) { + SSLContextBuilder builder = SSLContexts.custom(); + builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); + wsClient.setSocketFactory(builder.build().getSocketFactory()); + } + boolean connected = wsClient.connectBlocking(wsConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); + if (!connected) { + throw new IllegalStateException("Failed to establish WS session"); + } + monitoringReporter.reportLatency(Latencies.WS_CONNECT, stopWatch.getTime()); + return wsClient; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java index 1a16094dd9..15517b5f5b 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java @@ -17,13 +17,15 @@ package org.thingsboard.monitoring.config; import lombok.Data; import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.security.DeviceCredentials; import java.util.UUID; @Data public class DeviceConfig { + private UUID id; - private String accessToken; + private DeviceCredentials credentials; public void setId(String id) { this.id = StringUtils.isNotEmpty(id) ? UUID.fromString(id) : null; diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java index afbe62bb05..4530c5211d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java @@ -23,8 +23,4 @@ public class MonitoringTargetConfig { private String baseUrl; private DeviceConfig device; - @Override - public String toString() { - return "Monitoring target [base url: '" + baseUrl + "', device: " + device.getId() + "]"; - } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java index fc93c6d71e..49a9cda010 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java @@ -20,7 +20,6 @@ import lombok.Getter; import org.thingsboard.monitoring.service.TransportMonitoringService; import org.thingsboard.monitoring.service.impl.CoapTransportMonitoringService; import org.thingsboard.monitoring.service.impl.HttpTransportMonitoringService; -import org.thingsboard.monitoring.service.impl.Lwm2mTransportMonitoringService; import org.thingsboard.monitoring.service.impl.MqttTransportMonitoringService; @AllArgsConstructor @@ -28,7 +27,6 @@ import org.thingsboard.monitoring.service.impl.MqttTransportMonitoringService; public enum TransportType { MQTT(MqttTransportMonitoringService.class), COAP(CoapTransportMonitoringService.class), - LWM2M(Lwm2mTransportMonitoringService.class), HTTP(HttpTransportMonitoringService.class); private final Class> monitoringServiceClass; diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java index d4efe64091..4dcca5ea6c 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.monitoring.config; import lombok.Getter; diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java index ad58430892..9218df12ee 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java @@ -19,9 +19,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.thingsboard.monitoring.config.TransportType; -import org.thingsboard.rest.client.RestClient; - -import javax.annotation.PostConstruct; @Component @ConditionalOnProperty(name = "monitoring.transports.coap.enabled", havingValue = "true") diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java index 49ec69191b..90f7d86957 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java @@ -24,8 +24,10 @@ import org.thingsboard.monitoring.config.TransportType; @ConditionalOnProperty(name = "monitoring.transports.http.enabled", havingValue = "true") @ConfigurationProperties(prefix = "monitoring.transports.http") public class HttpTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + @Override public TransportType getTransportType() { return TransportType.HTTP; } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java index 348be0146d..b2e58510ff 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java @@ -16,6 +16,7 @@ package org.thingsboard.monitoring.config.service; import lombok.Data; +import lombok.EqualsAndHashCode; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @@ -25,7 +26,9 @@ import org.thingsboard.monitoring.config.TransportType; @ConditionalOnProperty(name = "monitoring.transports.mqtt.enabled", havingValue = "true") @ConfigurationProperties(prefix = "monitoring.transports.mqtt") @Data +@EqualsAndHashCode(callSuper = true) public class MqttTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + private Integer qos; @Override diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java index 7cdbb78ad1..16c4bd1473 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java @@ -16,7 +16,6 @@ package org.thingsboard.monitoring.config.service; import lombok.Data; -import org.springframework.beans.factory.annotation.Value; import org.thingsboard.monitoring.config.MonitoringTargetConfig; import org.thingsboard.monitoring.config.TransportType; @@ -24,11 +23,10 @@ import java.util.List; @Data public abstract class TransportMonitoringServiceConfig { + private int monitoringRateMs; private int requestTimeoutMs; private int initialDelayMs; - @Value("${monitoring.failure_threshold}") - private int failureThreshold; private List targets; diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java similarity index 54% rename from monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 2c252266cc..1c380ae9a3 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -13,22 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.monitoring.data.notification; +package org.thingsboard.monitoring.data; -import lombok.Getter; -import org.thingsboard.monitoring.data.TransportInfo; +import org.thingsboard.monitoring.config.TransportType; -@Getter -public class MonitoringFailureNotificationInfo extends NotificationInfo { - private final Exception error; +public class Latencies { - public MonitoringFailureNotificationInfo(TransportInfo transportInfo, Exception error) { - super(transportInfo); - this.error = error; - } + public static final String WS_UPDATE = "wsUpdate"; + public static final String WS_CONNECT = "wsConnect"; + public static final String LOG_IN = "logIn"; + public static final String WEB_UI_LOAD = "webUiLoad"; - @Override - public NotificationType getType() { - return NotificationType.MONITORING_FAILURE; + public static String transportRequest(TransportType transportType) { + int a = 2; + return String.format("%sTransportRequest", transportType.name().toLowerCase()); } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java new file mode 100644 index 0000000000..acfa49b1fc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +import com.google.common.util.concurrent.AtomicDouble; +import lombok.RequiredArgsConstructor; + +import java.util.concurrent.atomic.AtomicInteger; + +@RequiredArgsConstructor +public class Latency { + + private final String key; + private final AtomicDouble latencySum = new AtomicDouble(); + private final AtomicInteger counter = new AtomicInteger(); + + public synchronized void report(double latencyInMs) { + latencySum.addAndGet(latencyInMs); + counter.incrementAndGet(); + } + + public synchronized double getAvg() { + return latencySum.get() / counter.get(); + } + + public synchronized void reset() { + latencySum.set(0.0); + counter.set(0); + } + + public String getKey() { + return key; + } + + @Override + public String toString() { + return "Latency{" + + "key='" + key + '\'' + + ", avgLatency=" + getAvg() + + '}'; + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java similarity index 72% rename from monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java index 02f30518f6..f2227978ae 100644 --- a/monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java @@ -13,16 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.monitoring; +package org.thingsboard.monitoring.data; -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; +public class MonitoredServiceKey { -@SpringBootTest -class MonitoringServiceApplicationTests { - - @Test - void contextLoads() { - } + public static final String GENERAL = "Monitoring"; + public static final String WEB_UI = "Web UI"; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java deleted file mode 100644 index e3e94c05dc..0000000000 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.thingsboard.monitoring.data; - -public class MonitoringStats { -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java similarity index 67% rename from monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java index 99d47f1345..c9160fcffc 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportFailureException.java @@ -13,15 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.monitoring.data.notification; +package org.thingsboard.monitoring.data; -import lombok.Data; -import org.thingsboard.monitoring.data.TransportInfo; +public class TransportFailureException extends RuntimeException { -@Data -public abstract class NotificationInfo { - private final TransportInfo transportInfo; + public TransportFailureException(Throwable cause) { + super(cause.getMessage(), cause); + } - public abstract NotificationType getType(); + public TransportFailureException(String message) { + super(message); + } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java index 099fd1cf7a..e3ae1dc4d9 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java @@ -20,6 +20,13 @@ import org.thingsboard.monitoring.config.TransportType; @Data public class TransportInfo { + private final TransportType transportType; private final String url; + + @Override + public String toString() { + return String.format("%s (%s)", transportType, url); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java index d527e8876a..2f3d65647a 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java @@ -19,7 +19,6 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.RequiredArgsConstructor; @Data @AllArgsConstructor @@ -33,8 +32,4 @@ public class TimeseriesSubscriptionCmd { private String keys; private String scope; - public String getType() { - return "TIMESERIES"; - } - } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java index 7dc24115eb..515fae2c1d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.monitoring.data.cmd; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -13,7 +28,7 @@ import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) public class TimeseriesUpdate { - private final int cmdId; + private final int subscriptionId; private final int errorCode; private final String errorMsg; private Map>> data; diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java new file mode 100644 index 0000000000..6639b8c753 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import org.thingsboard.monitoring.data.Latency; + +import java.util.Collection; + +public class HighLatencyNotification implements Notification { + + private final Collection latencies; + private final int thresholdMs; + + public HighLatencyNotification(Collection latencies, int thresholdMs) { + this.latencies = latencies; + this.thresholdMs = thresholdMs; + } + + @Override + public String getText() { + StringBuilder text = new StringBuilder(); + text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); + latencies.forEach(latency -> { + text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg())); + }); + return text.toString(); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java deleted file mode 100644 index 16bf577f71..0000000000 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.monitoring.data.notification; - -import org.thingsboard.monitoring.data.TransportInfo; - -public class MonitoringRecoveryNotificationInfo extends NotificationInfo { - public MonitoringRecoveryNotificationInfo(TransportInfo transportInfo) { - super(transportInfo); - } - - @Override - public NotificationType getType() { - return NotificationType.MONITORING_RECOVERY; - } -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java similarity index 84% rename from monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java index 4a5bfffdd1..5e07abf7fa 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/Notification.java @@ -15,9 +15,8 @@ */ package org.thingsboard.monitoring.data.notification; -public enum NotificationType { - TRANSPORT_FAILURE, - MONITORING_FAILURE, - TRANSPORT_RECOVERY, - MONITORING_RECOVERY; +public interface Notification { + + String getText(); + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java similarity index 62% rename from monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java index 29e68b27de..d58f3ea39f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java @@ -16,20 +16,23 @@ package org.thingsboard.monitoring.data.notification; import lombok.Getter; -import org.thingsboard.monitoring.data.TransportInfo; @Getter -public class TransportFailureNotificationInfo extends NotificationInfo { +public class ServiceFailureNotification implements Notification { + + private final Object serviceKey; private final Exception error; + private final int failuresCount; - public TransportFailureNotificationInfo(TransportInfo transportInfo, Exception error) { - super(transportInfo); + public ServiceFailureNotification(Object serviceKey, Exception error, int failuresCount) { + this.serviceKey = serviceKey; this.error = error; + this.failuresCount = failuresCount; } @Override - public NotificationType getType() { - return NotificationType.TRANSPORT_FAILURE; + public String getText() { + return String.format("[%s] Failure: %s (number of subsequent failures: %s)", serviceKey, error.getMessage(), failuresCount); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java similarity index 68% rename from monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java rename to monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java index 1c35944d02..f8e69e5bf6 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java @@ -15,15 +15,17 @@ */ package org.thingsboard.monitoring.data.notification; -import org.thingsboard.monitoring.data.TransportInfo; +public class ServiceRecoveryNotification implements Notification { -public class TransportRecoveryNotificationInfo extends NotificationInfo { - public TransportRecoveryNotificationInfo(TransportInfo transportInfo) { - super(transportInfo); + private final Object serviceKey; + + public ServiceRecoveryNotification(Object serviceKey) { + this.serviceKey = serviceKey; } @Override - public NotificationType getType() { - return NotificationType.TRANSPORT_RECOVERY; + public String getText() { + return String.format("[%s] is OK", serviceKey); } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java index 18bada188f..ae3619a27f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java @@ -17,25 +17,36 @@ package org.thingsboard.monitoring.notification; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.data.notification.Notification; import org.thingsboard.monitoring.notification.channels.NotificationChannel; -import org.thingsboard.monitoring.data.notification.NotificationInfo; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; -@Service +@Component @RequiredArgsConstructor @Slf4j public class NotificationService { + private final List notificationChannels; + private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(); + + public void sendNotification(Notification notification) { + forEachNotificationChannel(notificationChannel -> notificationChannel.sendNotification(notification)); + } - public void notify(NotificationInfo notificationInfo) { + private void forEachNotificationChannel(Consumer function) { notificationChannels.forEach(notificationChannel -> { - try { - notificationChannel.sendNotification(notificationInfo); - } catch (Exception e) { - log.error("Failed to send notification to {} ({})", notificationChannel.getClass().getSimpleName(), notificationInfo, e); - } + notificationExecutor.submit(() -> { + try { + function.accept(notificationChannel); + } catch (Exception e) { + log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e); + } + }); }); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java index 259ffdf9b8..7b84d7b9ae 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java @@ -15,45 +15,12 @@ */ package org.thingsboard.monitoring.notification.channels; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.thingsboard.monitoring.data.notification.MonitoringFailureNotificationInfo; -import org.thingsboard.monitoring.data.notification.NotificationInfo; -import org.thingsboard.monitoring.data.notification.TransportFailureNotificationInfo; +import org.thingsboard.monitoring.data.notification.Notification; -@Slf4j -public abstract class NotificationChannel { +public interface NotificationChannel { - public abstract void sendNotification(NotificationInfo notificationInfo); + void sendNotification(Notification notification); - - protected String createNotificationMessage(NotificationInfo notificationInfo) { - String message = String.format("[%s transport (%s)]", notificationInfo.getTransportInfo().getTransportType(), notificationInfo.getTransportInfo().getUrl()); - - switch (notificationInfo.getType()) { - case TRANSPORT_FAILURE: - TransportFailureNotificationInfo transportFailureNotificationInfo = (TransportFailureNotificationInfo) notificationInfo; - message += " Transport failure: " + getErrorMessage(transportFailureNotificationInfo.getError()); - break; - case MONITORING_FAILURE: - MonitoringFailureNotificationInfo monitoringFailureNotificationInfo = (MonitoringFailureNotificationInfo) notificationInfo; - message += " Monitoring failure: " + getErrorMessage(monitoringFailureNotificationInfo.getError()); - break; - case TRANSPORT_RECOVERY: - message += " Transport is now working"; - break; - case MONITORING_RECOVERY: - message += " Monitoring is now working"; - break; - default: - throw new UnsupportedOperationException("Notification type " + notificationInfo.getType() + " not supported"); - } - - return message; - } - - protected String getErrorMessage(Exception error) { - return ExceptionUtils.getMessage(error); - } + void sendNotification(String message); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java index 39f8cbf8d5..394ac21287 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java @@ -21,8 +21,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import org.thingsboard.monitoring.data.notification.Notification; import org.thingsboard.monitoring.notification.channels.NotificationChannel; -import org.thingsboard.monitoring.data.notification.NotificationInfo; import javax.annotation.PostConstruct; import java.time.Duration; @@ -31,7 +31,8 @@ import java.util.Map; @Component @ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true") @Slf4j -public class SlackNotificationChannel extends NotificationChannel { +public class SlackNotificationChannel implements NotificationChannel { + @Value("${monitoring.notification_channels.slack.webhook_url}") private String webhookUrl; @@ -46,8 +47,12 @@ public class SlackNotificationChannel extends NotificationChannel { } @Override - public void sendNotification(NotificationInfo notificationInfo) { - String message = createNotificationMessage(notificationInfo); + public void sendNotification(Notification notification) { + sendNotification(notification.getText()); + } + + @Override + public void sendNotification(String message) { restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java index 2a7f7ffe70..fdcf935d5f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -1,32 +1,131 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.monitoring.service; -import org.springframework.stereotype.Service; -import org.thingsboard.monitoring.data.TransportInfo; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.data.Latency; +import org.thingsboard.monitoring.data.notification.HighLatencyNotification; +import org.thingsboard.monitoring.data.notification.ServiceFailureNotification; +import org.thingsboard.monitoring.data.notification.ServiceRecoveryNotification; +import org.thingsboard.monitoring.notification.NotificationService; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; -@Service +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@RequiredArgsConstructor +@Slf4j public class MonitoringReporter { - public void reportTransportRequestLatency(TransportInfo transportInfo, long latencyInNanos) { - double latencyInMs = (double) latencyInNanos / 1000_000; - } + private final TbClient tbClient; + private final NotificationService notificationService; + private final ScheduledExecutorService monitoringExecutor; - public void reportTransportConnectLatency(TransportInfo transportInfo, long latencyInNanos) { + private final Map latencies = new ConcurrentHashMap<>(); + private final Map failuresCounters = new ConcurrentHashMap<>(); - } + @Value("${monitoring.failures_threshold}") + private int failuresThreshold; + @Value("${monitoring.latency.threshold_ms}") + private int latencyThresholdMs; + @Value("${monitoring.send_repeated_failure_notification}") + private boolean sendRepeatedFailureNotification; + @Value("${monitoring.latency.reporting_entity_type}") + private EntityType reportingEntityType; + @Value("${monitoring.latency.reporting_entity_id}") + private String reportingEntityId; + @Value("${monitoring.latency.monitoring_rate_ms}") + private int latenciesMonitoringRateMs; - public void reportWsUpdateLatency(long latencyInNanos) { + @EventListener(ApplicationReadyEvent.class) + public void startLatenciesMonitoring() { + monitoringExecutor.scheduleWithFixedDelay(() -> { + if (latencies.isEmpty()) { + return; + } + log.info("Latencies:\n{}", latencies.values()); + if (latencies.values().stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) { + HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies.values(), latencyThresholdMs); + notificationService.sendNotification(highLatencyNotification); + } + if (reportingEntityType != null && StringUtils.isNotBlank(reportingEntityId)) { + try { + EntityId entityId; + try { + entityId = EntityIdFactory.getByTypeAndUuid(reportingEntityType, reportingEntityId); + } catch (Exception e) { + return; + } + tbClient.logIn(); + ObjectNode msg = JacksonUtil.newObjectNode(); + latencies.forEach((key, latency) -> { + msg.set(key, new DoubleNode(latency.getAvg())); + latency.reset(); + }); + tbClient.saveEntityTelemetry(entityId, "time", msg); + } catch (Exception e) { + log.error("Failed to report latencies: {}", e.getMessage()); + } + } + }, latenciesMonitoringRateMs, latenciesMonitoringRateMs, TimeUnit.MILLISECONDS); } - public void reportWsConnectLatency(long latencyInNanos) { - + public void reportLatency(String key, long latencyInNanos) { + String latencyKey = key + "Latency"; + double latencyInMs = (double) latencyInNanos / 1000_000; + latencies.computeIfAbsent(key, k -> new Latency(latencyKey)).report(latencyInMs); } - public void reportLogInLatency(long latencyInNanos) { - + public void serviceFailure(Object serviceKey, Exception error) { + int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet(); + ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount); + log.error(notification.getText()); + if (failuresCount == failuresThreshold || (sendRepeatedFailureNotification && failuresCount % failuresThreshold == 0)) { + notificationService.sendNotification(notification); + } } - public void reportFailure(TransportInfo transportInfo, Exception error) { + public void serviceIsOk(Object serviceKey) { + ServiceRecoveryNotification notification = new ServiceRecoveryNotification(serviceKey); + log.info(notification.getText()); + AtomicInteger failuresCounter = failuresCounters.get(serviceKey); + if (failuresCounter != null) { + if (failuresCounter.get() >= failuresThreshold) { + notificationService.sendNotification(notification); + } + failuresCounter.set(0); + } } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java index 22e24cf4e1..2030e01a44 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java @@ -17,32 +17,31 @@ package org.thingsboard.monitoring.service; import com.fasterxml.jackson.databind.node.TextNode; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.time.StopWatch; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.monitoring.client.TbClientFactory; +import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.client.WsClient; +import org.thingsboard.monitoring.client.WsClientFactory; import org.thingsboard.monitoring.config.MonitoringTargetConfig; import org.thingsboard.monitoring.config.TransportType; import org.thingsboard.monitoring.config.WsConfig; import org.thingsboard.monitoring.config.service.TransportMonitoringServiceConfig; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.data.TransportFailureException; import org.thingsboard.monitoring.data.TransportInfo; -import org.thingsboard.monitoring.data.notification.MonitoringFailureNotificationInfo; -import org.thingsboard.monitoring.data.notification.MonitoringRecoveryNotificationInfo; -import org.thingsboard.monitoring.data.notification.TransportFailureNotificationInfo; -import org.thingsboard.monitoring.data.notification.TransportRecoveryNotificationInfo; -import org.thingsboard.monitoring.notification.NotificationService; +import org.thingsboard.monitoring.util.TbStopWatch; import javax.annotation.PostConstruct; -import java.net.URI; +import javax.annotation.PreDestroy; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeoutException; @Slf4j public abstract class TransportMonitoringService { @@ -53,23 +52,18 @@ public abstract class TransportMonitoringService { + WsClient wsClient = null; try { - startStopWatch(); - initClient(); - monitoringReporter.reportTransportConnectLatency(transportInfo, getElapsedTime()); - - WsClient wsClient = establishWsClient(); + wsClient = establishWsClient(); wsClient.registerWaitForUpdate(); - String testPayload = createTestPayload(UUID.randomUUID().toString()); - - startStopWatch(); - Future resultFuture = requestExecutor.submit(() -> { - try { - sendTestPayload(testPayload); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - resultFuture.get(config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); - monitoringReporter.reportTransportRequestLatency(transportInfo, getElapsedTime()); - - startStopWatch(); - wsClient.waitForUpdate(wsConfig.getResultCheckTimeoutMs()); - Object update = wsClient.getTelemetryKeyUpdate(TEST_TELEMETRY_KEY); - boolean success = update != null && update.toString().equals(testPayload); - if (!success) { - throw new RuntimeException("No WS update arrived"); + + String testValue = UUID.randomUUID().toString(); + String testPayload = JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString(); + try { + initClientAndSendPayload(testPayload); + } catch (Throwable e) { + throw new TransportFailureException(e); } - monitoringReporter.reportWsUpdateLatency(getElapsedTime()); - wsClient.closeBlocking(); - destroyClient(); + + checkWsUpdate(wsClient, testValue); + + monitoringReporter.serviceIsOk(transportInfo); + } catch (TransportFailureException transportFailureException) { + monitoringReporter.serviceFailure(transportInfo, transportFailureException); } catch (Exception e) { - monitoringReporter.reportFailure(transportInfo, e); + monitoringReporter.serviceFailure(MonitoredServiceKey.GENERAL, e); + } finally { + if (wsClient != null) wsClient.close(); } }, config.getInitialDelayMs(), config.getMonitoringRateMs(), TimeUnit.MILLISECONDS); log.info("Started monitoring for transport type {} for target {}", getTransportType(), target); } - private WsClient establishWsClient() throws Exception { - startStopWatch(); - String accessToken = tbClientFactory.createClient().logIn(); - monitoringReporter.reportLogInLatency(getElapsedTime()); - - URI uri = new URI(wsConfig.getBaseUrl() + "/api/ws/plugins/telemetry?token=" + accessToken); - startStopWatch(); - WsClient wsClient = new WsClient(uri); - boolean connected = wsClient.connectBlocking(wsConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); - if (!connected) { - throw new IllegalStateException("Failed to establish WS session"); + private void initClientAndSendPayload(String payload) throws Throwable { + initClient(); + stopWatch.start(); + Future resultFuture = requestExecutor.submit(() -> { + try { + sendTestPayload(payload); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + }); + try { + resultFuture.get(config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } catch (TimeoutException e) { + throw new TimeoutException("Transport request timeout"); } + monitoringReporter.reportLatency(Latencies.transportRequest(getTransportType()), stopWatch.getTime()); + } + + private WsClient establishWsClient() throws Exception { + stopWatch.start(); + String accessToken = tbClient.logIn(); + monitoringReporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime()); + + WsClient wsClient = wsClientFactory.createClient(accessToken); wsClient.subscribeForTelemetry(target.getDevice().getId(), TEST_TELEMETRY_KEY); Optional.ofNullable(wsClient.waitForReply(wsConfig.getRequestTimeoutMs())) .orElseThrow(() -> new IllegalStateException("Failed to subscribe for telemetry")); - monitoringReporter.reportWsConnectLatency(getElapsedTime()); return wsClient; } - protected abstract void initClient() throws Exception; - - protected String createTestPayload(String testValue) { - return JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString(); - } - - protected abstract void sendTestPayload(String payload) throws Exception; - - protected abstract void destroyClient() throws Exception; - - - private void startStopWatch() { + private void checkWsUpdate(WsClient wsClient, String testValue) { stopWatch.start(); - } - - private long getElapsedTime() { - stopWatch.stop(); - long nanoTime = stopWatch.getNanoTime(); - stopWatch.reset(); - return nanoTime; - } - - private void onTransportFailure(Exception e) { - log.debug("[{}] Transport failure", transportInfo, e); - - int failuresCount = transportFailuresCounter.incrementAndGet(); - if (failuresCount == config.getFailureThreshold()) { - notificationService.notify(new TransportFailureNotificationInfo(transportInfo, e)); - } - } - - private void onMonitoringFailure(Exception e) { - log.debug("[{}] Monitoring failure", transportInfo, e); - - int failuresCount = monitoringFailuresCounter.incrementAndGet(); - if (failuresCount == config.getFailureThreshold()) { - notificationService.notify(new MonitoringFailureNotificationInfo(transportInfo, e)); + wsClient.waitForUpdate(wsConfig.getResultCheckTimeoutMs()); + Object update = wsClient.getTelemetryKeyUpdate(TEST_TELEMETRY_KEY); + if (update == null) { + throw new TransportFailureException("No WS update arrived"); + } else if (!update.toString().equals(testValue)) { + throw new TransportFailureException("Was expecting value " + testValue + " but got " + update); } + monitoringReporter.reportLatency(Latencies.WS_UPDATE, stopWatch.getTime()); } - private void onTransportIsOk() { - log.debug("[{}] Transport is OK", transportInfo); - if (transportFailuresCounter.get() >= config.getFailureThreshold()) { - notificationService.notify(new TransportRecoveryNotificationInfo(transportInfo)); - } - if (monitoringFailuresCounter.get() >= config.getFailureThreshold()) { - notificationService.notify(new MonitoringRecoveryNotificationInfo(transportInfo)); - } + protected abstract void initClient() throws Exception; - transportFailuresCounter.set(0); - monitoringFailuresCounter.set(0); - } + protected abstract void sendTestPayload(String payload) throws Exception; + @PreDestroy + protected abstract void destroyClient() throws Exception; protected abstract TransportType getTransportType(); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/WebUiHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/WebUiHealthChecker.java new file mode 100644 index 0000000000..cfba299137 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/WebUiHealthChecker.java @@ -0,0 +1,111 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.openqa.selenium.WebDriver; +import org.openqa.selenium.firefox.FirefoxDriver; +import org.openqa.selenium.firefox.FirefoxOptions; +import org.openqa.selenium.support.ui.WebDriverWait; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.data.Latencies; +import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.util.TbStopWatch; + +import javax.annotation.PostConstruct; +import java.io.File; +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.openqa.selenium.By.xpath; +import static org.openqa.selenium.support.ui.ExpectedConditions.elementToBeClickable; +import static org.openqa.selenium.support.ui.ExpectedConditions.urlContains; + +@Component +@RequiredArgsConstructor +@Slf4j +@ConditionalOnProperty(name = "monitoring.ui.enabled", havingValue = "true") +public class WebUiHealthChecker { + + @Value("${monitoring.ui.url}") + private String url; + @Value("${monitoring.auth.username}") + private String username; + @Value("${monitoring.auth.password}") + private String password; + @Value("${monitoring.ui.monitoring_rate_sec}") + private int monitoringRateSec; + @Value("${monitoring.rest_request_timeout_ms}") + private int timeoutMs; + @Value("${monitoring.ui.webdriver_location}") + private File webdriverLocation; + + private final MonitoringReporter monitoringReporter; + private final ScheduledExecutorService monitoringExecutor; + private final TbStopWatch stopWatch; + + private static final String EMAIL_FIELD = "//input[@id='username-input']"; + private static final String PASSWORD_FIELD = "//input[@id='password-input']"; + private static final String SUBMIT_BTN = "//button[@type='submit']"; + private static final String DEVICES_BTN = "//mat-toolbar//a[@href='/devices']"; + + @PostConstruct + private void init() { + System.setProperty("webdriver.gecko.driver", webdriverLocation.getAbsolutePath()); + System.setProperty(FirefoxDriver.SystemProperty.BROWSER_LOGFILE, "/dev/null"); + } + + @EventListener(ApplicationReadyEvent.class) + public void startMonitoring() { + monitoringExecutor.scheduleWithFixedDelay(() -> { + WebDriver driver = null; + try { + FirefoxOptions options = new FirefoxOptions(); + options.setHeadless(true); + driver = new FirefoxDriver(options); + WebDriverWait wait = new WebDriverWait(driver, Duration.ofMillis(timeoutMs)); + driver.manage().window().maximize(); + driver.get(url + "/login"); + + try { + stopWatch.start(); + wait.until(elementToBeClickable(xpath(EMAIL_FIELD))).sendKeys(username); + wait.until(elementToBeClickable(xpath(PASSWORD_FIELD))).sendKeys(password); + wait.until(elementToBeClickable(xpath(SUBMIT_BTN))).click(); + monitoringReporter.reportLatency(Latencies.WEB_UI_LOAD, stopWatch.getTime()); + + wait.until(urlContains("/home")); + wait.until(elementToBeClickable(xpath(DEVICES_BTN))).click(); + } catch (Exception e) { + throw new RuntimeException("Expected web UI elements were not displayed", e); + } + + monitoringReporter.serviceIsOk(MonitoredServiceKey.WEB_UI); + } catch (Exception e) { + monitoringReporter.serviceFailure(MonitoredServiceKey.WEB_UI, e); + } finally { + if (driver != null) driver.quit(); + } + }, 0, monitoringRateSec, TimeUnit.SECONDS); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java index 912d3d78c7..30315fbd50 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java @@ -21,15 +21,15 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Service; -import org.thingsboard.monitoring.config.TransportType; +import org.springframework.stereotype.Component; import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; import org.thingsboard.monitoring.config.service.CoapTransportMonitoringServiceConfig; import org.thingsboard.monitoring.service.TransportMonitoringService; import java.io.IOException; -@Service +@Component @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class CoapTransportMonitoringService extends TransportMonitoringService { @@ -41,9 +41,12 @@ public class CoapTransportMonitoringService extends TransportMonitoringService { @@ -50,7 +49,8 @@ public class HttpTransportMonitoringService extends TransportMonitoringService { - - private Lwm2mClient lwm2mClient; - - protected Lwm2mTransportMonitoringService(Lwm2mTransportMonitoringServiceConfig config, MonitoringTargetConfig target) { - super(config, target); - } - - @Override - protected void initClient() throws Exception { - lwm2mClient = new Lwm2mClient(target.getBaseUrl(), target.getDevice().getAccessToken()); - lwm2mClient.initClient(); - } - - @Override - protected void sendTestPayload(String payload) throws Exception { - lwm2mClient.send(payload); - } - - @Override - protected String createTestPayload(String testValue) { - return testValue; - } - - @Override - protected void destroyClient() throws Exception { - lwm2mClient.destroy(); - lwm2mClient = null; - } - - @Override - protected TransportType getTransportType() { - return TransportType.LWM2M; - } - -} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java index 52674a77ca..037463783c 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java @@ -21,13 +21,13 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Service; -import org.thingsboard.monitoring.config.TransportType; +import org.springframework.stereotype.Component; import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; import org.thingsboard.monitoring.config.service.MqttTransportMonitoringServiceConfig; import org.thingsboard.monitoring.service.TransportMonitoringService; -@Service +@Component @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class MqttTransportMonitoringService extends TransportMonitoringService { @@ -41,12 +41,15 @@ public class MqttTransportMonitoringService extends TransportMonitoringService internal = ThreadLocal.withInitial(StopWatch::new); + + public void start() { + StopWatch internal = getInternal(); + internal.reset(); + internal.start(); + } + + public long getTime() { + StopWatch internal = getInternal(); + internal.stop(); + long nanoTime = internal.getNanoTime(); + internal.reset(); + return nanoTime; } + + private StopWatch getInternal() { + return this.internal.get(); + } + } diff --git a/monitoring/src/main/resources/geckodriver b/monitoring/src/main/resources/geckodriver new file mode 100755 index 0000000000..01165dbc92 Binary files /dev/null and b/monitoring/src/main/resources/geckodriver differ diff --git a/monitoring/src/main/resources/logback.xml b/monitoring/src/main/resources/logback.xml index 88873263f6..945c7dd18e 100644 --- a/monitoring/src/main/resources/logback.xml +++ b/monitoring/src/main/resources/logback.xml @@ -25,9 +25,12 @@ - - - + + + + + + diff --git a/monitoring/src/main/resources/lwm2m/0.xml b/monitoring/src/main/resources/lwm2m/0.xml deleted file mode 100644 index b49381fb28..0000000000 --- a/monitoring/src/main/resources/lwm2m/0.xml +++ /dev/null @@ -1,364 +0,0 @@ - - - - - LWM2M Security - - 0 - urn:oma:lwm2m:oma:0:1.2 - 1.1 - 1.2 - Multiple - Mandatory - - - LWM2M Server URI - - Single - Mandatory - String - 0..255 - - - - - Bootstrap-Server - - Single - Mandatory - Boolean - - - - - - Security Mode - - Single - Mandatory - Integer - 0..4 - - - - - Public Key or Identity - - Single - Mandatory - Opaque - - - - - - Server Public Key - - Single - Mandatory - Opaque - - - - - - Secret Key - - Single - Mandatory - Opaque - - - - - - SMS Security Mode - - Single - Optional - Integer - 0..255 - - - - - SMS Binding Key Parameters - - Single - Optional - Opaque - 6 - - - - - SMS Binding Secret Key(s) - - Single - Optional - Opaque - 16,32,48 - - - - - LwM2M Server SMS Number - - Single - Optional - String - - - - - - Short Server ID - - Single - Optional - Integer - 1..65534 - - - - - Client Hold Off Time - - Single - Optional - Integer - - s - - - - Bootstrap-Server Account Timeout - - Single - Optional - Integer - - s - - - - Matching Type - - Single - Optional - Integer - 0..3 - - - - - SNI - - Single - Optional - String - - - - - - Certificate Usage - - Single - Optional - Integer - 0..3 - - - - - DTLS/TLS Ciphersuite - - Multiple - Optional - Integer - - - - - OSCORE Security Mode - - Single - Optional - Objlnk - - - - - - Groups To Use by Client - - Multiple - Optional - Integer - 0..65535 - - - - - Signature Algorithms Supported by Server - - Multiple - Optional - Integer - 0..65535 - - - - Signature Algorithms To Use by Client - - Multiple - Optional - Integer - 0..65535 - - - - - Signature Algorithm Certs Supported by Server - - Multiple - Optional - Integer - 0..65535 - - - - - TLS 1.3 Features To Use by Client - - Single - Optional - Integer - 0..65535 - - - - - TLS Extensions Supported by Server - - Single - Optional - Integer - 0..65535 - - - - - TLS Extensions To Use by Client - - Single - Optional - Integer - 0..65535 - - - - - Secondary LwM2M Server URI - - Multiple - Optional - String - 0..255 - - - - MQTT Server - - Single - Optional - Objlnk - - - - - LwM2M COSE Security - - Multiple - Optional - Objlnk - - - - - RDS Destination Port - - Single - Optional - Integer - 0..15 - - - - RDS Source Port - - Single - Optional - Integer - 0..15 - - - - RDS Application ID - - Single - Optional - String - - - - - - - - diff --git a/monitoring/src/main/resources/lwm2m/1.xml b/monitoring/src/main/resources/lwm2m/1.xml deleted file mode 100644 index 45946b118b..0000000000 --- a/monitoring/src/main/resources/lwm2m/1.xml +++ /dev/null @@ -1,319 +0,0 @@ - - - - - LwM2M Server - - 1 - urn:oma:lwm2m:oma:1:1.2 - 1.2 - 1.2 - Multiple - Mandatory - - - Short Server ID - R - Single - Mandatory - Integer - 1..65534 - - - - - Lifetime - RW - Single - Mandatory - Integer - - s - - - - Default Minimum Period - RW - Single - Optional - Integer - - s - - - - Default Maximum Period - RW - Single - Optional - Integer - - s - - - - Disable - E - Single - Optional - - - - - - - Disable Timeout - RW - Single - Optional - Integer - - s - - - - Notification Storing When Disabled or Offline - RW - Single - Mandatory - Boolean - - - - - - Binding - RW - Single - Mandatory - String - - - - - - Registration Update Trigger - E - Single - Mandatory - - - - - - - Bootstrap-Request Trigger - E - Single - Optional - - - - - - - APN Link - RW - Single - Optional - Objlnk - - - - - - TLS-DTLS Alert Code - R - Single - Optional - Integer - 0..255 - - - - - Last Bootstrapped - R - Single - Optional - Time - - - - - - Registration Priority Order - R - Single - Optional - Integer - - - - - - Initial Registration Delay Timer - RW - Single - Optional - Integer - - s - - - - Registration Failure Block - R - Single - Optional - Boolean - - - - - - Bootstrap on Registration Failure - R - Single - Optional - Boolean - - - - - - Communication Retry Count - RW - Single - Optional - Integer - - - - - - Communication Retry Timer - RW - Single - Optional - Integer - - s - - - - Communication Sequence Delay Timer - RW - Single - Optional - Integer - - s - - - - Communication Sequence Retry Count - RW - Single - Optional - Integer - - - - - - Trigger - RW - Single - Optional - Boolean - - - - - - Preferred Transport - RW - Single - Optional - String - The possible values are those listed in the LwM2M Core Specification - - - - Mute Send - RW - Single - Optional - Boolean - - - - - - Alternate APN Links - RW - Multiple - Optional - Objlnk - - - - - - Supported Server Versions - RW - Multiple - Optional - String - - - - - - Default Notification Mode - RW - Single - Optional - Integer - 0..1 - - - - - Profile ID Hash Algorithm - RW - Single - Optional - Integer - 0..255 - - - - - - - diff --git a/monitoring/src/main/resources/lwm2m/2.xml b/monitoring/src/main/resources/lwm2m/2.xml deleted file mode 100644 index fc4d0c6d1c..0000000000 --- a/monitoring/src/main/resources/lwm2m/2.xml +++ /dev/null @@ -1,83 +0,0 @@ - - - - - LwM2M Access Control - - 2 - urn:oma:lwm2m:oma:2:1.1 - 1.0 - 1.1 - Multiple - Optional - - - Object ID - R - Single - Mandatory - Integer - 1..65534 - - - - - Object Instance ID - R - Single - Mandatory - Integer - 0..65535 - - - - - ACL - RW - Multiple - Optional - Integer - 0..31 - - - - - Access Control Owner - RW - Single - Mandatory - Integer - 0..65535 - - - - - - - diff --git a/monitoring/src/main/resources/lwm2m/3.xml b/monitoring/src/main/resources/lwm2m/3.xml deleted file mode 100644 index bab017e23b..0000000000 --- a/monitoring/src/main/resources/lwm2m/3.xml +++ /dev/null @@ -1,290 +0,0 @@ - - - - - Device - - 3 - urn:oma:lwm2m:oma:3:1.0 - 1.1 - 1.0 - Single - Mandatory - - - Manufacturer - R - Single - Optional - String - - - - - - Model Number - R - Single - Optional - String - - - - - - Serial Number - R - Single - Optional - String - - - - - - Firmware Version - R - Single - Optional - String - - - - - - Reboot - E - Single - Mandatory - - - - - - - Factory Reset - E - Single - Optional - - - - - - - Available Power Sources - R - Multiple - Optional - Integer - 0..7 - - - - - Power Source Voltage - R - Multiple - Optional - Integer - - - - - - Power Source Current - R - Multiple - Optional - Integer - - - - - - Battery Level - R - Single - Optional - Integer - 0..100 - /100 - - - - Memory Free - R - Single - Optional - Integer - - - - - - Error Code - R - Multiple - Mandatory - Integer - 0..32 - - - - - Reset Error Code - E - Single - Optional - - - - - - - Current Time - RW - Single - Optional - Time - - - - - - UTC Offset - RW - Single - Optional - String - - - - - - Timezone - RW - Single - Optional - String - - - - - - Supported Binding and Modes - R - Single - Mandatory - String - - - - - Device Type - R - Single - Optional - String - - - - - Hardware Version - R - Single - Optional - String - - - - - Software Version - R - Single - Optional - String - - - - - Battery Status - R - Single - Optional - Integer - 0..6 - - - - Memory Total - R - Single - Optional - Integer - - - - - ExtDevInfo - R - Multiple - Optional - Objlnk - - - - - - - diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 3ad0e8935a..6ff1230a41 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -14,6 +14,9 @@ # limitations under the License. # +server: + port: '${SERVER_PORT:8990}' + monitoring: auth: base_url: '${AUTH_BASE_URL:http://localhost:8080}' @@ -21,23 +24,24 @@ monitoring: password: '${AUTH_PASSWORD:tenant}' ws: base_url: '${WS_BASE_URL:ws://localhost:8080}' - check_timeout_ms: '${WS_CHECK_TIMEOUT_MS:3000}' - request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:5000}' + check_timeout_ms: '${WS_CHECK_TIMEOUT_MS:5000}' + request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:3000}' + rest_request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:5000}' - failure_threshold: '${MONITORING_FAILURE_THRESHOLD:1}' + failures_threshold: '${FAILURES_THRESHOLD:2}' + send_repeated_failure_notification: '${SEND_REPEATED_FAILURE_NOTIFICATION:true}' transports: mqtt: - enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}' - monitoring_rate_ms: '${MQTT_TRANSPORT_MONITORING_RATE_MS:10000}' + enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:false}' + monitoring_rate_ms: '${MQTT_TRANSPORT_MONITORING_RATE_MS:3000}' request_timeout_ms: '${MQTT_REQUEST_TIMEOUT_MS:4000}' initial_delay_ms: '${MQTT_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' qos: '${MQTT_QOS_LEVEL:1}' targets: - - base_url: "tcp://localhost" + - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://localhost:1883}' device: - id: - access_token: + id: '${MQTT_TRANSPORT_TARGET_DEVICE_ID:}' coap: enabled: '${COAP_TRANSPORT_MONITORING_ENABLED:false}' @@ -45,21 +49,9 @@ monitoring: request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}' initial_delay_ms: '${COAP_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' targets: - - base_url: - device: - id: - access_token: - - lwm2m: - enabled: '${LWM2M_TRANSPORT_MONITORING_ENABLED:false}' - monitoring_rate_ms: '${LWM2M_TRANSPORT_MONITORING_RATE_MS:10000}' - request_timeout_ms: '${LWM2M_REQUEST_TIMEOUT_MS:4000}' - initial_delay_ms: '${LWM2M_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' - targets: - - base_url: + - base_url: '${COAP_TRANSPORT_BASE_URL:coap://localhost}' device: - id: - access_token: + id: '${COAP_TRANSPORT_TARGET_DEVICE_ID:}' http: enabled: '${HTTP_TRANSPORT_MONITORING_ENABLED:false}' @@ -67,12 +59,25 @@ monitoring: request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}' initial_delay_ms: '${HTTP_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' targets: - - base_url: + - base_url: '${HTTP_TRANSPORT_BASE_URL:http://localhost:8080}' device: - id: - access_token: + id: '${HTTP_TRANSPORT_TARGET_DEVICE_ID:}' + + ui: + enabled: '${UI_MONITORING_ENABLED:false}' + url: '${UI_URL:http://localhost:4200}' + monitoring_rate_sec: '${UI_MONITORING_RATE_SEC:300}' + webdriver_location: '${WEBDRIVER_LOCATION:classpath:geckodriver}' notification_channels: slack: enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}' webhook_url: '${SLACK_WEBHOOK_URL:}' + + latency: + monitoring_rate_ms: '${LATENCY_MONITORING_RATE_MS:30000}' + threshold_ms: '${LATENCY_THRESHOLD:2000}' + reporting_entity_type: '${LATENCY_REPORTING_ENTITY_TYPE:ASSET}' + reporting_entity_id: '${LATENCY_REPORTING_ENTITY_ID:}' + + monitoring_thread_pool_size: '${MONITORING_THREAD_POOL_SIZE:10}'