From 4d4a6d0dcd89506db8c3e2c83c7193f9f8f5190c Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 20:45:32 +0200 Subject: [PATCH 01/10] docs(monitoring): RPC monitoring design (rc baseline) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in two-way RPC companion check to each transport target so the monitoring loop covers the cloud-to-device direction (REST → rule engine → transport → device → response), not just the uplink. The shape mirrors how the calculated_fields flag layers an extra assertion onto the existing telemetry check today: same loop, same auto-provisioned device, same MQTT client (also subscribes the RPC topic when rpc.enabled), one extra round-trip per cycle. Failure attribution flows through MonitoringReporter unchanged, so RPC failures appear as their own row in the IncidentManager header ("MQTT RPC") alongside the transport row. Includes ASCII + mermaid diagrams of the current and proposed flows, a sequence diagram of the round-trip, an incident-attribution table, phased implementation plan, and open decisions. --- .../docs/2026-04-29-rpc-monitoring-design.md | 502 ++++++++++++++++++ 1 file changed, 502 insertions(+) create mode 100644 monitoring/docs/2026-04-29-rpc-monitoring-design.md diff --git a/monitoring/docs/2026-04-29-rpc-monitoring-design.md b/monitoring/docs/2026-04-29-rpc-monitoring-design.md new file mode 100644 index 0000000000..56a614190a --- /dev/null +++ b/monitoring/docs/2026-04-29-rpc-monitoring-design.md @@ -0,0 +1,502 @@ +# RPC Monitoring — Design + +| Status | Draft for review | +|--------|------------------| +| Author | Sergii Matviienko (`smatvienko@thingsboard.io`) | +| Date | 2026-04-29 | +| Module | `monitoring/` | +| Base | `upstream/rc` (CE) | + +## 1. Background + +`tb-monitoring` is an external service that probes a running ThingsBoard +deployment from outside the cluster. On startup it provisions every entity +it needs under a dedicated monitoring tenant — devices, profiles, calculated +fields, a rule chain, the latencies asset, and a public dashboard — and then +runs a continuous loop that, for every configured transport target, sends a +telemetry uplink and validates that the value arrives back over a WebSocket +subscription. + +What the loop does **not** cover today is the **server-to-device direction**: +a working telemetry uplink does not prove that a two-way RPC issued by the +REST API will actually reach the device through the rule engine plus the +target transport. There are production failure modes where the uplink +remains green but RPC delivery is broken — for example, a stuck rule engine +partition, or a transport microservice that can publish telemetry inwards +but cannot deliver an RPC outwards. + +Adding an opt-in RPC companion check to each transport target closes that +gap with a single new round-trip per cycle, reusing every entity and every +transport client the monitoring service already owns. + +## 2. Current architecture (`rc`) + +### 2.1 Bootstrap + +`ThingsboardMonitoringApplication.startMonitoring` runs once on +`ApplicationReadyEvent`: + +1. `MonitoringEntityService.checkEntities()` upserts the rule chain for the + monitoring tenant, finds-or-creates the `[Monitoring] Latencies` asset, + finds-or-creates the `[Monitoring] Cloud monitoring` dashboard, assigns + both to the public customer, and remembers the dashboard public link. +2. For each `BaseMonitoringService` bean (transports today; RPC will fold in + here too), `init()` walks its config's targets and calls + `entityService.checkEntities(config, target)` to lazily create the + device, device profile and (when `monitoring.calculated_fields.enabled`) + the calculated field. The resulting `DeviceConfig` (id + access token) is + written back into the target. +3. The scheduler stagger-schedules each `BaseMonitoringService` with + `(monitoring_rate_ms / count) * i` initial delay — so two services on a + 10 s rate fire at 0 s and 5 s, not on top of each other. +4. A `:rocket: Monitoring started` info notification is emitted with a + clickable public-dashboard link. + +### 2.2 Per-cycle loop (`BaseMonitoringService.runChecks`) + +Each tick is a single REST login → single WebSocket connect → single WS +subscription that covers every device this service owns → for each +`BaseHealthChecker` call `check(WsClient)`. Each phase has its own service +key so failures land precisely: + +``` +LOGIN (MonitoredServiceKey.LOGIN) "Login" +WS_CONNECT (MonitoredServiceKey.WS_CONNECT) "WS Connect" +WS_SUBSCRIBE (MonitoredServiceKey.WS_SUBSCRIBE) "WS Subscribe" +per-target check (TransportInfo, implements ShortNameProvider) +EDQS (optional) (MonitoredServiceKey.EDQS) "*EDQS*" +GENERAL (MonitoredServiceKey.GENERAL) "Monitoring" +``` + +`BaseHealthChecker.check(WsClient)` is `final` and orchestrates a fixed +template: + +``` +initClient() // (re)open transport client +sendTestPayload(uuid) // publish telemetry uplink +checkWsUpdates(...) // wait for WS testData=uuid +report ok / failure +``` + +Each transport implements `initClient`, `sendTestPayload`, `destroyClient`, +plus `getKey` / `getInfo` / `getTransportType`. + +### 2.3 Failure attribution & incident grouping (PR #15456, merged) + +`MonitoringReporter.serviceFailure(Object serviceKey, Throwable error)` +publishes a `ServiceFailureNotification` whose `getAffectedServices()` +returns `[failing(shortName(serviceKey), failuresCount)]`. `shortName` +delegates to `ShortNameProvider` when the key implements it +(`TransportInfo` already does — `"MQTT"`, `"MQTT Foo"` if a non-default +queue, etc.) and falls back to `serviceKey.toString()` otherwise. + +`IncidentManager` (Slack API mode, `monitoring.notifications.incident.*`) +threads alerts that fire within `resolution_timeout_s` (default 90 s) under +a single Slack message. The header tracks failing / high-latency / recovered +services with red / yellow / green circles and updates every minute. After +the quiet window the incident auto-resolves with a final summary. + +Any new service key fed through `MonitoringReporter` automatically lights up +in the incident header — no extra wiring needed beyond producing a stable, +unique short name. + +### 2.4 Current flow + +```text + Bootstrap (once) Per-cycle (each service, staggered) + ──────────────── ────────────────────────────────── + + entityService.checkEntities() Login (LOGIN) + rule chain WS connect (WS_CONNECT) + latencies asset WS subscribe (WS_SUBSCRIBE) + public dashboard For each target (TransportInfo): + │ initClient + ┌───────────┘ publish telemetry → wait WS + ▼ report request, wsUpdate + TransportsMonitoringService.init() EDQS (optional) + provisions devices + creds report latencies + schedule each service with stagger report GENERAL ok + send :rocket: started notification + + ┌───────────────────────────┐ + │ MonitoringReporter │ + │ .serviceFailure(key, e) │ + │ .serviceIsOk(key) │ + └────────────┬──────────────┘ + ▼ + ┌───────────────────────────┐ + │ NotificationService │ + │ Slack webhook (legacy) │ + │ IncidentManager (rc) │ + │ thread + auto-resolve │ + └───────────────────────────┘ +``` + +```mermaid +flowchart TB + subgraph Bootstrap[Bootstrap once] + EntityCheck["entityService.checkEntities()
rule chain · asset · dashboard"] + Init["BaseMonitoringService.init()
provisions devices + credentials"] + Stagger["scheduleWithFixedDelay,
initialDelay = rate / N × index"] + Started[":rocket: Monitoring started
InfoNotification w/ public link"] + end + + subgraph Loop["Per cycle"] + direction TB + Login["Login (LOGIN)"] + WsConn["WS connect (WS_CONNECT)"] + WsSub["WS subscribe (WS_SUBSCRIBE)"] + PerTgt{{"For each transport target"}} + Tgt["initClient → publish telemetry
→ wait WS testData=uuid
(serviceKey = TransportInfo)"] + Edqs["EDQS query (EDQS)"] + GenOk["report GENERAL ok + latencies"] + end + + subgraph Notify[Notification path] + Reporter[MonitoringReporter] + Notif[NotificationService] + Inc[IncidentManager
thread + auto-resolve] + Slack[Slack channel] + Reporter --> Notif --> Inc --> Slack + end + + EntityCheck --> Init --> Stagger --> Started --> Loop + Login --> WsConn --> WsSub --> PerTgt --> Tgt --> Edqs --> GenOk + Tgt -. on failure .-> Reporter +``` + +```text + Monitoring app ThingsBoard MQTT transport Echo device + ────────────── ─────────── ────────────── ─────────── + │ │ │ │ + ├─ login ──────────────►│ │ │ + ├─ WS connect ─────────►│ │ │ + ├─ WS subscribe ───────►│ │ │ + │ │ + │ ── For each transport target ── │ + ├─ MQTT publish v1/.../telemetry {testData:uuid} ─────────────────────►│ + │◄── WS update {testData:uuid} ──────────────────│ │ + │ │ + ├─ EDQS query (optional) │ + └─ report latencies, GENERAL ok │ +``` + +## 3. The gap + +The check above proves a one-way path: REST login → WS subscribe → +device-to-cloud uplink. It does **not** exercise the cloud-to-device path +that production traffic relies on, namely: + +``` +REST API → rule engine partition → transport microservice → device → response +``` + +A failure anywhere along that chain (rule engine partition stuck on a +specific `queue`, a transport microservice unable to publish back, an MQTT +broker that accepts publishes but drops subscriptions) currently goes +undetected by `tb-monitoring`. Adding an RPC companion check to each +transport target closes the loop in both directions. + +## 4. Proposed feature + +**Core idea.** RPC is an opt-in *companion check* on each existing +transport target — same loop, same auto-provisioned device, same transport +client, one extra round-trip when enabled. + +The shape mirrors the way the `monitoring.calculated_fields.enabled` flag +extends the existing telemetry check today: a flag tightens the assertion +performed within the same cycle on the same device — it does not spawn a +parallel monitoring service. + +### 4.1 Configuration + +A new optional `rpc:` sub-block on each `TransportMonitoringTarget`: + +```yaml +monitoring: + transports: + mqtt: + enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}' + qos: 1 + request_timeout_ms: 4000 + targets: + - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' + queue: Main + rpc: + enabled: '${MQTT_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}' + - base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}' + queue: Main + rpc: + enabled: true # secure variant gets RPC for free +``` + +Properties of this shape: + +- No new top-level config block — RPC is part of the transport target it + exercises. +- The RPC sub-block carries no URL, no credentials, no QoS. Those are + inherited from the parent target. Secure transports (`ssl://`, client + certs) Just Work. +- A target without `rpc:` (or with `rpc.enabled: false`) keeps today's + behaviour identically — pure addition. +- Multiple targets per transport (already supported via `targets[N]`) cover + partition fan-out automatically; each target gets its own RPC sub-check + if enabled. + +### 4.2 Java surface + +``` +BaseHealthChecker + + protected void doRpcCheck(...) // default: no-op + // check(WsClient) stays final; reporter / stopWatch stay private + +TransportMonitoringTarget + + RpcCheckConfig rpc // nullable; defaults to disabled + +config.transport.RpcCheckConfig // new + - boolean enabled + - Integer requestTimeoutMs // optional override + +config.transport.RpcInfo + // ShortNameProvider wrapper that returns " RPC" + // so failures appear as a distinct row in the IncidentManager header. + +MqttTransportHealthChecker + + extends initClient() // when rpc.enabled, also subscribe + // v1/devices/me/rpc/request/+ + // on the existing MqttClient and + // echo params back as the response + + overrides doRpcCheck() // POST /api/rpc/twoway/{deviceId}, + // validate the echoed value, + // report rpcRoundTrip latency +``` + +`BaseHealthChecker.check(WsClient)` is extended once at the end of its +existing template: + +``` +initClient() +sendTestPayload(uuid) +checkWsUpdates(...) +if (rpcCheckEnabled()) doRpcCheck(); // new, after telemetry validates +report ok / failure +``` + +A telemetry-uplink failure short-circuits the RPC sub-check (the existing +try/catch around `sendTestPayload` already does this). The RPC sub-check +reports its own dedicated failure key so the incident header tells apart +"device can publish telemetry" from "rule engine can deliver an RPC". + +### 4.3 Latencies and failure keys + +- New latency: `RpcRoundTrip` (e.g. `mqttRpcRoundTrip`, + `mqttFooRpcRoundTrip` for non-default queues), reported alongside the + existing `Request` and `WsUpdate` keys for the same target. +- New failure service key: an `RpcInfo(TransportInfo)` instance whose + `getShortName()` returns `" RPC"` (e.g. + `"MQTT RPC"`, `"MQTT Foo RPC"`). The incident header therefore shows + rows like `:red_circle: MQTT RPC (3)` next to `:red_circle: MQTT (1)`. + +### 4.4 Incident manager integration + +Because failure attribution flows through the existing +`MonitoringReporter` API, no changes are needed in `IncidentManager`, +`SlackIncidentTransport`, or the incident YAML. An MQTT RPC failure is +just another `ServiceFailureNotification` whose `AffectedService` carries +`name = "MQTT RPC"`. Cases: + +| Scenario | Incident header reflects | +|----------|---------------------------| +| MQTT transport down (uplink fails) | `:red_circle: MQTT (n)`. RPC short-circuits (no separate row). | +| MQTT uplink fine, RPC delivery broken | `:red_circle: MQTT RPC (n)`. MQTT row stays green. | +| Both broken | Two rows: `:red_circle: MQTT (n), :red_circle: MQTT RPC (m)`. | +| RPC recovers first | MQTT RPC row turns `:large_green_circle:`, MQTT stays red until uplink recovers. | + +### 4.5 Proposed flow + +```text + Bootstrap (once) Per-cycle (each transport target) + ──────────────── ──────────────────────────────── + + entityService.checkEntities() Login → WS connect → WS subscribe + rule chain For each target: + latencies asset initClient + public dashboard • MQTT/HTTP/CoAP as today + │ • IF rpc.enabled, also subscribe + ┌───────────┘ v1/.../rpc/request/+ on the + ▼ SAME client + Transports.init publish telemetry → wait WS + provisions devices + creds testData=uuid + used by both telemetry & IF rpc.enabled: doRpcCheck() + RPC checks POST /api/rpc/twoway/{deviceId} + schedule services with stagger validate echo on same client + send :rocket: started report mqttRpcRoundTrip + report request, wsUpdate + EDQS (optional), GENERAL ok + + Failure path (incident grouping unchanged): + serviceFailure(transportInfo, e) → :red_circle: MQTT (n) + serviceFailure(rpcInfo(transportInfo), e) → :red_circle: MQTT RPC (n) +``` + +```mermaid +flowchart TB + subgraph Bootstrap[Bootstrap once] + EntityCheck["entityService.checkEntities()
rule chain · asset · dashboard"] + Init["Transports.init()
provisions devices + credentials
used by BOTH checks"] + Stagger["scheduleWithFixedDelay (staggered)"] + end + + subgraph Loop["Per cycle / per transport target"] + direction TB + Login["Login → WS connect → WS subscribe"] + Tx["initClient
(if rpc.enabled, also subscribe
v1/.../rpc/request/+ on same client)"] + UpL["Telemetry uplink + WS validation
report mqttRequest, mqttWsUpdate"] + RpcOpt{rpc.enabled?} + DoRpc["doRpcCheck()
POST /api/rpc/twoway → echo on same MQTT
report mqttRpcRoundTrip
(failure key = MQTT RPC)"] + Skip[skip RPC] + Edqs[EDQS · GENERAL ok] + end + + subgraph Notify[Failure attribution] + R[MonitoringReporter] --> N[NotificationService] --> Inc[IncidentManager] + Inc -->|":red_circle: MQTT (n)"| Slack[Slack thread] + Inc -->|":red_circle: MQTT RPC (n)"| Slack + end + + EntityCheck --> Init --> Stagger --> Loop + Login --> Tx --> UpL --> RpcOpt + RpcOpt -- yes --> DoRpc --> Edqs + RpcOpt -- no --> Skip --> Edqs + UpL -. on failure .-> R + DoRpc -. on failure .-> R +``` + +```text + Monitoring app ThingsBoard MQTT transport Echo device + ────────────── ─────────── (incl. ssl://) (same Paho + client) + │ │ + ├─ MQTT connect (URL = transport target URL, │ + │ creds = auto-provisioned device token) ─────────────────────────►│ + │ │ + ├─ subscribe v1/.../rpc/request/+ (only if rpc.enabled) ────────────►│ + │ │ + ├─ login → WS connect → WS subscribe ──►│ │ + │ │ + ════╪═══════════ Telemetry check (unchanged from rc) ════════════════════ + ├─ MQTT publish v1/.../telemetry {testData:uuid} ───────────────────►│ + │◄── WS update {testData:uuid} ──────────────────│ │ + │ │ + ════╪═══════════ Companion RPC check (only if rpc.enabled) ══════════════ + ├─ POST /api/rpc/twoway/{deviceId} ─────────────►│ │ + │ {method:monitoringCheck, rule engine + transport + │ params:{value:uuid2}, │ │ + │ timeout:requestTimeoutMs} │ │ + │ ├──────────►│ │ + │ ├─ pub ►│ + │ │◄─pub─│ + │◄────── 200 OK {value:uuid2} ───────────────────────────────│ │ + │ validate echo → report mqttRpcRoundTrip │ + + NOTE: ONE MqttClient covers both the telemetry uplink and the RPC echo. + Failure of telemetry uplink short-circuits the RPC sub-check. +``` + +```mermaid +sequenceDiagram + autonumber + participant Mon as Monitoring app + participant TB as ThingsBoard REST + participant TR as MQTT transport (incl. ssl://) + participant Dev as Echo device
(same Paho client) + + Note over Mon,Dev: One MQTT session, used for both checks + Mon->>TR: connect(URL = target URL, creds = provisioned token) + Mon->>TR: subscribe v1/devices/me/rpc/request/+ (only if rpc.enabled) + Mon->>TB: REST login → WS connect → WS subscribe + + rect rgb(245,245,255) + Note right of Mon: Existing telemetry check (unchanged) + Mon->>TR: publish v1/devices/me/telemetry {testData:uuid} + TR-->>Mon: WS update {testData:uuid} + end + + rect rgb(245,255,245) + Note right of Mon: New companion RPC check + Mon->>TB: POST /api/rpc/twoway/{deviceId}
{method,params:{value:uuid2}} + TB->>TR: route through rule engine partition + transport + TR->>Dev: publish v1/devices/me/rpc/request/{id} + Dev-->>TR: publish v1/devices/me/rpc/response/{id} {value:uuid2} + TR-->>TB: response + TB-->>Mon: 200 OK {value:uuid2} + Mon->>Mon: validate, report mqttRpcRoundTrip + end + + Note over Mon: On failure → MonitoringReporter.serviceFailure(rpcInfo, e)
→ IncidentManager threads ":red_circle: MQTT RPC (n)" +``` + +## 5. Implementation phases + +Each phase is an independently reviewable commit: + +1. **Land this design paper.** No code changes. +2. **Add the per-target hook.** Introduce `RpcCheckConfig`, + `TransportMonitoringTarget.rpc`, `RpcInfo`, and the no-op `doRpcCheck` + on `BaseHealthChecker`. Wire `BaseHealthChecker.check` to invoke + `doRpcCheck` after WS validation when enabled. Tests at base level. +3. **MQTT implementation.** Extend `MqttTransportHealthChecker.initClient` + to subscribe `v1/devices/me/rpc/request/+` on the existing client when + `rpc.enabled`, and implement `doRpcCheck` against + `TbClient.handleTwoWayDeviceRPCRequest`. Includes the device-side echo + callback (Paho async thread). Unit tests cover happy path, value + mismatch, REST exception, response timeout, and RPC-disabled (no + subscription). +4. **Documentation polish.** Update + `monitoring/src/main/resources/README.md` with the new YAML keys, a + secure-MQTT example, and a small troubleshooting note on + `request_timeout_ms` ordering vs `monitoring.rest.request_timeout_ms`. +5. **Follow-ups (separate issues).** HTTP RPC (long-poll + `GET /api/v1/{token}/rpc?timeout=…`) and CoAP RPC plug into the same + `doRpcCheck` hook with no further base-class changes; tracking issue + per transport. + +## 6. Open decisions + +Listed for reviewer push-back before implementation begins: + +1. **MQTT only in scope here.** HTTP/CoAP follow as separate issues. The + `doRpcCheck` hook is generic so they slot in without further base-class + churn. +2. **Two-way RPC only.** Round-trip value validation is the strongest + signal; one-way RPC would only confirm the request reached the device, + which the telemetry uplink check already covers from the other + direction. +3. **RPC failure key shape.** Use `RpcInfo(TransportInfo)` implementing + `ShortNameProvider` returning `" RPC"`. Alternative: + a string constant like `"MQTT RPC"`. Recommendation: the wrapper, so + non-default queues (`"MQTT Foo RPC"`) are handled with no extra code. +4. **Latency suffix.** Reuse the transport target's `getKey()` derivation + (`mqtt`, `mqttFoo`) and append `RpcRoundTrip` — no separate per-target + label. This lines up with `Request` / `WsUpdate` already used + for the telemetry side and means the existing latencies dashboard + widget needs no schema changes. +5. **Echo handler placement.** The MQTT echo callback (subscribe handler + that publishes `params` back as the response) lives inside + `MqttTransportHealthChecker.initClient` — same place where the telemetry + client is created, on the same Paho thread. Alternative: a separate + helper class. Recommendation: keep it inline. Echo logic is six lines + and is only meaningful in the context of one connected client. + +## 7. Code references (`rc`) + +- `BaseHealthChecker.check(WsClient)` — `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java:70` +- `BaseMonitoringService.runChecks` — `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java:114` +- `MonitoringEntityService.checkEntities` — `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java:95` +- `MonitoringReporter.serviceFailure` — `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java:107` +- `IncidentManager.sendAlert` — `monitoring/src/main/java/org/thingsboard/monitoring/notification/incident/IncidentManager.java:70` +- `AffectedService` record — `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/AffectedService.java:18` +- `ShortNameProvider` interface — `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ShortNameProvider.java:18` +- `TransportInfo.getShortName` — `monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java:27` +- `RestClient.handleTwoWayDeviceRPCRequest` — `rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java:2495` From 110f18fec542a01c6a78bfbdc448cd4efa3e9d01 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:19:12 +0200 Subject: [PATCH 02/10] feat(monitoring): add RPC check hook scaffolding (#15541) Introduce the per-target RPC companion-check seam shared by all transport implementations: - RpcCheckConfig (enabled, requestTimeoutMs) and TransportMonitoringTarget.rpc with isRpcEnabled() helper - RpcInfo wraps TransportInfo so failures appear as a distinct " RPC" row in the IncidentManager header; equals/hashCode delegate to the wrapped TransportInfo so failuresCounters keys stay stable across cycles - BaseHealthChecker.doRpcCheck() default no-op invoked between checkWsUpdates() and serviceIsOk(); telemetry/WS failures keep short-circuiting via the existing try/catch - Latencies.rpcRoundTrip(key) -> "RpcRoundTrip" Tests cover RpcCheckConfig defaults, RpcInfo short-name across queues, RpcInfo equality, and the hook's success/short-circuit/failure paths through BaseHealthChecker.check. --- .../config/transport/RpcCheckConfig.java | 26 +++ .../monitoring/config/transport/RpcInfo.java | 36 ++++ .../transport/TransportMonitoringTarget.java | 5 + .../monitoring/data/Latencies.java | 4 + .../monitoring/service/BaseHealthChecker.java | 6 + .../config/transport/RpcCheckConfigTest.java | 70 ++++++ .../config/transport/RpcInfoTest.java | 72 +++++++ .../service/BaseHealthCheckerRpcHookTest.java | 200 ++++++++++++++++++ 8 files changed, 419 insertions(+) create mode 100644 monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java create mode 100644 monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java new file mode 100644 index 0000000000..590478cedc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import lombok.Data; + +@Data +public class RpcCheckConfig { + + private boolean enabled; + private Integer requestTimeoutMs; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java new file mode 100644 index 0000000000..3e2f5bbfa1 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import lombok.Data; +import org.thingsboard.monitoring.data.notification.ShortNameProvider; + +@Data +public class RpcInfo implements ShortNameProvider { + + private final TransportInfo transportInfo; + + @Override + public String getShortName() { + return transportInfo.getShortName() + " RPC"; + } + + @Override + public String toString() { + return transportInfo.toString() + " RPC"; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java index 7fa6ec3f46..5d52010646 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java @@ -30,6 +30,7 @@ public class TransportMonitoringTarget implements MonitoringTarget { private String queue; private boolean checkDomainIps; private String namePrefix; + private RpcCheckConfig rpc; @Override public UUID getDeviceId() { @@ -44,4 +45,8 @@ public class TransportMonitoringTarget implements MonitoringTarget { return Strings.nullToEmpty(namePrefix); } + public boolean isRpcEnabled() { + return rpc != null && rpc.isEnabled(); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 4846964272..faa17065d3 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -30,4 +30,8 @@ public class Latencies { return String.format("%sWsUpdate", key); } + public static String rpcRoundTrip(String key) { + return String.format("%sRpcRoundTrip", key); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index c0e85da1ac..7666bc8147 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -88,6 +88,8 @@ public abstract class BaseHealthChecker> latestEcho(TestChecker checker) { + return (InvocationOnMock invocation) -> { + String value = checker.capturedTestValue; + return value == null ? Collections.emptyMap() : Map.of("testData", value); + }; + } + + private static final class TestConfig implements MonitoringConfig { + @Override + public List getTargets() { + return Collections.emptyList(); + } + } + + private static final class TestTarget implements MonitoringTarget { + private final UUID deviceId; + + TestTarget(UUID deviceId) { + this.deviceId = deviceId; + } + + @Override public UUID getDeviceId() { return deviceId; } + @Override public String getBaseUrl() { return "test://target"; } + @Override public boolean isCheckDomainIps() { return false; } + } + + private static class TestChecker extends BaseHealthChecker { + + String capturedTestValue; + boolean failTelemetry; + boolean rpcCheckCalled; + boolean skipRpcOverride; + Object rpcServiceKey; + Throwable rpcFailure; + + TestChecker(TestConfig config, TestTarget target) { + super(config, target); + } + + @Override protected void initialize() { } + @Override protected void initClient() { } + @Override protected void destroyClient() { } + @Override protected Object getInfo() { return "test-info"; } + @Override protected String getKey() { return "test"; } + @Override protected boolean isCfMonitoringEnabled() { return false; } + + @Override + protected String createTestPayload(String testValue) { + capturedTestValue = testValue; + return testValue; + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + if (failTelemetry) { + throw new IOException("telemetry publish failed"); + } + } + + @Override + protected void doRpcCheck() throws Exception { + if (skipRpcOverride) { + super.doRpcCheck(); + return; + } + rpcCheckCalled = true; + if (rpcFailure != null) { + throw new ServiceFailureException(rpcServiceKey, rpcFailure); + } + } + } + +} From 558e57b577396ae42072e26d41ea9358a83e1892 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:22:21 +0200 Subject: [PATCH 03/10] feat(monitoring): add MQTT RPC companion check (#15541) Extend MqttTransportHealthChecker with the per-target RPC sub-check: - initClient also subscribes "v1/devices/me/rpc/request/+" on the same Paho client when target.isRpcEnabled(); subscription is idempotent - echoRpcRequest publishes the request's params as the response body to "v1/devices/me/rpc/response/{requestId}" so the round-trip closes on the same MQTT session - doRpcCheck POSTs /api/rpc/twoway/{deviceId} with {method:"monitoringCheck", params:{value:}, timeout:} and validates the echoed value; latency is reported under RpcRoundTrip and failures throw ServiceFailureException keyed with RpcInfo so the IncidentManager header shows ":red_circle: MQTT RPC" TbClient and the RpcInfo/timeout helpers move to TransportHealthChecker so the next three transports can share them. Tests cover happy path, value mismatch, REST exception, response timeout, RPC-disabled (no subscription, no tbClient call), idempotent subscribe, and the echo handler's request->response shape. --- .../monitoring/service/BaseHealthChecker.java | 4 + .../transport/TransportHealthChecker.java | 15 ++ .../impl/MqttTransportHealthChecker.java | 62 +++++- .../MqttTransportHealthCheckerRpcTest.java | 204 ++++++++++++++++++ 4 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index 7666bc8147..4d5af8b8b8 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -54,6 +54,10 @@ public abstract class BaseHealthChecker> associates = new HashMap<>(); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index 882b79df98..92bb50636a 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -17,8 +17,11 @@ package org.thingsboard.monitoring.service.transport; import com.fasterxml.jackson.databind.node.TextNode; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.RpcInfo; import org.thingsboard.monitoring.config.transport.TransportInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; @@ -31,10 +34,22 @@ public abstract class TransportHealthChecker { - private MqttClient mqttClient; + MqttClient mqttClient; + private boolean rpcSubscribed; private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; + private static final String DEVICE_RPC_REQUEST_SUB_TOPIC = "v1/devices/me/rpc/request/+"; + private static final String DEVICE_RPC_RESPONSE_TOPIC_PREFIX = "v1/devices/me/rpc/response/"; protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, TransportMonitoringTarget target) { super(config, target); @@ -59,9 +73,26 @@ public class MqttTransportHealthChecker extends TransportHealthChecker bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture())) + .thenAnswer(invocation -> { + JsonNode params = bodyCaptor.getValue().get("params"); + return params; + }); + + checker.doRpcCheck(); + + JsonNode sent = bodyCaptor.getValue(); + assertThat(sent.get("method").asText()).isEqualTo("monitoringCheck"); + assertThat(sent.get("timeout").asInt()).isEqualTo(4000); + assertThat(sent.get("params").get("value").asText()).isNotBlank(); + } + + @Test + void doRpcCheckValueMismatchThrowsServiceFailureWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("RPC echo mismatch") + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + // target.rpc not set → isRpcEnabled() == false + + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientSubscribesToRpcRequestTopicWhenRpcEnabled() throws Exception { + enableRpc(); + + checker.initClient(); + + verify(mqttClient).subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class)); + } + + @Test + void initClientSkipsSubscriptionWhenRpcDisabled() throws Exception { + checker.initClient(); + + verify(mqttClient, never()).subscribe(anyString(), anyInt(), any(IMqttMessageListener.class)); + } + + @Test + void initClientSubscribesOnlyOnce() throws Exception { + enableRpc(); + + checker.initClient(); + checker.initClient(); + + verify(mqttClient, org.mockito.Mockito.times(1)) + .subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class)); + } + + @Test + void echoRpcRequestPublishesParamsBackToResponseTopic() throws Exception { + enableRpc(); + ObjectNode body = JacksonUtil.newObjectNode(); + body.put("method", "monitoringCheck"); + ObjectNode params = JacksonUtil.newObjectNode().put("value", "uuid-42"); + body.set("params", params); + MqttMessage incoming = new MqttMessage(JacksonUtil.toString(body).getBytes(StandardCharsets.UTF_8)); + + checker.echoRpcRequest("v1/devices/me/rpc/request/77", incoming); + + ArgumentCaptor publishedCaptor = ArgumentCaptor.forClass(MqttMessage.class); + verify(mqttClient).publish(eq("v1/devices/me/rpc/response/77"), publishedCaptor.capture()); + JsonNode echoed = JacksonUtil.toJsonNode(new String(publishedCaptor.getValue().getPayload(), StandardCharsets.UTF_8)); + assertThat(echoed.get("value").asText()).isEqualTo("uuid-42"); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +} From c0ea5b21d2b23d83c7929d34b9989516d5cd4759 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:26:05 +0200 Subject: [PATCH 04/10] refactor(monitoring): pull RPC echo check into TransportHealthChecker Move the UUID echo doRpcCheck implementation from MqttTransportHealthChecker up into the shared TransportHealthChecker so HTTP and CoAP inherit identical behaviour without duplication. LwM2M will keep its own override (Read /3/0/0 instead of echo). MQTT keeps a trivial @Override doRpcCheck delegate so the test in the impl package retains protected-access without ReflectionTestUtils. --- .../transport/TransportHealthChecker.java | 33 +++++++++++++++++++ .../impl/MqttTransportHealthChecker.java | 31 +---------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index 92bb50636a..5175b16840 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -15,6 +15,8 @@ */ package org.thingsboard.monitoring.service.transport; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -26,7 +28,11 @@ import org.thingsboard.monitoring.config.transport.TransportInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportType; +import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.BaseHealthChecker; +import org.thingsboard.server.common.data.id.DeviceId; + +import java.util.UUID; @Slf4j public abstract class TransportHealthChecker extends BaseHealthChecker { @@ -50,6 +56,33 @@ public abstract class TransportHealthChecker Date: Wed, 29 Apr 2026 21:27:43 +0200 Subject: [PATCH 05/10] feat(monitoring): add HTTP RPC companion check (#15541) Extend HttpTransportHealthChecker for the RPC sub-check on poll-based HTTP devices: - initClient now also spawns a daemon long-poll thread per target when target.isRpcEnabled(); the thread is idempotent (CAS on AtomicBoolean) and torn down by destroyClient - pollOnce performs GET /api/v1/{token}/rpc?timeout=1000; on a 200 it POSTs the request's params back to /api/v1/{token}/rpc/{id}; 408 and empty bodies are silently retried; other errors are swallowed with a short backoff so a transient REST blip cannot kill the polling loop - doRpcCheck inherits the UUID echo from TransportHealthChecker (with a thin @Override delegate so the impl-package test keeps protected access) Tests cover happy path, value mismatch, REST exception, response timeout, RPC-disabled (no doRpcCheck call, no polling thread), idempotent poll-thread start, destroyClient teardown, and pollOnce behaviour for 200/408/204. --- .../impl/HttpTransportHealthChecker.java | 75 +++++- .../HttpTransportHealthCheckerRpcTest.java | 225 ++++++++++++++++++ 2 files changed, 298 insertions(+), 2 deletions(-) create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java index 948ac9fd7e..8298ac934e 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java @@ -15,11 +15,15 @@ */ package org.thingsboard.monitoring.service.transport.impl; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.context.annotation.Scope; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.HttpStatusCodeException; import org.springframework.web.client.RestTemplate; import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; @@ -27,13 +31,18 @@ import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; @Component @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j public class HttpTransportHealthChecker extends TransportHealthChecker { - private RestTemplate restTemplate; + private static final long POLL_TIMEOUT_MS = 1000L; + + RestTemplate restTemplate; + private final AtomicBoolean rpcPolling = new AtomicBoolean(); + private Thread rpcPollThread; protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { super(config, target); @@ -48,6 +57,57 @@ public class HttpTransportHealthChecker extends TransportHealthChecker bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture())) + .thenAnswer(inv -> bodyCaptor.getValue().get("params")); + + checker.doRpcCheck(); + + assertThat(bodyCaptor.getValue().get("method").asText()).isEqualTo("monitoringCheck"); + } + + @Test + void doRpcCheckValueMismatchUsesRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientStartsPollingThreadWhenRpcEnabled() throws Exception { + enableRpc(); + + checker.initClient(); + + Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + assertThat(thread).isNotNull(); + assertThat(thread.isDaemon()).isTrue(); + // give the thread a moment, then tear down + checker.destroyClient(); + } + + @Test + void initClientDoesNotStartPollingThreadWhenRpcDisabled() throws Exception { + checker.initClient(); + + Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + assertThat(thread).isNull(); + } + + @Test + void initClientIsIdempotentForPollingThread() throws Exception { + enableRpc(); + + checker.initClient(); + Thread first = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + checker.initClient(); + Thread second = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + + assertThat(second).isSameAs(first); + checker.destroyClient(); + } + + @Test + void destroyClientStopsPollingThread() throws Exception { + enableRpc(); + checker.initClient(); + + checker.destroyClient(); + + Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + assertThat(thread).isNull(); + } + + @Test + void pollOnceEchoesParamsWhenRpcArrives() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", 99L); + rpc.put("method", "monitoringCheck"); + rpc.set("params", JacksonUtil.newObjectNode().put("value", "uuid-99")); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/99"), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue().get("value").asText()).isEqualTo("uuid-99"); + } + + @Test + void pollOnceIsSilentOn408() throws Exception { + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenThrow(HttpClientErrorException.create(HttpStatus.REQUEST_TIMEOUT, "timeout", null, null, null)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + + @Test + void pollOnceSwallowsBodylessOk() throws Exception { + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +} From 31138818b00fda46075a1ef97a0c49d5178130a2 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:30:51 +0200 Subject: [PATCH 06/10] feat(monitoring): add CoAP RPC companion check (#15541) Extend CoapTransportHealthChecker with the RPC sub-check on CoAP observe-based devices: - initClient also opens an OBSERVE on coap://host/api/v1/{token}/rpc via a separate CoapClient so it doesn't interfere with the existing /telemetry CoapClient; the relation is idempotent and torn down by destroyClient (proactiveCancel + shutdown of the rpc client) - handleRpcNotification parses each notification, extracts {id, params} and POSTs the params back to coap://host/api/v1/{token}/rpc/{id}; the response post lives in postRpcResponse, isolated as a testable seam and resilient to ConnectorException/IOException - doRpcCheck inherits the UUID echo from TransportHealthChecker via a thin @Override delegate (impl-package test access) Tests cover happy path, value mismatch, REST exception, response timeout, RPC-disabled (no observe relation, no tbClient call), idempotent observe, destroyClient teardown, and the notification handler's URL/payload extraction for valid, empty and id-less bodies. --- .../impl/CoapTransportHealthChecker.java | 72 ++++++ .../CoapTransportHealthCheckerRpcTest.java | 242 ++++++++++++++++++ 2 files changed, 314 insertions(+) create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java index 953b5c2415..e52e019e05 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java @@ -15,8 +15,11 @@ */ package org.thingsboard.monitoring.service.transport.impl; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapHandler; +import org.eclipse.californium.core.CoapObserveRelation; import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; @@ -24,6 +27,7 @@ import org.eclipse.californium.elements.config.SystemConfig; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportType; @@ -41,6 +45,8 @@ public class CoapTransportHealthChecker extends TransportHealthChecker capturedBody = new AtomicReference<>(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenAnswer(inv -> { + JsonNode b = inv.getArgument(1); + capturedBody.set(b); + return b.get("params"); + }); + + checker.doRpcCheck(); + + assertThat(capturedBody.get().get("method").asText()).isEqualTo("monitoringCheck"); + } + + @Test + void doRpcCheckValueMismatchUsesRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientStartsObserveWhenRpcEnabled() throws Exception { + enableRpc(); + + checker.initClient(); + + verify(mockRpcCoapClient).observe(any(CoapHandler.class)); + assertThat(checker.rpcObserveRelation).isSameAs(mockObserveRelation); + } + + @Test + void initClientSkipsObserveWhenRpcDisabled() throws Exception { + checker.initClient(); + + verify(mockRpcCoapClient, never()).observe(any(CoapHandler.class)); + } + + @Test + void initClientObserveIsIdempotent() throws Exception { + enableRpc(); + + checker.initClient(); + checker.initClient(); + + verify(mockRpcCoapClient, org.mockito.Mockito.times(1)).observe(any(CoapHandler.class)); + } + + @Test + void destroyClientCancelsObserveAndShutsDownRpcClient() throws Exception { + enableRpc(); + checker.initClient(); + + checker.destroyClient(); + + verify(mockObserveRelation).proactiveCancel(); + verify(mockRpcCoapClient).shutdown(); + assertThat(checker.rpcObserveRelation).isNull(); + assertThat(checker.rpcCoapClient).isNull(); + } + + @Test + void handleRpcNotificationEchoesParamsBackToResponseEndpoint() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn( + "{\"id\":42,\"method\":\"monitoringCheck\",\"params\":{\"value\":\"uuid-42\"}}"); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isEqualTo("coap://localhost/api/v1/token-coap/rpc/42"); + JsonNode echoed = JacksonUtil.toJsonNode(checker.capturedResponsePayload); + assertThat(echoed.get("value").asText()).isEqualTo("uuid-42"); + } + + @Test + void handleRpcNotificationIgnoresEmptyResponse() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn(""); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isNull(); + } + + @Test + void handleRpcNotificationIgnoresMalformedRpcWithoutId() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn("{\"method\":\"x\"}"); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isNull(); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + + private static final class TestableCoapChecker extends CoapTransportHealthChecker { + String capturedResponseUri; + String capturedResponsePayload; + + TestableCoapChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) { + super(config, target); + } + + @Override + void postRpcResponse(String uri, String payload) { + capturedResponseUri = uri; + capturedResponsePayload = payload; + } + } + +} From 6e3dfdc536cac3544c49167cfcb39f23f8092c97 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:32:42 +0200 Subject: [PATCH 07/10] feat(monitoring): add LwM2M RPC companion check (#15541) Extend Lwm2mTransportHealthChecker with a server-initiated Read sub-check. Unlike MQTT/HTTP/CoAP there is no echo handler on the device side; the existing Lwm2mClient already serves /3/0/0 via the standard LwM2M Read operation, so the companion check is realized purely in doRpcCheck: - POST /api/rpc/twoway/{deviceId} with {method:"Read", params:{key:"/3/0/0"}, timeout:} - Assert the response is a non-blank string (the device's Manufacturer- shaped resource value); blank/null is reported via ServiceFailureException keyed with RpcInfo so failures appear as ":red_circle: LwM2M RPC" in the IncidentManager header - The default rpc.request_timeout_ms in tb-monitoring.yml is set conservatively to 6000 ms because the LwM2M Read goes through rule engine + Leshan registration; per-target overrides are honoured - No initClient changes; no echo handler Tests cover happy path with body shape (method=Read, params.key=/3/0/0, timeout), blank response, null response, REST exception, response timeout, RPC-disabled (no tbClient call), and per-target timeout override. --- .../impl/Lwm2mTransportHealthChecker.java | 34 ++++ .../Lwm2mTransportHealthCheckerRpcTest.java | 167 ++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java index 9bb1295f29..1026f75a4e 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java @@ -15,15 +15,21 @@ */ package org.thingsboard.monitoring.service.transport.impl; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.monitoring.client.Lwm2mClient; import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportType; +import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.transport.TransportHealthChecker; +import org.thingsboard.server.common.data.id.DeviceId; @Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @@ -56,6 +62,34 @@ public class Lwm2mTransportHealthChecker extends TransportHealthChecker bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + + checker.doRpcCheck(); + + verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()); + JsonNode body = bodyCaptor.getValue(); + assertThat(body.get("method").asText()).isEqualTo("Read"); + assertThat(body.get("params").get("key").asText()).isEqualTo("/3/0/0"); + assertThat(body.get("timeout").asInt()).isEqualTo(6000); + } + + @Test + void doRpcCheckBlankResponseFailsWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(new TextNode("")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("blank result") + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckNullResponseFailsWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(null); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void doRpcCheckUsesPerTargetTimeoutOverride() throws Exception { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + rpc.setRequestTimeoutMs(12000); + target.setRpc(rpc); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(new TextNode("ok")); + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + + checker.doRpcCheck(); + + verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue().get("timeout").asInt()).isEqualTo(12000); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +} From 8aef4329584f865dae0e36259d01e794a1b7c54d Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 29 Apr 2026 21:35:03 +0200 Subject: [PATCH 08/10] docs(monitoring): document RPC monitoring config and behaviour (#15541) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tb-monitoring.yml: add the optional rpc.{enabled,request_timeout_ms} sub-block to every transport target with comments explaining the per-transport mechanism, plus a commented-out secure-MQTT example - monitoring/src/main/resources/README.md: new file with the YAML key table, a per-transport behaviour summary, the failure-attribution semantics (RpcInfo => " RPC"), and troubleshooting notes including the rpc.request_timeout_ms < monitoring.rest.request_timeout_ms ordering constraint - 2026-04-29-rpc-monitoring-design.md: update §5 to list MQTT + HTTP + CoAP + LwM2M as shipped phases (replacing the previous "MQTT only + follow-ups" framing), reword §6 open decisions as resolved decisions with all four transports in scope, add §4.6 explaining the LwM2M Read-vs-echo asymmetry and the conservative 6000 ms default --- .../docs/2026-04-29-rpc-monitoring-design.md | 108 +++++++++++++----- monitoring/src/main/resources/README.md | 76 ++++++++++++ .../src/main/resources/tb-monitoring.yml | 35 ++++++ 3 files changed, 192 insertions(+), 27 deletions(-) create mode 100644 monitoring/src/main/resources/README.md diff --git a/monitoring/docs/2026-04-29-rpc-monitoring-design.md b/monitoring/docs/2026-04-29-rpc-monitoring-design.md index 56a614190a..499e023854 100644 --- a/monitoring/docs/2026-04-29-rpc-monitoring-design.md +++ b/monitoring/docs/2026-04-29-rpc-monitoring-design.md @@ -1,6 +1,6 @@ # RPC Monitoring — Design -| Status | Draft for review | +| Status | Implemented (PR for #15541) | |--------|------------------| | Author | Sergii Matviienko (`smatvienko@thingsboard.io`) | | Date | 2026-04-29 | @@ -313,6 +313,42 @@ just another `ServiceFailureNotification` whose `AffectedService` carries | Both broken | Two rows: `:red_circle: MQTT (n), :red_circle: MQTT RPC (m)`. | | RPC recovers first | MQTT RPC row turns `:large_green_circle:`, MQTT stays red until uplink recovers. | +### 4.6 LwM2M Read-vs-echo asymmetry + +Three of the four transports use the same value-echo round-trip: + +``` +POST /api/rpc/twoway/{deviceId} {method:"monitoringCheck", params:{value:}} + ↓ rule engine + transport + device echoes params back, value == uuid +``` + +LwM2M is the exception. There is no echo handler to add on the monitoring +side because the existing `Lwm2mClient` already implements the standard +Object 3 (Device) Read pathway: `Lwm2mClient.read(server, /3/0/0)` returns +the value that was last set via `send()` (which is what the telemetry uplink +phase does immediately before). So the LwM2M companion check uses a +server-initiated Read instead of an echo: + +``` +POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}, timeout:6000} + ↓ rule engine + LwM2M Read + device returns the resource value +``` + +The assertion is therefore weaker — non-blank instead of value-equal — +because the resource value is not under the monitoring service's control on +the wire (the rule-engine + Leshan registration path round-trips first). +This is sufficient as a liveness signal for the cloud-to-device LwM2M Read +pathway and matches what the existing fixtures (`device_profile.json`, +`resource.json`) already expose. If a stronger assertion is wanted later, +a Write+Read pair against a custom resource on the test model would close +the gap, but that is out of scope for this PR. + +The `request_timeout_ms` default is therefore `6000` ms for LwM2M (vs `4000` +ms for the other three transports) because the LwM2M Read traverses one +extra hop (Leshan registration → device coap loop → response). + ### 4.5 Proposed flow ```text @@ -439,55 +475,73 @@ sequenceDiagram ## 5. Implementation phases -Each phase is an independently reviewable commit: +Each phase is an independently reviewable commit landing in the same PR: 1. **Land this design paper.** No code changes. 2. **Add the per-target hook.** Introduce `RpcCheckConfig`, `TransportMonitoringTarget.rpc`, `RpcInfo`, and the no-op `doRpcCheck` on `BaseHealthChecker`. Wire `BaseHealthChecker.check` to invoke - `doRpcCheck` after WS validation when enabled. Tests at base level. + `doRpcCheck` after WS validation. Tests at base level. 3. **MQTT implementation.** Extend `MqttTransportHealthChecker.initClient` - to subscribe `v1/devices/me/rpc/request/+` on the existing client when - `rpc.enabled`, and implement `doRpcCheck` against - `TbClient.handleTwoWayDeviceRPCRequest`. Includes the device-side echo - callback (Paho async thread). Unit tests cover happy path, value - mismatch, REST exception, response timeout, and RPC-disabled (no - subscription). -4. **Documentation polish.** Update - `monitoring/src/main/resources/README.md` with the new YAML keys, a + to subscribe `v1/devices/me/rpc/request/+` on the existing Paho client + when `rpc.enabled`. Implement the value-echo `doRpcCheck` once on + `TransportHealthChecker` so HTTP and CoAP can inherit it. Unit tests + cover happy path, value mismatch, REST exception, response timeout, + RPC-disabled (no subscription), and idempotent subscribe. +4. **HTTP implementation.** Extend `HttpTransportHealthChecker.initClient` + to spawn an idempotent daemon long-poll thread on + `GET /api/v1/{token}/rpc?timeout=1000`; on each 200 response, POST the + echoed params back to `/api/v1/{token}/rpc/{id}`. The thread is + lifecycle-bound to `initClient` / `destroyClient`. Tests cover the + inherited `doRpcCheck` shape plus poll-thread start/stop/idempotency + and `pollOnce` behaviour for 200/408/204. +5. **CoAP implementation.** Extend `CoapTransportHealthChecker.initClient` + to open a CoAP OBSERVE on `coap://host/api/v1/{token}/rpc` via a + second `CoapClient`; on each notification echo the params back to + `coap://host/api/v1/{token}/rpc/{id}`. `destroyClient` cancels the + observe and shuts down the rpc client. Tests cover the inherited + `doRpcCheck` shape, observe lifecycle, and notification handler + parsing. +6. **LwM2M implementation.** Override `doRpcCheck` only — no + `initClient` changes — to issue + `POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}}` + and assert non-blank, per §4.6. +7. **Documentation polish.** Add `monitoring/src/main/resources/README.md` + with the per-transport behaviour table, the new YAML keys, the secure-MQTT example, and a small troubleshooting note on - `request_timeout_ms` ordering vs `monitoring.rest.request_timeout_ms`. -5. **Follow-ups (separate issues).** HTTP RPC (long-poll - `GET /api/v1/{token}/rpc?timeout=…`) and CoAP RPC plug into the same - `doRpcCheck` hook with no further base-class changes; tracking issue - per transport. + `rpc.request_timeout_ms` ordering vs `monitoring.rest.request_timeout_ms`. ## 6. Open decisions -Listed for reviewer push-back before implementation begins: +Listed for reviewer push-back before implementation began. All four +transports ship in this PR; the rest of the table below records the calls +that survived review. -1. **MQTT only in scope here.** HTTP/CoAP follow as separate issues. The - `doRpcCheck` hook is generic so they slot in without further base-class - churn. +1. **All four transports in scope.** Hook + MQTT + HTTP + CoAP + LwM2M ship + together so the IncidentManager header gains a coherent set of + `:red_circle: RPC` rows in one go. The hook itself is + generic enough that adding more transports later (SNMP, custom) is + just another override. 2. **Two-way RPC only.** Round-trip value validation is the strongest signal; one-way RPC would only confirm the request reached the device, which the telemetry uplink check already covers from the other direction. 3. **RPC failure key shape.** Use `RpcInfo(TransportInfo)` implementing `ShortNameProvider` returning `" RPC"`. Alternative: - a string constant like `"MQTT RPC"`. Recommendation: the wrapper, so + a string constant like `"MQTT RPC"`. Decision: the wrapper, so non-default queues (`"MQTT Foo RPC"`) are handled with no extra code. 4. **Latency suffix.** Reuse the transport target's `getKey()` derivation (`mqtt`, `mqttFoo`) and append `RpcRoundTrip` — no separate per-target label. This lines up with `Request` / `WsUpdate` already used for the telemetry side and means the existing latencies dashboard widget needs no schema changes. -5. **Echo handler placement.** The MQTT echo callback (subscribe handler - that publishes `params` back as the response) lives inside - `MqttTransportHealthChecker.initClient` — same place where the telemetry - client is created, on the same Paho thread. Alternative: a separate - helper class. Recommendation: keep it inline. Echo logic is six lines - and is only meaningful in the context of one connected client. +5. **Echo handler placement (MQTT/HTTP/CoAP).** Each transport's echo + path lives inline in its `initClient` rather than in a separate helper + class. Echo logic is small (a handful of lines per transport) and is + only meaningful in the context of one connected client. +6. **LwM2M Read instead of echo.** See §4.6. The asymmetry is documented + in `monitoring/src/main/resources/README.md` so operators understand + why the LwM2M assertion is non-blank rather than uuid-equal. ## 7. Code references (`rc`) diff --git a/monitoring/src/main/resources/README.md b/monitoring/src/main/resources/README.md new file mode 100644 index 0000000000..5965e0e3f1 --- /dev/null +++ b/monitoring/src/main/resources/README.md @@ -0,0 +1,76 @@ +# tb-monitoring — RPC companion check + +Each `monitoring.transports.{mqtt,http,coap,lwm2m}.targets[N]` accepts an optional +`rpc:` sub-block. When `rpc.enabled` is `true`, the monitoring loop runs an +extra round-trip on the same target per cycle, after the existing telemetry +uplink + WebSocket validation has succeeded. + +```yaml +monitoring: + transports: + mqtt: + request_timeout_ms: 4000 + targets: + - base_url: 'tcp://${monitoring.domain}:1883' + queue: 'Main' + rpc: + enabled: true + request_timeout_ms: 4000 + - base_url: 'ssl://${monitoring.domain}:8883' # secure variant + queue: 'Main' + rpc: + enabled: true # inherits ssl + creds +``` + +## YAML keys + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `rpc.enabled` | bool | `false` | Opt in to the per-target RPC sub-check. | +| `rpc.request_timeout_ms` | int | transport `request_timeout_ms` | Sent through to ThingsBoard as the RPC `timeout`. Must be `< monitoring.rest.request_timeout_ms`. | + +For LwM2M the default is `6000` ms because the Read goes through rule engine + +Leshan registration; the other transports default to `4000`. + +## Per-transport behaviour + +| Transport | Device side | Cloud side | Assertion | +|-----------|-------------|------------|-----------| +| MQTT | Same Paho client subscribes `v1/devices/me/rpc/request/+` and publishes echoed `params` to `v1/devices/me/rpc/response/{id}` | `POST /api/rpc/twoway/{deviceId}` `{method:"monitoringCheck", params:{value:}}` | echoed `value == uuid` | +| HTTP | Daemon long-poll thread per target: `GET /api/v1/{token}/rpc?timeout=1000`; on 200, `POST /api/v1/{token}/rpc/{id}` with the params | same as MQTT | echoed `value == uuid` | +| CoAP | Separate `CoapClient` opens an OBSERVE on `coap://host/api/v1/{token}/rpc`; on each notification, posts echoed params to `coap://host/api/v1/{token}/rpc/{id}` | same as MQTT | echoed `value == uuid` | +| LwM2M | None — the existing `Lwm2mClient` already serves `/3/0/0` via the standard LwM2M Read | `POST /api/rpc/twoway/{deviceId}` `{method:"Read", params:{key:"/3/0/0"}}` | response is non-blank | + +## Failure attribution + +RPC failures use a dedicated `RpcInfo` service key whose `getShortName()` +returns `" RPC"` (`"MQTT RPC"`, `"MQTT Foo RPC"` for +non-default queues, etc.). The IncidentManager header therefore distinguishes +`":red_circle: MQTT (n)"` from `":red_circle: MQTT RPC (m)"`. A telemetry +uplink failure short-circuits the RPC sub-check for that cycle, so a broken +transport will not double-count against both rows. + +## Latencies + +When the RPC check succeeds, the round-trip is reported under +`RpcRoundTrip` (e.g. `mqttRpcRoundTrip`, `mqttFooRpcRoundTrip` for +non-default queues), alongside the existing `Request` and `WsUpdate` +keys for the same target. No latencies dashboard schema change is needed. + +## Troubleshooting + +- **`rpc.request_timeout_ms` ordering**: the RPC HTTP call goes through the + same `monitoring.rest.request_timeout_ms` REST client. Keep + `rpc.request_timeout_ms` strictly smaller so the device-side timeout fires + first; otherwise the REST call will fail with `ResourceAccessException` + before the RPC has a chance to time out cleanly. +- **HTTP polling thread leaks**: the long-poll thread is daemon and lifecycle- + bound to `initClient` / `destroyClient`. If you see leaked threads, check + whether a custom shutdown hook is bypassing `@PreDestroy`. +- **CoAP OBSERVE behind NAT**: the OBSERVE client uses the same Californium + configuration as the telemetry client; if the telemetry check works but the + RPC observe never delivers, check NAT/firewall on the CoAP UDP path. +- **LwM2M Read on `/3/0/0` returns blank**: the monitoring `Lwm2mClient` + serves resource `0` from whatever was last sent via telemetry, so a blank + result usually means telemetry never reached the device this cycle. Confirm + the telemetry row is green first. diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 768fd2e4c7..196b0abe6e 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -59,6 +59,21 @@ monitoring: check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check on this target. + # Issues POST /api/rpc/twoway/{deviceId} with method=monitoringCheck and a uuid, + # validates the device echoes the uuid back over the same MQTT session, and + # reports the round-trip under the RpcRoundTrip latency. + # Failures appear as ":red_circle: MQTT RPC" in the IncidentManager header. + rpc: + enabled: '${MQTT_RPC_MONITORING_ENABLED:false}' + # RPC request timeout in ms; must be < monitoring.rest.request_timeout_ms. + # If unset, falls back to monitoring.transports.mqtt.request_timeout_ms. + request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}' + # Secure MQTT example with RPC enabled by default: + # - base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}' + # queue: 'Main' + # rpc: + # enabled: true # To add more targets, use following environment variables: # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc. @@ -76,6 +91,11 @@ monitoring: check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${COAP_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Opens a CoAP OBSERVE on /api/v1/{token}/rpc on + # a separate CoapClient and echoes incoming RPC params back to /api/v1/{token}/rpc/{id}. + rpc: + enabled: '${COAP_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${COAP_RPC_REQUEST_TIMEOUT_MS:4000}' # To add more targets, use following environment variables: # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc. @@ -93,6 +113,12 @@ monitoring: check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${HTTP_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Spawns a daemon long-poll thread per target on + # GET /api/v1/{token}/rpc?timeout=1000 and POSTs the echoed params back to + # /api/v1/{token}/rpc/{id} when an RPC arrives. + rpc: + enabled: '${HTTP_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${HTTP_RPC_REQUEST_TIMEOUT_MS:4000}' # To add more targets, use following environment variables: # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc. @@ -110,6 +136,15 @@ monitoring: check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${LWM2M_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Issues a server-initiated LwM2M Read on /3/0/0 + # via /api/rpc/twoway/{deviceId} with method=Read; asserts the response is + # non-blank. The "value matches uuid" pattern used by other transports does not + # apply here because the Read is read-only — see design doc §4.6. + rpc: + enabled: '${LWM2M_RPC_MONITORING_ENABLED:false}' + # LwM2M Read goes through rule engine + Leshan registration, so default to + # 6000 ms; tune per environment. + request_timeout_ms: '${LWM2M_RPC_REQUEST_TIMEOUT_MS:6000}' # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. From 20828ca2980130c0d40c8359e8f63077fd43aa17 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 30 Apr 2026 09:13:58 +0200 Subject: [PATCH 09/10] fix(monitoring): rework HTTP RPC poll lifecycle, harden CoAP, validate RPC timeout (#15541) Address PR review on RPC monitoring feature: HTTP poll thread (main concern): - Replace `new Thread` + `AtomicBoolean` with a single-thread `ScheduledExecutorService`; manage with `Future#cancel(true)` and `awaitTermination` so destroyClient() unblocks within bounded time. - Bound `RestTemplate` read timeout to `POLL_TIMEOUT_MS + slack` so a blocked socket cannot outlive cancellation by an unbounded amount. - Tighten `catch (Throwable)` -> `catch (Exception)` and surface non-timeout errors at WARN. - Replace fixed 500 ms backoff with exponential-with-cap (500 ms -> 5 s) plus jitter so an unreachable TB is no longer hit twice per second. - Detect non-numeric `id` in poll response. - Post empty `ObjectNode` instead of `null` body when `params` is absent (RestTemplate.postForLocation with null body produces a 400). - Restore `restTemplate` field visibility to `private`; tests now use `ReflectionTestUtils.setField`. Startup validation: - `TransportHealthChecker.initialize()` now fails fast when the per-target RPC request_timeout_ms is `>=` the global `monitoring.rest.request_timeout_ms`. Without this the REST client times out before TB times out the RPC, producing false negatives. CoAP: - Defensive `isNumber()` check on rpc id node. - Tighten echo error catch to `Exception` and log at WARN. - Comment why `postRpcResponse` allocates a fresh CoapClient (observe relation owns `rpcCoapClient`). - Comment that sustained observe failure surfaces via the next server-side RPC send timing out. Tests: - Update HTTP poll tests to assert via `restTemplate` interactions instead of inspecting field shape, plus new tests: destroy-stops-polling, reschedule-after-future-dies, empty-body when params missing, non-numeric id ignored, backoff doesn't kill scheduler. - New `TransportHealthCheckerInitializeTest` covering the rpc-timeout < rest-timeout validation. - New `RpcInfoTest` assertion that an `RpcInfo` and its `TransportInfo` for the same target produce distinct incident keys. Tests: 85 in monitoring module (was 79), all passing. --- .../transport/TransportHealthChecker.java | 12 ++ .../impl/CoapTransportHealthChecker.java | 17 ++- .../impl/HttpTransportHealthChecker.java | 109 ++++++++++---- .../config/transport/RpcInfoTest.java | 12 ++ .../TransportHealthCheckerInitializeTest.java | 134 ++++++++++++++++++ .../HttpTransportHealthCheckerRpcTest.java | 122 +++++++++++++--- 6 files changed, 355 insertions(+), 51 deletions(-) create mode 100644 monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index 5175b16840..bd73dba782 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -40,6 +40,9 @@ public abstract class TransportHealthChecker= restRequestTimeoutMs) { + throw new IllegalStateException("RPC request timeout (" + rpcTimeoutMs + " ms) for " + + getTransportType() + " target " + target.getBaseUrl() + + " must be < monitoring.rest.request_timeout_ms (" + restRequestTimeoutMs + + " ms); otherwise tbClient times out before TB times out the RPC, producing false negatives."); + } + } } @Override diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java index e52e019e05..c040babbaa 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java @@ -76,6 +76,9 @@ public class CoapTransportHealthChecker extends TransportHealthChecker { - private static final long POLL_TIMEOUT_MS = 1000L; + static final long POLL_TIMEOUT_MS = 1000L; + private static final long POLL_READ_TIMEOUT_SLACK_MS = 1000L; + private static final long POLL_BACKOFF_INITIAL_MS = 500L; + private static final long POLL_BACKOFF_MAX_MS = 5_000L; + private static final long SHUTDOWN_TIMEOUT_MS = 5_000L; + private static final AtomicInteger POOL_COUNTER = new AtomicInteger(); - RestTemplate restTemplate; - private final AtomicBoolean rpcPolling = new AtomicBoolean(); - private Thread rpcPollThread; + private RestTemplate restTemplate; + private ScheduledExecutorService rpcPoller; + private Future rpcPollFuture; + private long backoffMs; protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { super(config, target); @@ -53,39 +66,58 @@ public class HttpTransportHealthChecker extends TransportHealthChecker { + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; + }; + } + + void pollTask() { + try { + pollOnce(); + backoffMs = 0L; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.warn("HTTP RPC poll error: {}", e.getMessage()); try { - pollOnce(); - } catch (InterruptedException e) { + Thread.sleep(nextBackoffMs()); + } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - return; - } catch (Throwable e) { - log.debug("HTTP RPC poll error: {}", e.getMessage()); - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - return; - } } } } + private long nextBackoffMs() { + long base = backoffMs == 0L ? POLL_BACKOFF_INITIAL_MS : Math.min(backoffMs * 2L, POLL_BACKOFF_MAX_MS); + backoffMs = base; + long jitter = ThreadLocalRandom.current().nextLong(0L, Math.max(1L, base / 2L)); + return base + jitter; + } + void pollOnce() throws InterruptedException { String accessToken = target.getDevice().getCredentials().getCredentialsId(); + // POLL_TIMEOUT_MS is sent to the server as the long-poll wait window. + // The RestTemplate read timeout above is sized to POLL_TIMEOUT_MS + slack so + // a slow server still terminates within bounded time and destroyClient() can + // unblock the poller via Future#cancel(true). String pollUrl = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc?timeout=" + POLL_TIMEOUT_MS; ResponseEntity poll; try { @@ -102,12 +134,13 @@ public class HttpTransportHealthChecker extends TransportHealthChecker { + StubTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { + super(config, target); + } + + void initializePublic() { + initialize(); + } + + @Override protected TransportType getTransportType() { return TransportType.HTTP; } + @Override protected void initClient() {} + @Override protected void sendTestPayload(String payload) {} + @Override protected void destroyClient() {} + } +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java index 230775608b..91d7fcd4db 100644 --- a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java @@ -39,15 +39,19 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import java.net.SocketTimeoutException; import java.util.UUID; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; class HttpTransportHealthCheckerRpcTest { @@ -77,7 +81,7 @@ class HttpTransportHealthCheckerRpcTest { restTemplate = mock(RestTemplate.class); checker = new HttpTransportHealthChecker(config, target); - checker.restTemplate = restTemplate; + ReflectionTestUtils.setField(checker, "restTemplate", restTemplate); ReflectionTestUtils.setField(checker, "tbClient", tbClient); ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class)); } @@ -136,48 +140,87 @@ class HttpTransportHealthCheckerRpcTest { } @Test - void initClientStartsPollingThreadWhenRpcEnabled() throws Exception { + void initClientStartsPollingWhenRpcEnabled() throws Exception { enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); checker.initClient(); - - Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); - assertThat(thread).isNotNull(); - assertThat(thread.isDaemon()).isTrue(); - // give the thread a moment, then tear down - checker.destroyClient(); + try { + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + } finally { + checker.destroyClient(); + } } @Test - void initClientDoesNotStartPollingThreadWhenRpcDisabled() throws Exception { + void initClientDoesNotStartPollingWhenRpcDisabled() throws Exception { checker.initClient(); - Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); - assertThat(thread).isNull(); + Thread.sleep(100); + verify(restTemplate, never()).getForEntity(any(String.class), eq(JsonNode.class)); + Future future = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(future).isNull(); } @Test - void initClientIsIdempotentForPollingThread() throws Exception { + void initClientIsIdempotent() throws Exception { enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); checker.initClient(); - Thread first = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + Future first = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); checker.initClient(); - Thread second = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); + Future second = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); assertThat(second).isSameAs(first); checker.destroyClient(); } @Test - void destroyClientStopsPollingThread() throws Exception { + void destroyClientStopsPolling() throws Exception { enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + checker.initClient(); + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); checker.destroyClient(); + Thread.sleep(100); + clearInvocations(restTemplate); + Thread.sleep(200); - Thread thread = (Thread) ReflectionTestUtils.getField(checker, "rpcPollThread"); - assertThat(thread).isNull(); + verifyNoMoreInteractions(restTemplate); + assertThat(ReflectionTestUtils.getField(checker, "rpcPollFuture")).isNull(); + assertThat(ReflectionTestUtils.getField(checker, "rpcPoller")).isNull(); + } + + @Test + void initClientReschedulesAfterPollFutureDies() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + try { + Future first = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(first).isNotNull(); + first.cancel(true); + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + + checker.initClient(); + + Future second = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(second).isNotNull().isNotSameAs(first); + assertThat(second.isDone()).isFalse(); + } finally { + checker.destroyClient(); + } } @Test @@ -196,6 +239,34 @@ class HttpTransportHealthCheckerRpcTest { assertThat(bodyCaptor.getValue().get("value").asText()).isEqualTo("uuid-99"); } + @Test + void pollOncePostsEmptyBodyWhenParamsMissing() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", 7L); + rpc.put("method", "monitoringCheck"); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/7"), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue()).isNotNull(); + assertThat(bodyCaptor.getValue().isObject()).isTrue(); + } + + @Test + void pollOnceIgnoresNonNumericId() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", "abc"); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + @Test void pollOnceIsSilentOn408() throws Exception { when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) @@ -216,6 +287,23 @@ class HttpTransportHealthCheckerRpcTest { verify(restTemplate, never()).postForLocation(any(String.class), any()); } + @Test + void pollTaskBackoffDoesNotKillScheduler() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenThrow(new RuntimeException("boom")) + .thenThrow(new RuntimeException("boom")) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + try { + verify(restTemplate, timeout(5000).atLeast(3)) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + } finally { + checker.destroyClient(); + } + } + private void enableRpc() { RpcCheckConfig rpc = new RpcCheckConfig(); rpc.setEnabled(true); From 19e66f25beb4966825498beae7cb26e06100a5b3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 30 Apr 2026 11:33:48 +0200 Subject: [PATCH 10/10] refactor(monitoring): address RPC monitoring review comments (#15541) - MqttTransportHealthChecker.echoRpcRequest: wrap body in try/catch so a malformed payload or transient publish failure can no longer propagate back into Paho's IMqttMessageListener dispatcher (matches the existing CoAP handleRpcNotification pattern). - Bump monitoring.rest.request_timeout_ms default 5000 -> 8000 so the out-of-the-box LwM2M RPC default (6000) does not trip the fail-fast invariant in TransportHealthChecker.initialize(). - Improve the fail-fast IllegalStateException message to name both the REST_REQUEST_TIMEOUT_MS env var and the per-target YAML key/env var the operator should reach for. - HttpTransportHealthChecker: replace the placeholder 1ms inter-poll delay with POLL_TIMEOUT_MS / 4 so tight-spin on immediate returns (204, bodyless 200, 408) is bounded; document SHUTDOWN_TIMEOUT_MS bound vs POLL_TIMEOUT_MS + slack. - CoapTransportHealthChecker.postRpcResponse: move CoapClient ctor inside the try block with a null-guarded shutdown() in finally to prevent socket leaks if a future Californium ctor change does I/O. - Annotate package-private fields and bridge overrides with com.google.common.annotations.VisibleForTesting; consolidate the bridge-rationale comment to a single Javadoc on TransportHealthChecker.doRpcCheck. - Extract RpcInfo.RPC_SUFFIX so tests assert against the constant rather than a duplicated " RPC" literal. - Replace Thread.sleep(...) in HttpTransportHealthCheckerRpcTest with Mockito#after(...).never() to make the destroy/disabled-poller assertions deterministic on slow CI runners. - Move Lwm2mTransportHealthChecker.READ_RESOURCE_PATH to the field block. --- .../monitoring/config/transport/RpcInfo.java | 6 ++-- .../transport/TransportHealthChecker.java | 14 ++++++++- .../impl/CoapTransportHealthChecker.java | 20 ++++++++---- .../impl/HttpTransportHealthChecker.java | 19 +++++++++--- .../impl/Lwm2mTransportHealthChecker.java | 4 +-- .../impl/MqttTransportHealthChecker.java | 31 +++++++++++++------ monitoring/src/main/resources/README.md | 6 +++- .../src/main/resources/tb-monitoring.yml | 6 ++-- .../config/transport/RpcInfoTest.java | 2 +- .../CoapTransportHealthCheckerRpcTest.java | 3 +- .../HttpTransportHealthCheckerRpcTest.java | 17 +++++----- .../Lwm2mTransportHealthCheckerRpcTest.java | 5 +-- .../MqttTransportHealthCheckerRpcTest.java | 5 +-- 13 files changed, 98 insertions(+), 40 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java index 3e2f5bbfa1..6ba0a699ed 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java @@ -21,16 +21,18 @@ import org.thingsboard.monitoring.data.notification.ShortNameProvider; @Data public class RpcInfo implements ShortNameProvider { + public static final String RPC_SUFFIX = " RPC"; + private final TransportInfo transportInfo; @Override public String getShortName() { - return transportInfo.getShortName() + " RPC"; + return transportInfo.getShortName() + RPC_SUFFIX; } @Override public String toString() { - return transportInfo.toString() + " RPC"; + return transportInfo.toString() + RPC_SUFFIX; } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index bd73dba782..b57bd705e4 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -59,6 +59,14 @@ public abstract class TransportHealthCheckerDeclared {@code protected} in this package; per-transport impls in + * {@code org.thingsboard.monitoring.service.transport.impl} expose narrow {@code @VisibleForTesting} + * delegating overrides so that same-package unit tests can drive {@code doRpcCheck()} directly + * without resorting to {@code ReflectionTestUtils.invokeMethod(...)}. + */ @Override protected void doRpcCheck() throws Exception { if (!target.isRpcEnabled()) { @@ -92,10 +100,14 @@ public abstract class TransportHealthChecker= restRequestTimeoutMs) { + String transportName = getTransportType().name().toLowerCase(); throw new IllegalStateException("RPC request timeout (" + rpcTimeoutMs + " ms) for " + getTransportType() + " target " + target.getBaseUrl() + " must be < monitoring.rest.request_timeout_ms (" + restRequestTimeoutMs - + " ms); otherwise tbClient times out before TB times out the RPC, producing false negatives."); + + " ms); otherwise tbClient times out before TB times out the RPC, producing false negatives." + + " Either raise REST_REQUEST_TIMEOUT_MS above " + rpcTimeoutMs + + " ms or lower monitoring.transports." + transportName + ".targets[*].rpc.request_timeout_ms" + + " (env: " + getTransportType() + "_RPC_REQUEST_TIMEOUT_MS)."); } } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java index c040babbaa..3cca7fa760 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java @@ -16,6 +16,7 @@ package org.thingsboard.monitoring.service.transport.impl; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapClient; import org.eclipse.californium.core.CoapHandler; @@ -45,7 +46,9 @@ public class CoapTransportHealthChecker extends TransportHealthChecker { + private static final String READ_RESOURCE_PATH = "/3/0/0"; + private Lwm2mClient lwm2mClient; protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, TransportMonitoringTarget target) { @@ -62,8 +64,6 @@ public class Lwm2mTransportHealthChecker extends TransportHealthChecker { + @VisibleForTesting MqttClient mqttClient; private boolean rpcSubscribed; @@ -76,15 +78,25 @@ public class MqttTransportHealthChecker extends TransportHealthChecker checker.doRpcCheck()) .isInstanceOf(ServiceFailureException.class) .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java index 91d7fcd4db..ca3eaa2a1e 100644 --- a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java @@ -31,6 +31,7 @@ import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.config.transport.DeviceConfig; import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.MonitoringReporter; @@ -46,12 +47,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.after; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; class HttpTransportHealthCheckerRpcTest { @@ -107,7 +108,7 @@ class HttpTransportHealthCheckerRpcTest { assertThatThrownBy(() -> checker.doRpcCheck()) .isInstanceOf(ServiceFailureException.class) .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test @@ -158,8 +159,9 @@ class HttpTransportHealthCheckerRpcTest { void initClientDoesNotStartPollingWhenRpcDisabled() throws Exception { checker.initClient(); - Thread.sleep(100); - verify(restTemplate, never()).getForEntity(any(String.class), eq(JsonNode.class)); + // Mockito#after waits for the window then asserts no invocation occurred — + // deterministic on slow CI runners, unlike Thread.sleep + verify(never()). + verify(restTemplate, after(200).never()).getForEntity(any(String.class), eq(JsonNode.class)); Future future = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); assertThat(future).isNull(); } @@ -190,11 +192,12 @@ class HttpTransportHealthCheckerRpcTest { .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); checker.destroyClient(); - Thread.sleep(100); clearInvocations(restTemplate); - Thread.sleep(200); - verifyNoMoreInteractions(restTemplate); + // Wait the window and verify no further polls fire — replaces brittle + // Thread.sleep + verifyNoMoreInteractions which races on slow runners. + verify(restTemplate, after(200).never()) + .getForEntity(any(String.class), eq(JsonNode.class)); assertThat(ReflectionTestUtils.getField(checker, "rpcPollFuture")).isNull(); assertThat(ReflectionTestUtils.getField(checker, "rpcPoller")).isNull(); } diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java index 5e8d3c3c86..5e6be624d0 100644 --- a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java @@ -27,6 +27,7 @@ import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.config.transport.DeviceConfig; import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.MonitoringReporter; @@ -98,7 +99,7 @@ class Lwm2mTransportHealthCheckerRpcTest { .isInstanceOf(ServiceFailureException.class) .hasMessageContaining("blank result") .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test @@ -110,7 +111,7 @@ class Lwm2mTransportHealthCheckerRpcTest { assertThatThrownBy(() -> checker.doRpcCheck()) .isInstanceOf(ServiceFailureException.class) .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java index 2d85062bdc..de27c2471d 100644 --- a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java @@ -30,6 +30,7 @@ import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.config.transport.DeviceConfig; import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.MonitoringReporter; @@ -115,7 +116,7 @@ class MqttTransportHealthCheckerRpcTest { .isInstanceOf(ServiceFailureException.class) .hasMessageContaining("RPC echo mismatch") .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test @@ -128,7 +129,7 @@ class MqttTransportHealthCheckerRpcTest { .isInstanceOf(ServiceFailureException.class) .hasCauseInstanceOf(RuntimeException.class) .extracting(t -> ((ServiceFailureException) t).getServiceKey()) - .satisfies(key -> assertThat(key.toString()).endsWith("RPC")); + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); } @Test