From 1b9dfa17c25c188d67a1b86076c765da5538bed9 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 9 Oct 2023 18:24:43 +0300 Subject: [PATCH 1/4] Refactoring for latencies tracking --- .../monitoring/data/Latencies.java | 4 ++ .../thingsboard/monitoring/data/Latency.java | 48 ++----------------- .../notification/HighLatencyNotification.java | 10 ++-- .../monitoring/service/BaseHealthChecker.java | 2 +- .../service/BaseMonitoringService.java | 5 +- .../service/MonitoringReporter.java | 26 +++++----- 6 files changed, 30 insertions(+), 65 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 3370d42462..52e73a64cd 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -25,4 +25,8 @@ public class Latencies { return String.format("%sRequest", key); } + public static String wsUpdate(String key) { + return String.format("%sWsUpdate", key); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java index c64291e30f..4b17bf179f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java @@ -15,53 +15,15 @@ */ package org.thingsboard.monitoring.data; -import com.google.common.util.concurrent.AtomicDouble; -import lombok.RequiredArgsConstructor; +import lombok.Data; -import java.util.concurrent.atomic.AtomicInteger; - -@RequiredArgsConstructor +@Data(staticConstructor = "of") public class Latency { - private final String key; - private final AtomicDouble latencySum = new AtomicDouble(); - private final AtomicInteger counter = new AtomicInteger(); - - public synchronized void report(double latencyInMs) { - latencySum.addAndGet(latencyInMs); 
- counter.incrementAndGet(); - } - - public synchronized double getAvg() { - return latencySum.get() / counter.get(); - } - - public boolean isNotEmpty() { - return counter.get() > 0; - } - - public synchronized void reset() { - latencySum.set(0.0); - counter.set(0); - } - - public String getKey() { - return key; - } - - public synchronized Latency snapshot() { - Latency snapshot = new Latency(key); - snapshot.latencySum.set(latencySum.get()); - snapshot.counter.set(counter.get()); - return snapshot; - } + private final double value; - @Override - public String toString() { - return "Latency{" + - "key='" + key + '\'' + - ", avgLatency=" + getAvg() + - '}'; + public String getFormattedValue() { + return String.format("%,.2f ms", value); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java index 939cb7440f..5744e40d41 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -21,11 +21,11 @@ import java.util.Collection; public class HighLatencyNotification implements Notification { - private final Collection latencies; + private final Collection highLatencies; private final int thresholdMs; - public HighLatencyNotification(Collection latencies, int thresholdMs) { - this.latencies = latencies; + public HighLatencyNotification(Collection highLatencies, int thresholdMs) { + this.highLatencies = highLatencies; this.thresholdMs = thresholdMs; } @@ -33,8 +33,8 @@ public class HighLatencyNotification implements Notification { public String getText() { StringBuilder text = new StringBuilder(); text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); - latencies.forEach(latency -> { - text.append(String.format("[%s] 
%,.2f ms\n", latency.getKey(), latency.getAvg())); + highLatencies.forEach(latency -> { + text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue())); }); return text.toString(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index affca1ce09..697573289d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -96,7 +96,7 @@ public abstract class BaseHealthChecker, T extends MonitoringTarget> { - @Autowired + @Autowired(required = false) private List configs; private final List> healthCheckers = new LinkedList<>(); private final List devices = new LinkedList<>(); @@ -54,6 +54,9 @@ public abstract class BaseMonitoringService, T ext @PostConstruct private void init() { + if (configs == null || configs.isEmpty()) { + return; + } tbClient.logIn(); configs.forEach(config -> { config.getTargets().forEach(target -> { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java index f41454a76a..aabf04caee 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -63,25 +63,20 @@ public class MonitoringReporter { private String reportingAssetId; public void reportLatencies(TbClient tbClient) { - List latencies = this.latencies.values().stream() - .filter(Latency::isNotEmpty) - .map(latency -> { - Latency snapshot = latency.snapshot(); - latency.reset(); - return snapshot; - }) - .collect(Collectors.toList()); if (latencies.isEmpty()) { return; } - log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + 
latency.getAvg() + " ms") + log.debug("Latencies:\n{}", latencies.values().stream().map(latency -> latency.getKey() + ": " + latency.getFormattedValue()) .collect(Collectors.joining("\n")) + "\n"); - if (!latencyReportingEnabled) return; - if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) { - HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs); + List highLatencies = latencies.values().stream() + .filter(latency -> latency.getValue() >= (double) latencyThresholdMs) + .collect(Collectors.toList()); + if (!highLatencies.isEmpty()) { + HighLatencyNotification highLatencyNotification = new HighLatencyNotification(highLatencies, latencyThresholdMs); notificationService.sendNotification(highLatencyNotification); + log.warn("{}", highLatencyNotification.getText()); } try { @@ -99,10 +94,11 @@ public class MonitoringReporter { } ObjectNode msg = JacksonUtil.newObjectNode(); - latencies.forEach(latency -> { - msg.set(latency.getKey(), new DoubleNode(latency.getAvg())); + latencies.values().forEach(latency -> { + msg.set(latency.getKey(), new DoubleNode(latency.getValue())); }); tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg); + latencies.clear(); } catch (Exception e) { log.error("Failed to report latencies: {}", e.getMessage()); } @@ -112,7 +108,7 @@ public class MonitoringReporter { String latencyKey = key + "Latency"; double latencyInMs = (double) latencyInNanos / 1000_000; log.trace("Reporting latency [{}]: {} ms", key, latencyInMs); - latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs); + latencies.put(latencyKey, Latency.of(latencyKey, latencyInMs)); } public void serviceFailure(Object serviceKey, Throwable error) { From e8ba1e17ebfc3dfb6b74ca2bc14bb2018ac4ccae Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 27 Nov 2023 11:42:54 +0200 Subject: [PATCH 2/4] Monitoring for IPs 
associated with the domain --- .../monitoring/config/MonitoringTarget.java | 4 ++ .../transport/TransportMonitoringConfig.java | 3 +- .../transport/TransportMonitoringTarget.java | 1 + .../service/BaseMonitoringService.java | 39 ++++++++++++++++--- .../TransportsMonitoringService.java | 7 ++++ .../src/main/resources/tb-monitoring.yml | 16 ++++++-- 6 files changed, 59 insertions(+), 11 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java index 0e62670f81..0176b14d54 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java @@ -21,4 +21,8 @@ public interface MonitoringTarget { UUID getDeviceId(); + String getBaseUrl(); + + boolean isCheckDomainIps(); + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java index 77d702f779..c5f843192c 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java @@ -23,9 +23,8 @@ import java.util.List; @Data public abstract class TransportMonitoringConfig implements MonitoringConfig { - private int requestTimeoutMs; - private List targets; + private int requestTimeoutMs; public abstract TransportType getTransportType(); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java index 816f64fbce..e3277af0ff 100644 --- 
a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java @@ -25,6 +25,7 @@ public class TransportMonitoringTarget implements MonitoringTarget { private String baseUrl; private DeviceConfig device; // set manually during initialization + private boolean checkDomainIps; @Override public UUID getDeviceId() { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java index def53557db..24ca9de4d4 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.monitoring.service; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -29,9 +30,14 @@ import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.util.TbStopWatch; import javax.annotation.PostConstruct; +import java.net.InetAddress; +import java.net.URI; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; @Slf4j public abstract class BaseMonitoringService, T extends MonitoringTarget> { @@ -60,15 +66,36 @@ public abstract class BaseMonitoringService, T ext tbClient.logIn(); configs.forEach(config -> { config.getTargets().forEach(target -> { - BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target); - log.info("Initializing {}", healthChecker.getClass().getSimpleName()); - healthChecker.initialize(tbClient); - devices.add(target.getDeviceId()); - 
healthCheckers.add(healthChecker); + initHealthChecker(target, config); + if (target.isCheckDomainIps()) { + initIpsHealthCheckers(target, config); + } }); }); } + private void initHealthChecker(T target, C config) { + BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target); + log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl()); + healthChecker.initialize(tbClient); + devices.add(target.getDeviceId()); + healthCheckers.add(healthChecker); + } + + @SneakyThrows + private void initIpsHealthCheckers(T target, C config) { + URI baseUrl = new URI(target.getBaseUrl()); + String domain = baseUrl.getHost(); + + Set ips = Arrays.stream(InetAddress.getAllByName(domain)) + .map(InetAddress::getHostAddress) + .collect(Collectors.toSet()); + for (String ip : ips) { + String url = new URI(baseUrl.getScheme(), null, ip, baseUrl.getPort(), "", null, null).toString(); + initHealthChecker(createTarget(url), config); + } + } + public final void runChecks() { if (healthCheckers.isEmpty()) { return; @@ -99,6 +126,8 @@ public abstract class BaseMonitoringService, T ext protected abstract BaseHealthChecker createHealthChecker(C config, T target); + protected abstract T createTarget(String baseUrl); + protected abstract String getName(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java index b3ce86e799..ca7c5a94da 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java @@ -33,6 +33,13 @@ public final class TransportsMonitoringService extends BaseMonitoringService Date: Mon, 27 Nov 2023 11:47:28 +0200 Subject: [PATCH 3/4] Root Rule Chain for monitoring tenant --- 
.../src/main/resources/root_rule_chain.json | 436 ++++++++++++++++++ 1 file changed, 436 insertions(+) create mode 100644 monitoring/src/main/resources/root_rule_chain.json diff --git a/monitoring/src/main/resources/root_rule_chain.json b/monitoring/src/main/resources/root_rule_chain.json new file mode 100644 index 0000000000..ed8a93fb63 --- /dev/null +++ b/monitoring/src/main/resources/root_rule_chain.json @@ -0,0 +1,436 @@ +{ + "ruleChain": { + "additionalInfo": null, + "name": "Root Rule Chain", + "type": "CORE", + "firstRuleNodeId": null, + "root": false, + "debugMode": false, + "configuration": null, + "externalId": null + }, + "metadata": { + "firstNodeIndex": 12, + "nodes": [ + { + "additionalInfo": { + "description": null, + "layoutX": 1202, + "layoutY": 221 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 0 + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 1000, + "layoutY": 167 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", + "name": "Save Attributes", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 1, + "configuration": { + "scope": "CLIENT_SCOPE", + "notifyDevice": false, + "sendAttributesUpdatedNotification": false, + "updateAttributesOnlyOnValueChange": false + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 566, + "layoutY": 302 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "name": "Message Type Switch", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "version": 0 + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 1000, + "layoutY": 381 + }, + "type": "org.thingsboard.rule.engine.action.TbLogNode", + "name": "Log RPC from Device", + "debugMode": false, + "singletonMode": false, + 
"configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", + "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 1000, + "layoutY": 494 + }, + "type": "org.thingsboard.rule.engine.action.TbLogNode", + "name": "Log Other", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", + "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 1000, + "layoutY": 583 + }, + "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", + "name": "RPC Call Request", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "timeoutInSeconds": 60 + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 255, + "layoutY": 301 + }, + "type": "org.thingsboard.rule.engine.filter.TbOriginatorTypeFilterNode", + "name": "Is Entity Group", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "originatorTypes": [ + "ENTITY_GROUP" + ] + }, + "externalId": null + }, + { + "additionalInfo": { + "layoutX": 319, + "layoutY": 109 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode", + "name": "Post attributes or RPC request", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "messageTypes": [ + "POST_ATTRIBUTES_REQUEST", + "RPC_CALL_FROM_SERVER_TO_DEVICE" + ] + }, + "externalId": null + }, + { + 
"additionalInfo": { + "layoutX": 627, + "layoutY": 108 + }, + "type": "org.thingsboard.rule.engine.transform.TbDuplicateMsgToGroupNode", + "name": "Duplicate To Group Entities", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "entityGroupId": null, + "entityGroupIsMessageOriginator": true + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", + "layoutX": 45, + "layoutY": 359 + }, + "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", + "name": "Device Profile Node", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "persistAlarmRulesState": false, + "fetchAlarmRulesStateOnStart": false + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 160, + "layoutY": 631 + }, + "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", + "name": "Test JS script", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "JS", + "jsScript": "var test = {\n a: 'a',\n b: 'b'\n};\nreturn test.a === 'a' && test.b === 'b';", + "tbelScript": "return msg.temperature > 20;" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 427, + "layoutY": 541 + }, + "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", + "name": "Test TBEL script", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return msg.temperature > 20;", + "tbelScript": "var a = \"a\";\nvar b = \"b\";\nreturn a.equals(\"a\") && b.equals(\"b\");" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 40, + "layoutY": 252 + }, + "type": 
"org.thingsboard.rule.engine.transform.TbTransformMsgNode", + "name": "Add arrival timestamp", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", + "tbelScript": "metadata.arrivalTs = Date.now();\nreturn {msg: msg, metadata: metadata, msgType: msgType};" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 1467, + "layoutY": 267 + }, + "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", + "name": "Calculate additional latencies", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", + "tbelScript": "var arrivalLatency = metadata.arrivalTs - metadata.ts;\nvar processingTime = Date.now() - metadata.arrivalTs;\nmsg = {\n arrivalLatency: arrivalLatency,\n processingTime: processingTime\n};\nreturn {msg: msg, metadata: metadata, msgType: msgType};" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 1438, + "layoutY": 403 + }, + "type": "org.thingsboard.rule.engine.transform.TbChangeOriginatorNode", + "name": "To latencies asset", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "originatorSource": "ENTITY", + "entityType": "ASSET", + "entityNamePattern": "[Monitoring] Latencies", + "relationsQuery": { + "direction": "FROM", + "maxLevel": 1, + "filters": [ + { + "relationType": "Contains", + "entityTypes": [] + } + ], + "fetchLastLevelOnly": false + } + }, + "externalId": null + }, + { + "additionalInfo": { + "description": null, + "layoutX": 1458, + "layoutY": 505 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + 
"configuration": { + "defaultTTL": 0 + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 928, + "layoutY": 266 + }, + "type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode", + "name": "Has testData", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "messageNames": [ + "testData" + ], + "metadataNames": [], + "checkAllKeys": true + }, + "externalId": null + }, + { + "additionalInfo": { + "description": null, + "layoutX": 1203, + "layoutY": 327 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries with TTL", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 180, + "skipLatestPersistence": null, + "useServerTs": null + }, + "externalId": null + } + ], + "connections": [ + { + "fromIndex": 2, + "toIndex": 1, + "type": "Post attributes" + }, + { + "fromIndex": 2, + "toIndex": 3, + "type": "RPC Request from Device" + }, + { + "fromIndex": 2, + "toIndex": 4, + "type": "Other" + }, + { + "fromIndex": 2, + "toIndex": 5, + "type": "RPC Request to Device" + }, + { + "fromIndex": 2, + "toIndex": 16, + "type": "Post telemetry" + }, + { + "fromIndex": 6, + "toIndex": 2, + "type": "False" + }, + { + "fromIndex": 6, + "toIndex": 7, + "type": "True" + }, + { + "fromIndex": 7, + "toIndex": 2, + "type": "False" + }, + { + "fromIndex": 7, + "toIndex": 8, + "type": "True" + }, + { + "fromIndex": 8, + "toIndex": 2, + "type": "Success" + }, + { + "fromIndex": 9, + "toIndex": 10, + "type": "Success" + }, + { + "fromIndex": 10, + "toIndex": 11, + "type": "True" + }, + { + "fromIndex": 11, + "toIndex": 6, + "type": "True" + }, + { + "fromIndex": 12, + "toIndex": 9, + "type": "Success" + }, + { + "fromIndex": 13, + "toIndex": 14, + "type": "Success" + }, + { + "fromIndex": 14, + "toIndex": 15, + "type": "Success" + }, + { + "fromIndex": 16, + "toIndex": 0, + "type": "False" + }, + { + 
"fromIndex": 16, + "toIndex": 17, + "type": "True" + }, + { + "fromIndex": 17, + "toIndex": 13, + "type": "Success" + } + ], + "ruleChainConnections": null + } +} \ No newline at end of file From 98a87bfd33aa3261119759be757f8c070cfbf11b Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 27 Nov 2023 14:01:23 +0200 Subject: [PATCH 4/4] Monitoring: support for dynamic change of load-balancers list --- .../monitoring/service/BaseHealthChecker.java | 13 ++- .../service/BaseMonitoringService.java | 83 ++++++++++++++----- 2 files changed, 75 insertions(+), 21 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index 697573289d..290fd307c6 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -15,6 +15,7 @@ */ package org.thingsboard.monitoring.service; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -30,13 +31,17 @@ import org.thingsboard.monitoring.util.TbStopWatch; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; @RequiredArgsConstructor @Slf4j public abstract class BaseHealthChecker { + @Getter protected final C config; + @Getter protected final T target; private Object info; @@ -48,6 +53,9 @@ public abstract class BaseHealthChecker> associates = new HashMap<>(); + public static final String TEST_TELEMETRY_KEY = "testData"; @PostConstruct @@ -84,6 +92,10 @@ public abstract class BaseHealthChecker { + healthChecker.check(wsClient); + }); } private void checkWsUpdate(WsClient wsClient, String testValue) { @@ -99,7 +111,6 @@ public abstract class BaseHealthChecker, T ext 
tbClient.logIn();
         configs.forEach(config -> {
             config.getTargets().forEach(target -> {
-                initHealthChecker(target, config);
+                BaseHealthChecker healthChecker = initHealthChecker(target, config);
+                healthCheckers.add(healthChecker);
+
                 if (target.isCheckDomainIps()) {
-                    initIpsHealthCheckers(target, config);
+                    getAssociatedUrls(target.getBaseUrl()).forEach(url -> {
+                        healthChecker.getAssociates().put(url, initHealthChecker(createTarget(url), config));
+                    });
                 }
             });
         });
     }
 
-    private void initHealthChecker(T target, C config) {
+    private BaseHealthChecker initHealthChecker(T target, C config) {
         BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target);
         log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl());
         healthChecker.initialize(tbClient);
         devices.add(target.getDeviceId());
-        healthCheckers.add(healthChecker);
-    }
-
-    @SneakyThrows
-    private void initIpsHealthCheckers(T target, C config) {
-        URI baseUrl = new URI(target.getBaseUrl());
-        String domain = baseUrl.getHost();
-
-        Set ips = Arrays.stream(InetAddress.getAllByName(domain))
-                .map(InetAddress::getHostAddress)
-                .collect(Collectors.toSet());
-        for (String ip : ips) {
-            String url = new URI(baseUrl.getScheme(), null, ip, baseUrl.getPort(), "", null, null).toString();
-            initHealthChecker(createTarget(url), config);
-        }
+        return healthChecker;
     }
 
     public final void runChecks() {
@@ -108,9 +101,8 @@ public abstract class BaseMonitoringService, T ext
 
             try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
                 wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply();
-
                 for (BaseHealthChecker healthChecker : healthCheckers) {
-                    healthChecker.check(wsClient);
+                    check(healthChecker, wsClient);
                 }
             }
             reporter.reportLatencies(tbClient);
@@ -124,6 +116,57 @@ public abstract class BaseMonitoringService, T ext
         }
     }
 
+    private void check(BaseHealthChecker healthChecker, WsClient wsClient) throws Exception {
+        healthChecker.check(wsClient);
+
+        T target = healthChecker.getTarget();
+        if (target.isCheckDomainIps()) {
+            Set associatedUrls = getAssociatedUrls(target.getBaseUrl());
+            Map> associates = healthChecker.getAssociates();
+            Set prevAssociatedUrls = new HashSet<>(associates.keySet());
+
+            boolean changed = false;
+            for (String url : associatedUrls) {
+                if (!prevAssociatedUrls.contains(url)) {
+                    BaseHealthChecker associate = initHealthChecker(createTarget(url), healthChecker.getConfig());
+                    associates.put(url, associate);
+                    changed = true;
+                }
+            }
+            for (String url : prevAssociatedUrls) {
+                if (!associatedUrls.contains(url)) {
+                    stopHealthChecker(associates.get(url)); // stop the associate whose IP vanished, not the parent checker
+                    associates.remove(url);
+                    changed = true;
+                }
+            }
+            if (changed) {
+                log.info("Updated IPs for {}: {} (old list: {})", target.getBaseUrl(), associatedUrls, prevAssociatedUrls);
+            }
+        }
+    }
+
+    @SneakyThrows
+    private Set getAssociatedUrls(String baseUrl) {
+        URI url = new URI(baseUrl);
+        return Arrays.stream(InetAddress.getAllByName(url.getHost()))
+                .map(InetAddress::getHostAddress)
+                .map(ip -> {
+                    try {
+                        return new URI(url.getScheme(), null, ip, url.getPort(), "", null, null).toString();
+                    } catch (URISyntaxException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .collect(Collectors.toSet());
+    }
+
+    private void stopHealthChecker(BaseHealthChecker healthChecker) throws Exception {
+        healthChecker.destroyClient();
+        devices.remove(healthChecker.getTarget().getDeviceId());
+        log.info("Stopped {} for {}", healthChecker.getClass().getSimpleName(), healthChecker.getTarget().getBaseUrl());
+    }
+
     protected abstract BaseHealthChecker createHealthChecker(C config, T target);
 
     protected abstract T createTarget(String baseUrl);