diff --git a/monitoring/docs/2026-04-29-rpc-monitoring-design.md b/monitoring/docs/2026-04-29-rpc-monitoring-design.md new file mode 100644 index 0000000000..499e023854 --- /dev/null +++ b/monitoring/docs/2026-04-29-rpc-monitoring-design.md @@ -0,0 +1,556 @@ +# RPC Monitoring — Design + +| Status | Implemented (PR for #15541) | +|--------|------------------| +| Author | Sergii Matviienko (`smatvienko@thingsboard.io`) | +| Date | 2026-04-29 | +| Module | `monitoring/` | +| Base | `upstream/rc` (CE) | + +## 1. Background + +`tb-monitoring` is an external service that probes a running ThingsBoard +deployment from outside the cluster. On startup it provisions every entity +it needs under a dedicated monitoring tenant — devices, profiles, calculated +fields, a rule chain, the latencies asset, and a public dashboard — and then +runs a continuous loop that, for every configured transport target, sends a +telemetry uplink and validates that the value arrives back over a WebSocket +subscription. + +What the loop does **not** cover today is the **server-to-device direction**: +a working telemetry uplink does not prove that a two-way RPC issued by the +REST API will actually reach the device through the rule engine plus the +target transport. There are production failure modes where the uplink +remains green but RPC delivery is broken — for example, a stuck rule engine +partition, or a transport microservice that can publish telemetry inwards +but cannot deliver an RPC outwards. + +Adding an opt-in RPC companion check to each transport target closes that +gap with a single new round-trip per cycle, reusing every entity and every +transport client the monitoring service already owns. + +## 2. Current architecture (`rc`) + +### 2.1 Bootstrap + +`ThingsboardMonitoringApplication.startMonitoring` runs once on +`ApplicationReadyEvent`: + +1. `MonitoringEntityService.checkEntities()` upserts the rule chain for the + monitoring tenant, finds-or-creates the `[Monitoring] Latencies` asset, + finds-or-creates the `[Monitoring] Cloud monitoring` dashboard, assigns + both to the public customer, and remembers the dashboard public link. +2. For each `BaseMonitoringService` bean (transports today; RPC will fold in + here too), `init()` walks its config's targets and calls + `entityService.checkEntities(config, target)` to lazily create the + device, device profile and (when `monitoring.calculated_fields.enabled`) + the calculated field. The resulting `DeviceConfig` (id + access token) is + written back into the target. +3. The scheduler stagger-schedules each `BaseMonitoringService` with + `(monitoring_rate_ms / count) * i` initial delay — so two services on a + 10 s rate fire at 0 s and 5 s, not on top of each other. +4. A `:rocket: Monitoring started` info notification is emitted with a + clickable public-dashboard link. + +### 2.2 Per-cycle loop (`BaseMonitoringService.runChecks`) + +Each tick is a single REST login → single WebSocket connect → single WS +subscription that covers every device this service owns → for each +`BaseHealthChecker` call `check(WsClient)`. Each phase has its own service +key so failures land precisely: + +``` +LOGIN (MonitoredServiceKey.LOGIN) "Login" +WS_CONNECT (MonitoredServiceKey.WS_CONNECT) "WS Connect" +WS_SUBSCRIBE (MonitoredServiceKey.WS_SUBSCRIBE) "WS Subscribe" +per-target check (TransportInfo, implements ShortNameProvider) +EDQS (optional) (MonitoredServiceKey.EDQS) "*EDQS*" +GENERAL (MonitoredServiceKey.GENERAL) "Monitoring" +``` + +`BaseHealthChecker.check(WsClient)` is `final` and orchestrates a fixed +template: + +``` +initClient() // (re)open transport client +sendTestPayload(uuid) // publish telemetry uplink +checkWsUpdates(...) // wait for WS testData=uuid +report ok / failure +``` + +Each transport implements `initClient`, `sendTestPayload`, `destroyClient`, +plus `getKey` / `getInfo` / `getTransportType`. + +### 2.3 Failure attribution & incident grouping (PR #15456, merged) + +`MonitoringReporter.serviceFailure(Object serviceKey, Throwable error)` +publishes a `ServiceFailureNotification` whose `getAffectedServices()` +returns `[failing(shortName(serviceKey), failuresCount)]`. `shortName` +delegates to `ShortNameProvider` when the key implements it +(`TransportInfo` already does — `"MQTT"`, `"MQTT Foo"` if a non-default +queue, etc.) and falls back to `serviceKey.toString()` otherwise. + +`IncidentManager` (Slack API mode, `monitoring.notifications.incident.*`) +threads alerts that fire within `resolution_timeout_s` (default 90 s) under +a single Slack message. The header tracks failing / high-latency / recovered +services with red / yellow / green circles and updates every minute. After +the quiet window the incident auto-resolves with a final summary. + +Any new service key fed through `MonitoringReporter` automatically lights up +in the incident header — no extra wiring needed beyond producing a stable, +unique short name. + +### 2.4 Current flow + +```text + Bootstrap (once) Per-cycle (each service, staggered) + ──────────────── ────────────────────────────────── + + entityService.checkEntities() Login (LOGIN) + rule chain WS connect (WS_CONNECT) + latencies asset WS subscribe (WS_SUBSCRIBE) + public dashboard For each target (TransportInfo): + │ initClient + ┌───────────┘ publish telemetry → wait WS + ▼ report request, wsUpdate + TransportsMonitoringService.init() EDQS (optional) + provisions devices + creds report latencies + schedule each service with stagger report GENERAL ok + send :rocket: started notification + + ┌───────────────────────────┐ + │ MonitoringReporter │ + │ .serviceFailure(key, e) │ + │ .serviceIsOk(key) │ + └────────────┬──────────────┘ + ▼ + ┌───────────────────────────┐ + │ NotificationService │ + │ Slack webhook (legacy) │ + │ IncidentManager (rc) │ + │ thread + auto-resolve │ + └───────────────────────────┘ +``` + +```mermaid +flowchart TB + subgraph Bootstrap[Bootstrap once] + EntityCheck["entityService.checkEntities()
rule chain · asset · dashboard"] + Init["BaseMonitoringService.init()
provisions devices + credentials"] + Stagger["scheduleWithFixedDelay,
initialDelay = rate / N × index"] + Started[":rocket: Monitoring started
InfoNotification w/ public link"] + end + + subgraph Loop["Per cycle"] + direction TB + Login["Login (LOGIN)"] + WsConn["WS connect (WS_CONNECT)"] + WsSub["WS subscribe (WS_SUBSCRIBE)"] + PerTgt{{"For each transport target"}} + Tgt["initClient → publish telemetry
→ wait WS testData=uuid
(serviceKey = TransportInfo)"] + Edqs["EDQS query (EDQS)"] + GenOk["report GENERAL ok + latencies"] + end + + subgraph Notify[Notification path] + Reporter[MonitoringReporter] + Notif[NotificationService] + Inc[IncidentManager
thread + auto-resolve] + Slack[Slack channel] + Reporter --> Notif --> Inc --> Slack + end + + EntityCheck --> Init --> Stagger --> Started --> Loop + Login --> WsConn --> WsSub --> PerTgt --> Tgt --> Edqs --> GenOk + Tgt -. on failure .-> Reporter +``` + +```text + Monitoring app ThingsBoard MQTT transport Echo device + ────────────── ─────────── ────────────── ─────────── + │ │ │ │ + ├─ login ──────────────►│ │ │ + ├─ WS connect ─────────►│ │ │ + ├─ WS subscribe ───────►│ │ │ + │ │ + │ ── For each transport target ── │ + ├─ MQTT publish v1/.../telemetry {testData:uuid} ─────────────────────►│ + │◄── WS update {testData:uuid} ──────────────────│ │ + │ │ + ├─ EDQS query (optional) │ + └─ report latencies, GENERAL ok │ +``` + +## 3. The gap + +The check above proves a one-way path: REST login → WS subscribe → +device-to-cloud uplink. It does **not** exercise the cloud-to-device path +that production traffic relies on, namely: + +``` +REST API → rule engine partition → transport microservice → device → response +``` + +A failure anywhere along that chain (rule engine partition stuck on a +specific `queue`, a transport microservice unable to publish back, an MQTT +broker that accepts publishes but drops subscriptions) currently goes +undetected by `tb-monitoring`. Adding an RPC companion check to each +transport target closes the loop in both directions. + +## 4. Proposed feature + +**Core idea.** RPC is an opt-in *companion check* on each existing +transport target — same loop, same auto-provisioned device, same transport +client, one extra round-trip when enabled. + +The shape mirrors the way the `monitoring.calculated_fields.enabled` flag +extends the existing telemetry check today: a flag tightens the assertion +performed within the same cycle on the same device — it does not spawn a +parallel monitoring service. + +### 4.1 Configuration + +A new optional `rpc:` sub-block on each `TransportMonitoringTarget`: + +```yaml +monitoring: + transports: + mqtt: + enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}' + qos: 1 + request_timeout_ms: 4000 + targets: + - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' + queue: Main + rpc: + enabled: '${MQTT_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}' + - base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}' + queue: Main + rpc: + enabled: true # secure variant gets RPC for free +``` + +Properties of this shape: + +- No new top-level config block — RPC is part of the transport target it + exercises. +- The RPC sub-block carries no URL, no credentials, no QoS. Those are + inherited from the parent target. Secure transports (`ssl://`, client + certs) Just Work. +- A target without `rpc:` (or with `rpc.enabled: false`) keeps today's + behaviour identically — pure addition. +- Multiple targets per transport (already supported via `targets[N]`) cover + partition fan-out automatically; each target gets its own RPC sub-check + if enabled. + +### 4.2 Java surface + +``` +BaseHealthChecker + + protected void doRpcCheck(...) // default: no-op + // check(WsClient) stays final; reporter / stopWatch stay private + +TransportMonitoringTarget + + RpcCheckConfig rpc // nullable; defaults to disabled + +config.transport.RpcCheckConfig // new + - boolean enabled + - Integer requestTimeoutMs // optional override + +config.transport.RpcInfo + // ShortNameProvider wrapper that returns " RPC" + // so failures appear as a distinct row in the IncidentManager header. + +MqttTransportHealthChecker + + extends initClient() // when rpc.enabled, also subscribe + // v1/devices/me/rpc/request/+ + // on the existing MqttClient and + // echo params back as the response + + overrides doRpcCheck() // POST /api/rpc/twoway/{deviceId}, + // validate the echoed value, + // report rpcRoundTrip latency +``` + +`BaseHealthChecker.check(WsClient)` is extended once at the end of its +existing template: + +``` +initClient() +sendTestPayload(uuid) +checkWsUpdates(...) +if (rpcCheckEnabled()) doRpcCheck(); // new, after telemetry validates +report ok / failure +``` + +A telemetry-uplink failure short-circuits the RPC sub-check (the existing +try/catch around `sendTestPayload` already does this). The RPC sub-check +reports its own dedicated failure key so the incident header tells apart +"device can publish telemetry" from "rule engine can deliver an RPC". + +### 4.3 Latencies and failure keys + +- New latency: `RpcRoundTrip` (e.g. `mqttRpcRoundTrip`, + `mqttFooRpcRoundTrip` for non-default queues), reported alongside the + existing `Request` and `WsUpdate` keys for the same target. +- New failure service key: an `RpcInfo(TransportInfo)` instance whose + `getShortName()` returns `" RPC"` (e.g. + `"MQTT RPC"`, `"MQTT Foo RPC"`). The incident header therefore shows + rows like `:red_circle: MQTT RPC (3)` next to `:red_circle: MQTT (1)`. + +### 4.4 Incident manager integration + +Because failure attribution flows through the existing +`MonitoringReporter` API, no changes are needed in `IncidentManager`, +`SlackIncidentTransport`, or the incident YAML. An MQTT RPC failure is +just another `ServiceFailureNotification` whose `AffectedService` carries +`name = "MQTT RPC"`. Cases: + +| Scenario | Incident header reflects | +|----------|---------------------------| +| MQTT transport down (uplink fails) | `:red_circle: MQTT (n)`. RPC short-circuits (no separate row). | +| MQTT uplink fine, RPC delivery broken | `:red_circle: MQTT RPC (n)`. MQTT row stays green. | +| Both broken | Two rows: `:red_circle: MQTT (n), :red_circle: MQTT RPC (m)`. | +| RPC recovers first | MQTT RPC row turns `:large_green_circle:`, MQTT stays red until uplink recovers. | + +### 4.6 LwM2M Read-vs-echo asymmetry + +Three of the four transports use the same value-echo round-trip: + +``` +POST /api/rpc/twoway/{deviceId} {method:"monitoringCheck", params:{value:}} + ↓ rule engine + transport + device echoes params back, value == uuid +``` + +LwM2M is the exception. There is no echo handler to add on the monitoring +side because the existing `Lwm2mClient` already implements the standard +Object 3 (Device) Read pathway: `Lwm2mClient.read(server, /3/0/0)` returns +the value that was last set via `send()` (which is what the telemetry uplink +phase does immediately before). So the LwM2M companion check uses a +server-initiated Read instead of an echo: + +``` +POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}, timeout:6000} + ↓ rule engine + LwM2M Read + device returns the resource value +``` + +The assertion is therefore weaker — non-blank instead of value-equal — +because the resource value is not under the monitoring service's control on +the wire (the rule-engine + Leshan registration path round-trips first). +This is sufficient as a liveness signal for the cloud-to-device LwM2M Read +pathway and matches what the existing fixtures (`device_profile.json`, +`resource.json`) already expose. If a stronger assertion is wanted later, +a Write+Read pair against a custom resource on the test model would close +the gap, but that is out of scope for this PR. + +The `request_timeout_ms` default is therefore `6000` ms for LwM2M (vs `4000` +ms for the other three transports) because the LwM2M Read traverses one +extra hop (Leshan registration → device coap loop → response). + +### 4.5 Proposed flow + +```text + Bootstrap (once) Per-cycle (each transport target) + ──────────────── ──────────────────────────────── + + entityService.checkEntities() Login → WS connect → WS subscribe + rule chain For each target: + latencies asset initClient + public dashboard • MQTT/HTTP/CoAP as today + │ • IF rpc.enabled, also subscribe + ┌───────────┘ v1/.../rpc/request/+ on the + ▼ SAME client + Transports.init publish telemetry → wait WS + provisions devices + creds testData=uuid + used by both telemetry & IF rpc.enabled: doRpcCheck() + RPC checks POST /api/rpc/twoway/{deviceId} + schedule services with stagger validate echo on same client + send :rocket: started report mqttRpcRoundTrip + report request, wsUpdate + EDQS (optional), GENERAL ok + + Failure path (incident grouping unchanged): + serviceFailure(transportInfo, e) → :red_circle: MQTT (n) + serviceFailure(rpcInfo(transportInfo), e) → :red_circle: MQTT RPC (n) +``` + +```mermaid +flowchart TB + subgraph Bootstrap[Bootstrap once] + EntityCheck["entityService.checkEntities()
rule chain · asset · dashboard"] + Init["Transports.init()
provisions devices + credentials
used by BOTH checks"] + Stagger["scheduleWithFixedDelay (staggered)"] + end + + subgraph Loop["Per cycle / per transport target"] + direction TB + Login["Login → WS connect → WS subscribe"] + Tx["initClient
(if rpc.enabled, also subscribe
v1/.../rpc/request/+ on same client)"] + UpL["Telemetry uplink + WS validation
report mqttRequest, mqttWsUpdate"] + RpcOpt{rpc.enabled?} + DoRpc["doRpcCheck()
POST /api/rpc/twoway → echo on same MQTT
report mqttRpcRoundTrip
(failure key = MQTT RPC)"] + Skip[skip RPC] + Edqs[EDQS · GENERAL ok] + end + + subgraph Notify[Failure attribution] + R[MonitoringReporter] --> N[NotificationService] --> Inc[IncidentManager] + Inc -->|":red_circle: MQTT (n)"| Slack[Slack thread] + Inc -->|":red_circle: MQTT RPC (n)"| Slack + end + + EntityCheck --> Init --> Stagger --> Loop + Login --> Tx --> UpL --> RpcOpt + RpcOpt -- yes --> DoRpc --> Edqs + RpcOpt -- no --> Skip --> Edqs + UpL -. on failure .-> R + DoRpc -. on failure .-> R +``` + +```text + Monitoring app ThingsBoard MQTT transport Echo device + ────────────── ─────────── (incl. ssl://) (same Paho + client) + │ │ + ├─ MQTT connect (URL = transport target URL, │ + │ creds = auto-provisioned device token) ─────────────────────────►│ + │ │ + ├─ subscribe v1/.../rpc/request/+ (only if rpc.enabled) ────────────►│ + │ │ + ├─ login → WS connect → WS subscribe ──►│ │ + │ │ + ════╪═══════════ Telemetry check (unchanged from rc) ════════════════════ + ├─ MQTT publish v1/.../telemetry {testData:uuid} ───────────────────►│ + │◄── WS update {testData:uuid} ──────────────────│ │ + │ │ + ════╪═══════════ Companion RPC check (only if rpc.enabled) ══════════════ + ├─ POST /api/rpc/twoway/{deviceId} ─────────────►│ │ + │ {method:monitoringCheck, rule engine + transport + │ params:{value:uuid2}, │ │ + │ timeout:requestTimeoutMs} │ │ + │ ├──────────►│ │ + │ ├─ pub ►│ + │ │◄─pub─│ + │◄────── 200 OK {value:uuid2} ───────────────────────────────│ │ + │ validate echo → report mqttRpcRoundTrip │ + + NOTE: ONE MqttClient covers both the telemetry uplink and the RPC echo. + Failure of telemetry uplink short-circuits the RPC sub-check. +``` + +```mermaid +sequenceDiagram + autonumber + participant Mon as Monitoring app + participant TB as ThingsBoard REST + participant TR as MQTT transport (incl. ssl://) + participant Dev as Echo device
(same Paho client) + + Note over Mon,Dev: One MQTT session, used for both checks + Mon->>TR: connect(URL = target URL, creds = provisioned token) + Mon->>TR: subscribe v1/devices/me/rpc/request/+ (only if rpc.enabled) + Mon->>TB: REST login → WS connect → WS subscribe + + rect rgb(245,245,255) + Note right of Mon: Existing telemetry check (unchanged) + Mon->>TR: publish v1/devices/me/telemetry {testData:uuid} + TR-->>Mon: WS update {testData:uuid} + end + + rect rgb(245,255,245) + Note right of Mon: New companion RPC check + Mon->>TB: POST /api/rpc/twoway/{deviceId}
{method,params:{value:uuid2}} + TB->>TR: route through rule engine partition + transport + TR->>Dev: publish v1/devices/me/rpc/request/{id} + Dev-->>TR: publish v1/devices/me/rpc/response/{id} {value:uuid2} + TR-->>TB: response + TB-->>Mon: 200 OK {value:uuid2} + Mon->>Mon: validate, report mqttRpcRoundTrip + end + + Note over Mon: On failure → MonitoringReporter.serviceFailure(rpcInfo, e)
→ IncidentManager threads ":red_circle: MQTT RPC (n)" +``` + +## 5. Implementation phases + +Each phase is an independently reviewable commit landing in the same PR: + +1. **Land this design paper.** No code changes. +2. **Add the per-target hook.** Introduce `RpcCheckConfig`, + `TransportMonitoringTarget.rpc`, `RpcInfo`, and the no-op `doRpcCheck` + on `BaseHealthChecker`. Wire `BaseHealthChecker.check` to invoke + `doRpcCheck` after WS validation. Tests at base level. +3. **MQTT implementation.** Extend `MqttTransportHealthChecker.initClient` + to subscribe `v1/devices/me/rpc/request/+` on the existing Paho client + when `rpc.enabled`. Implement the value-echo `doRpcCheck` once on + `TransportHealthChecker` so HTTP and CoAP can inherit it. Unit tests + cover happy path, value mismatch, REST exception, response timeout, + RPC-disabled (no subscription), and idempotent subscribe. +4. **HTTP implementation.** Extend `HttpTransportHealthChecker.initClient` + to spawn an idempotent daemon long-poll thread on + `GET /api/v1/{token}/rpc?timeout=1000`; on each 200 response, POST the + echoed params back to `/api/v1/{token}/rpc/{id}`. The thread is + lifecycle-bound to `initClient` / `destroyClient`. Tests cover the + inherited `doRpcCheck` shape plus poll-thread start/stop/idempotency + and `pollOnce` behaviour for 200/408/204. +5. **CoAP implementation.** Extend `CoapTransportHealthChecker.initClient` + to open a CoAP OBSERVE on `coap://host/api/v1/{token}/rpc` via a + second `CoapClient`; on each notification echo the params back to + `coap://host/api/v1/{token}/rpc/{id}`. `destroyClient` cancels the + observe and shuts down the rpc client. Tests cover the inherited + `doRpcCheck` shape, observe lifecycle, and notification handler + parsing. +6. **LwM2M implementation.** Override `doRpcCheck` only — no + `initClient` changes — to issue + `POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}}` + and assert non-blank, per §4.6. +7. **Documentation polish.** Add `monitoring/src/main/resources/README.md` + with the per-transport behaviour table, the new YAML keys, the + secure-MQTT example, and a small troubleshooting note on + `rpc.request_timeout_ms` ordering vs `monitoring.rest.request_timeout_ms`. + +## 6. Open decisions + +Listed for reviewer push-back before implementation began. All four +transports ship in this PR; the rest of the table below records the calls +that survived review. + +1. **All four transports in scope.** Hook + MQTT + HTTP + CoAP + LwM2M ship + together so the IncidentManager header gains a coherent set of + `:red_circle: RPC` rows in one go. The hook itself is + generic enough that adding more transports later (SNMP, custom) is + just another override. +2. **Two-way RPC only.** Round-trip value validation is the strongest + signal; one-way RPC would only confirm the request reached the device, + which the telemetry uplink check already covers from the other + direction. +3. **RPC failure key shape.** Use `RpcInfo(TransportInfo)` implementing + `ShortNameProvider` returning `" RPC"`. Alternative: + a string constant like `"MQTT RPC"`. Decision: the wrapper, so + non-default queues (`"MQTT Foo RPC"`) are handled with no extra code. +4. **Latency suffix.** Reuse the transport target's `getKey()` derivation + (`mqtt`, `mqttFoo`) and append `RpcRoundTrip` — no separate per-target + label. This lines up with `Request` / `WsUpdate` already used + for the telemetry side and means the existing latencies dashboard + widget needs no schema changes. +5. **Echo handler placement (MQTT/HTTP/CoAP).** Each transport's echo + path lives inline in its `initClient` rather than in a separate helper + class. Echo logic is small (a handful of lines per transport) and is + only meaningful in the context of one connected client. +6. **LwM2M Read instead of echo.** See §4.6. The asymmetry is documented + in `monitoring/src/main/resources/README.md` so operators understand + why the LwM2M assertion is non-blank rather than uuid-equal. + +## 7. Code references (`rc`) + +- `BaseHealthChecker.check(WsClient)` — `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java:70` +- `BaseMonitoringService.runChecks` — `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java:114` +- `MonitoringEntityService.checkEntities` — `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java:95` +- `MonitoringReporter.serviceFailure` — `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java:107` +- `IncidentManager.sendAlert` — `monitoring/src/main/java/org/thingsboard/monitoring/notification/incident/IncidentManager.java:70` +- `AffectedService` record — `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/AffectedService.java:18` +- `ShortNameProvider` interface — `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ShortNameProvider.java:18` +- `TransportInfo.getShortName` — `monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java:27` +- `RestClient.handleTwoWayDeviceRPCRequest` — `rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java:2495` diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java new file mode 100644 index 0000000000..590478cedc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import lombok.Data; + +@Data +public class RpcCheckConfig { + + private boolean enabled; + private Integer requestTimeoutMs; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java new file mode 100644 index 0000000000..6ba0a699ed --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import lombok.Data; +import org.thingsboard.monitoring.data.notification.ShortNameProvider; + +@Data +public class RpcInfo implements ShortNameProvider { + + public static final String RPC_SUFFIX = " RPC"; + + private final TransportInfo transportInfo; + + @Override + public String getShortName() { + return transportInfo.getShortName() + RPC_SUFFIX; + } + + @Override + public String toString() { + return transportInfo.toString() + RPC_SUFFIX; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java index 7fa6ec3f46..5d52010646 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java @@ -30,6 +30,7 @@ public class TransportMonitoringTarget implements MonitoringTarget { private String queue; private boolean checkDomainIps; private String namePrefix; + private RpcCheckConfig rpc; @Override public UUID getDeviceId() { @@ -44,4 +45,8 @@ public class TransportMonitoringTarget implements MonitoringTarget { return Strings.nullToEmpty(namePrefix); } + public boolean isRpcEnabled() { + return rpc != null && rpc.isEnabled(); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 4846964272..faa17065d3 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -30,4 +30,8 @@ public class Latencies { return String.format("%sWsUpdate", key); } + public static String rpcRoundTrip(String key) { + return String.format("%sRpcRoundTrip", key); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index c0e85da1ac..4d5af8b8b8 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -54,6 +54,10 @@ public abstract class BaseHealthChecker> associates = new HashMap<>(); @@ -88,6 +92,8 @@ public abstract class BaseHealthChecker extends BaseHealthChecker { @@ -31,13 +40,76 @@ public abstract class TransportHealthCheckerDeclared {@code protected} in this package; per-transport impls in + * {@code org.thingsboard.monitoring.service.transport.impl} expose narrow {@code @VisibleForTesting} + * delegating overrides so that same-package unit tests can drive {@code doRpcCheck()} directly + * without resorting to {@code ReflectionTestUtils.invokeMethod(...)}. + */ + @Override + protected void doRpcCheck() throws Exception { + if (!target.isRpcEnabled()) { + return; + } + RpcInfo rpcInfo = getRpcInfo(); + String testValue = UUID.randomUUID().toString(); + ObjectNode body = JacksonUtil.newObjectNode(); + body.put("method", "monitoringCheck"); + body.set("params", JacksonUtil.newObjectNode().put("value", testValue)); + body.put("timeout", getRpcTimeoutMs()); + + long start = System.nanoTime(); + JsonNode response; + try { + response = tbClient.handleTwoWayDeviceRPCRequest(new DeviceId(target.getDeviceId()), body); + } catch (Throwable e) { + throw new ServiceFailureException(rpcInfo, e); + } + String actual = response == null ? null : response.path("value").asText(null); + if (!testValue.equals(actual)) { + throw new ServiceFailureException(rpcInfo, + "RPC echo mismatch: expected " + testValue + " but got " + actual); + } + reportRpcLatency(System.nanoTime() - start); + } + @Override protected void initialize() { entityService.checkEntities(config, target); + if (target.isRpcEnabled()) { + int rpcTimeoutMs = getRpcTimeoutMs(); + if (rpcTimeoutMs >= restRequestTimeoutMs) { + String transportName = getTransportType().name().toLowerCase(); + throw new IllegalStateException("RPC request timeout (" + rpcTimeoutMs + " ms) for " + + getTransportType() + " target " + target.getBaseUrl() + + " must be < monitoring.rest.request_timeout_ms (" + restRequestTimeoutMs + + " ms); otherwise tbClient times out before TB times out the RPC, producing false negatives." + + " Either raise REST_REQUEST_TIMEOUT_MS above " + rpcTimeoutMs + + " ms or lower monitoring.transports." + transportName + ".targets[*].rpc.request_timeout_ms" + + " (env: " + getTransportType() + "_RPC_REQUEST_TIMEOUT_MS)."); + } + } } @Override diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java index 953b5c2415..3cca7fa760 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java @@ -15,8 +15,12 @@ */ package org.thingsboard.monitoring.service.transport.impl; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapHandler; +import org.eclipse.californium.core.CoapObserveRelation; import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; @@ -24,6 +28,7 @@ import org.eclipse.californium.elements.config.SystemConfig; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportType; @@ -41,6 +46,10 @@ public class CoapTransportHealthChecker extends TransportHealthChecker { + static final long POLL_TIMEOUT_MS = 1000L; + private static final long POLL_READ_TIMEOUT_SLACK_MS = 1000L; + // Inter-poll delay. The effective rate is dominated by the long-poll's + // server-side wait window (POLL_TIMEOUT_MS). This value only acts as a + // rate-limiter when the server returns immediately (204, bodyless 200, + // 408) — without it, the loop would tight-spin in those cases. + private static final long POLL_INTERVAL_MS = POLL_TIMEOUT_MS / 4; + private static final long POLL_BACKOFF_INITIAL_MS = 500L; + private static final long POLL_BACKOFF_MAX_MS = 5_000L; + // Must exceed POLL_TIMEOUT_MS + POLL_READ_TIMEOUT_SLACK_MS so that destroyClient() + // can wait out the worst-case in-flight read. The default JDK HttpURLConnection + // does not honor Thread.interrupt() on a blocking read, so cancel(true) only + // unblocks the poller once the read timeout fires. If POLL_READ_TIMEOUT_SLACK_MS + // is bumped, this value must move with it. + private static final long SHUTDOWN_TIMEOUT_MS = 5_000L; + private static final AtomicInteger POOL_COUNTER = new AtomicInteger(); + private RestTemplate restTemplate; + private ScheduledExecutorService rpcPoller; + private Future rpcPollFuture; + private long backoffMs; protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { super(config, target); @@ -44,10 +77,83 @@ public class HttpTransportHealthChecker extends TransportHealthChecker { + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; + }; + } + + @VisibleForTesting + void pollTask() { + try { + pollOnce(); + backoffMs = 0L; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.warn("HTTP RPC poll error: {}", e.getMessage()); + try { + Thread.sleep(nextBackoffMs()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + + private long nextBackoffMs() { + long base = backoffMs == 0L ? POLL_BACKOFF_INITIAL_MS : Math.min(backoffMs * 2L, POLL_BACKOFF_MAX_MS); + backoffMs = base; + long jitter = ThreadLocalRandom.current().nextLong(0L, Math.max(1L, base / 2L)); + return base + jitter; + } + + @VisibleForTesting + void pollOnce() throws InterruptedException { + String accessToken = target.getDevice().getCredentials().getCredentialsId(); + // POLL_TIMEOUT_MS is sent to the server as the long-poll wait window. + // The RestTemplate read timeout above is sized to POLL_TIMEOUT_MS + slack so + // a slow server still terminates within bounded time and destroyClient() can + // unblock the poller via Future#cancel(true). + String pollUrl = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc?timeout=" + POLL_TIMEOUT_MS; + ResponseEntity poll; + try { + poll = restTemplate.getForEntity(pollUrl, JsonNode.class); + } catch (HttpStatusCodeException e) { + if (e.getStatusCode() == HttpStatus.REQUEST_TIMEOUT || e.getStatusCode() == HttpStatus.NO_CONTENT) { + return; + } + throw e; + } + if (poll.getStatusCode() != HttpStatus.OK || poll.getBody() == null) { + return; + } + JsonNode rpc = poll.getBody(); + JsonNode idNode = rpc.get("id"); + JsonNode params = rpc.get("params"); + if (idNode == null || !idNode.isNumber()) { + log.debug("HTTP RPC poll response missing or non-numeric id: {}", rpc); + return; + } + String responseUrl = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc/" + idNode.asLong(); + JsonNode body = params == null ? JacksonUtil.newObjectNode() : params; + restTemplate.postForLocation(responseUrl, body); } @Override @@ -56,8 +162,31 @@ public class HttpTransportHealthChecker extends TransportHealthChecker { + private static final String READ_RESOURCE_PATH = "/3/0/0"; + private Lwm2mClient lwm2mClient; protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, TransportMonitoringTarget target) { @@ -56,6 +64,32 @@ public class Lwm2mTransportHealthChecker extends TransportHealthChecker { - private MqttClient mqttClient; + @VisibleForTesting + MqttClient mqttClient; + private boolean rpcSubscribed; private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; + private static final String DEVICE_RPC_REQUEST_SUB_TOPIC = "v1/devices/me/rpc/request/+"; + private static final String DEVICE_RPC_RESPONSE_TOPIC_PREFIX = "v1/devices/me/rpc/response/"; protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, TransportMonitoringTarget target) { super(config, target); @@ -59,6 +69,33 @@ public class MqttTransportHealthChecker extends TransportHealthChecker}}` | echoed `value == uuid` | +| HTTP | Daemon long-poll thread per target: `GET /api/v1/{token}/rpc?timeout=1000`; on 200, `POST /api/v1/{token}/rpc/{id}` with the params | same as MQTT | echoed `value == uuid` | +| CoAP | Separate `CoapClient` opens an OBSERVE on `coap://host/api/v1/{token}/rpc`; on each notification, posts echoed params to `coap://host/api/v1/{token}/rpc/{id}` | same as MQTT | echoed `value == uuid` | +| LwM2M | None — the existing `Lwm2mClient` already serves `/3/0/0` via the standard LwM2M Read | `POST /api/rpc/twoway/{deviceId}` `{method:"Read", params:{key:"/3/0/0"}}` | response is non-blank | + +## Failure attribution + +RPC failures use a dedicated `RpcInfo` service key whose `getShortName()` +returns `" RPC"` (`"MQTT RPC"`, `"MQTT Foo RPC"` for +non-default queues, etc.). The IncidentManager header therefore distinguishes +`":red_circle: MQTT (n)"` from `":red_circle: MQTT RPC (m)"`. A telemetry +uplink failure short-circuits the RPC sub-check for that cycle, so a broken +transport will not double-count against both rows. + +## Latencies + +When the RPC check succeeds, the round-trip is reported under +`RpcRoundTrip` (e.g. `mqttRpcRoundTrip`, `mqttFooRpcRoundTrip` for +non-default queues), alongside the existing `Request` and `WsUpdate` +keys for the same target. No latencies dashboard schema change is needed. + +## Troubleshooting + +- **`rpc.request_timeout_ms` ordering**: the RPC HTTP call goes through the + same `monitoring.rest.request_timeout_ms` REST client. Keep + `rpc.request_timeout_ms` strictly smaller so the device-side timeout fires + first; otherwise the REST call will fail with `ResourceAccessException` + before the RPC has a chance to time out cleanly. +- **HTTP polling thread leaks**: the long-poll thread is daemon and lifecycle- + bound to `initClient` / `destroyClient`. If you see leaked threads, check + whether a custom shutdown hook is bypassing `@PreDestroy`. +- **CoAP OBSERVE behind NAT**: the OBSERVE client uses the same Californium + configuration as the telemetry client; if the telemetry check works but the + RPC observe never delivers, check NAT/firewall on the CoAP UDP path. +- **LwM2M Read on `/3/0/0` returns blank**: the monitoring `Lwm2mClient` + serves resource `0` from whatever was last sent via telemetry, so a blank + result usually means telemetry never reached the device this cycle. Confirm + the telemetry row is green first. diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 768fd2e4c7..af0ed7c5fc 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -24,8 +24,10 @@ monitoring: username: '${REST_AUTH_USERNAME:tenant@thingsboard.org}' # Authentication password password: '${REST_AUTH_PASSWORD:tenant}' - # REST request timeout in milliseconds - request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:5000}' + # REST request timeout in milliseconds. Must be greater than every per-target + # rpc.request_timeout_ms — see TransportHealthChecker.initialize() fail-fast. + # Default leaves a 2 s margin above the slowest built-in default (LwM2M 6000 ms). + request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:8000}' ws: # WebSocket url, wss://DOMAIN by default base_url: '${WS_BASE_URL:wss://${monitoring.domain}}' @@ -59,6 +61,21 @@ monitoring: check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check on this target. + # Issues POST /api/rpc/twoway/{deviceId} with method=monitoringCheck and a uuid, + # validates the device echoes the uuid back over the same MQTT session, and + # reports the round-trip under the RpcRoundTrip latency. + # Failures appear as ":red_circle: MQTT RPC" in the IncidentManager header. + rpc: + enabled: '${MQTT_RPC_MONITORING_ENABLED:false}' + # RPC request timeout in ms; must be < monitoring.rest.request_timeout_ms. + # If unset, falls back to monitoring.transports.mqtt.request_timeout_ms. + request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}' + # Secure MQTT example with RPC enabled by default: + # - base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}' + # queue: 'Main' + # rpc: + # enabled: true # To add more targets, use following environment variables: # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc. @@ -76,6 +93,11 @@ monitoring: check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${COAP_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Opens a CoAP OBSERVE on /api/v1/{token}/rpc on + # a separate CoapClient and echoes incoming RPC params back to /api/v1/{token}/rpc/{id}. + rpc: + enabled: '${COAP_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${COAP_RPC_REQUEST_TIMEOUT_MS:4000}' # To add more targets, use following environment variables: # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc. @@ -93,6 +115,12 @@ monitoring: check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${HTTP_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Spawns a daemon long-poll thread per target on + # GET /api/v1/{token}/rpc?timeout=1000 and POSTs the echoed params back to + # /api/v1/{token}/rpc/{id} when an RPC arrives. + rpc: + enabled: '${HTTP_RPC_MONITORING_ENABLED:false}' + request_timeout_ms: '${HTTP_RPC_REQUEST_TIMEOUT_MS:4000}' # To add more targets, use following environment variables: # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc. @@ -110,6 +138,15 @@ monitoring: check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' # Prefix for the target device name name_prefix: '${LWM2M_TRANSPORT_TARGET_NAME_PREFIX:}' + # Optional companion RPC check. Issues a server-initiated LwM2M Read on /3/0/0 + # via /api/rpc/twoway/{deviceId} with method=Read; asserts the response is + # non-blank. The "value matches uuid" pattern used by other transports does not + # apply here because the Read is read-only — see design doc §4.6. + rpc: + enabled: '${LWM2M_RPC_MONITORING_ENABLED:false}' + # LwM2M Read goes through rule engine + Leshan registration, so default to + # 6000 ms; tune per environment. + request_timeout_ms: '${LWM2M_RPC_REQUEST_TIMEOUT_MS:6000}' # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java new file mode 100644 index 0000000000..2d56699bdb --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class RpcCheckConfigTest { + + @Test + void defaultsAreDisabledAndUnset() { + RpcCheckConfig config = new RpcCheckConfig(); + + assertThat(config.isEnabled()).isFalse(); + assertThat(config.getRequestTimeoutMs()).isNull(); + } + + @Test + void settersRoundTrip() { + RpcCheckConfig config = new RpcCheckConfig(); + config.setEnabled(true); + config.setRequestTimeoutMs(7500); + + assertThat(config.isEnabled()).isTrue(); + assertThat(config.getRequestTimeoutMs()).isEqualTo(7500); + } + + @Test + void targetIsRpcEnabledFalseWhenSubBlockMissing() { + TransportMonitoringTarget target = new TransportMonitoringTarget(); + target.setBaseUrl("tcp://host:1883"); + + assertThat(target.isRpcEnabled()).isFalse(); + } + + @Test + void targetIsRpcEnabledFalseWhenDisabled() { + TransportMonitoringTarget target = new TransportMonitoringTarget(); + target.setBaseUrl("tcp://host:1883"); + target.setRpc(new RpcCheckConfig()); + + assertThat(target.isRpcEnabled()).isFalse(); + } + + @Test + void targetIsRpcEnabledTrueWhenEnabled() { + TransportMonitoringTarget target = new TransportMonitoringTarget(); + target.setBaseUrl("tcp://host:1883"); + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + + assertThat(target.isRpcEnabled()).isTrue(); + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java new file mode 100644 index 0000000000..48ea2c0a63 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.transport; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class RpcInfoTest { + + @Test + void shortNameAppendsRpcForDefaultQueue() { + RpcInfo rpc = new RpcInfo(transportInfo("Main")); + + assertThat(rpc.getShortName()).isEqualTo("MQTT RPC"); + } + + @Test + void shortNameAppendsRpcForCustomQueue() { + RpcInfo rpc = new RpcInfo(transportInfo("Foo")); + + assertThat(rpc.getShortName()).isEqualTo("MQTT Foo RPC"); + } + + @Test + void toStringIncludesUrlAndRpcMarker() { + RpcInfo rpc = new RpcInfo(transportInfo("Main")); + + assertThat(rpc.toString()) + .contains("*MQTT*") + .contains("(tcp://host:1883)") + .endsWith(RpcInfo.RPC_SUFFIX); + } + + @Test + void equalsAndHashCodeAreStableAcrossInstances() { + RpcInfo a = new RpcInfo(transportInfo("Main")); + RpcInfo b = new RpcInfo(transportInfo("Main")); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void differentQueuesProduceDistinctKeys() { + RpcInfo a = new RpcInfo(transportInfo("Main")); + RpcInfo b = new RpcInfo(transportInfo("Foo")); + + assertThat(a).isNotEqualTo(b); + } + + // IncidentManager dedups by toString() — RpcInfo for a target must produce a + // different incident row than TransportInfo for the same target so the RPC + // row and the telemetry row don't collapse into one. + @Test + void rpcInfoIncidentKeyDiffersFromTelemetryInfoForSameTarget() { + TransportInfo telemetry = transportInfo("Main"); + RpcInfo rpc = new RpcInfo(telemetry); + + assertThat(rpc.toString()).isNotEqualTo(telemetry.toString()); + assertThat(rpc).isNotEqualTo(telemetry); + } + + private static TransportInfo transportInfo(String queue) { + TransportMonitoringTarget target = new TransportMonitoringTarget(); + target.setBaseUrl("tcp://host:1883"); + target.setQueue(queue); + return new TransportInfo(TransportType.MQTT, target); + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java new file mode 100644 index 0000000000..bf46045205 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java @@ -0,0 +1,200 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.monitoring.client.WsClient; +import org.thingsboard.monitoring.config.MonitoringConfig; +import org.thingsboard.monitoring.config.MonitoringTarget; +import org.thingsboard.monitoring.data.ServiceFailureException; +import org.thingsboard.monitoring.util.TbStopWatch; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class BaseHealthCheckerRpcHookTest { + + private MonitoringReporter reporter; + private WsClient wsClient; + private TestTarget target; + private TestChecker checker; + + @BeforeEach + void setUp() { + reporter = mock(MonitoringReporter.class); + wsClient = mock(WsClient.class); + target = new TestTarget(UUID.randomUUID()); + checker = new TestChecker(new TestConfig(), target); + ReflectionTestUtils.setField(checker, "info", "test-info"); + ReflectionTestUtils.setField(checker, "reporter", reporter); + ReflectionTestUtils.setField(checker, "stopWatch", new TbStopWatch()); + ReflectionTestUtils.setField(checker, "resultCheckTimeoutMs", 1000); + } + + @Test + void rpcCheckRunsAfterSuccessfulTelemetryAndWsUpdate() { + stubWsUpdateEchoesPayload(); + + checker.check(wsClient); + + assertThat(checker.rpcCheckCalled).isTrue(); + verify(reporter).serviceIsOk("test-info"); + } + + @Test + void rpcCheckIsNotRunWhenTelemetryPublishFails() { + checker.failTelemetry = true; + + checker.check(wsClient); + + assertThat(checker.rpcCheckCalled).isFalse(); + verify(reporter).serviceFailure(eq("test-info"), any(Throwable.class)); + verify(reporter, never()).serviceIsOk(any()); + } + + @Test + void rpcCheckIsNotRunWhenWsUpdateMissing() { + when(wsClient.getLatest(any())).thenReturn(Collections.emptyMap()); + + checker.check(wsClient); + + assertThat(checker.rpcCheckCalled).isFalse(); + verify(reporter).serviceFailure(eq("test-info"), any(Throwable.class)); + verify(reporter, never()).serviceIsOk(any()); + } + + @Test + void rpcFailureRoutesThroughDedicatedServiceKey() { + stubWsUpdateEchoesPayload(); + Object rpcKey = new Object() { + @Override public String toString() { return "test-info RPC"; } + }; + checker.rpcServiceKey = rpcKey; + checker.rpcFailure = new IOException("rpc failed"); + + checker.check(wsClient); + + assertThat(checker.rpcCheckCalled).isTrue(); + verify(reporter).serviceFailure(eq(rpcKey), any(Throwable.class)); + verify(reporter, never()).serviceIsOk(any()); + } + + @Test + void defaultDoRpcCheckIsNoOpAndDoesNotPreventOk() { + TestChecker noOpRpc = new TestChecker(new TestConfig(), target); + ReflectionTestUtils.setField(noOpRpc, "info", "noop-info"); + ReflectionTestUtils.setField(noOpRpc, "reporter", reporter); + ReflectionTestUtils.setField(noOpRpc, "stopWatch", new TbStopWatch()); + ReflectionTestUtils.setField(noOpRpc, "resultCheckTimeoutMs", 1000); + noOpRpc.skipRpcOverride = true; + when(wsClient.getLatest(any())).thenAnswer(latestEcho(noOpRpc)); + + noOpRpc.check(wsClient); + + verify(reporter).serviceIsOk("noop-info"); + } + + private void stubWsUpdateEchoesPayload() { + when(wsClient.getLatest(any())).thenAnswer(latestEcho(checker)); + } + + private static org.mockito.stubbing.Answer> latestEcho(TestChecker checker) { + return (InvocationOnMock invocation) -> { + String value = checker.capturedTestValue; + return value == null ? Collections.emptyMap() : Map.of("testData", value); + }; + } + + private static final class TestConfig implements MonitoringConfig { + @Override + public List getTargets() { + return Collections.emptyList(); + } + } + + private static final class TestTarget implements MonitoringTarget { + private final UUID deviceId; + + TestTarget(UUID deviceId) { + this.deviceId = deviceId; + } + + @Override public UUID getDeviceId() { return deviceId; } + @Override public String getBaseUrl() { return "test://target"; } + @Override public boolean isCheckDomainIps() { return false; } + } + + private static class TestChecker extends BaseHealthChecker { + + String capturedTestValue; + boolean failTelemetry; + boolean rpcCheckCalled; + boolean skipRpcOverride; + Object rpcServiceKey; + Throwable rpcFailure; + + TestChecker(TestConfig config, TestTarget target) { + super(config, target); + } + + @Override protected void initialize() { } + @Override protected void initClient() { } + @Override protected void destroyClient() { } + @Override protected Object getInfo() { return "test-info"; } + @Override protected String getKey() { return "test"; } + @Override protected boolean isCfMonitoringEnabled() { return false; } + + @Override + protected String createTestPayload(String testValue) { + capturedTestValue = testValue; + return testValue; + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + if (failTelemetry) { + throw new IOException("telemetry publish failed"); + } + } + + @Override + protected void doRpcCheck() throws Exception { + if (skipRpcOverride) { + super.doRpcCheck(); + return; + } + rpcCheckCalled = true; + if (rpcFailure != null) { + throw new ServiceFailureException(rpcServiceKey, rpcFailure); + } + } + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java new file mode 100644 index 0000000000..3da43ba3f7 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.transport; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.config.transport.TransportType; +import org.thingsboard.monitoring.service.MonitoringEntityService; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +class TransportHealthCheckerInitializeTest { + + private HttpTransportMonitoringConfig config; + private TransportMonitoringTarget target; + private MonitoringEntityService entityService; + + @BeforeEach + void setUp() { + config = new HttpTransportMonitoringConfig(); + ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 3000, int.class); + + DeviceConfig device = new DeviceConfig(); + device.setId(UUID.randomUUID().toString()); + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId("tok"); + device.setCredentials(credentials); + + target = new TransportMonitoringTarget(); + target.setBaseUrl("http://localhost:8080"); + target.setDevice(device); + + entityService = mock(MonitoringEntityService.class); + doNothing().when(entityService).checkEntities(config, target); + } + + @Test + void initializePassesWhenRpcDisabled() { + StubTransportHealthChecker checker = newChecker(5000); + + assertThatCode(checker::initializePublic).doesNotThrowAnyException(); + } + + @Test + void initializePassesWhenRpcTimeoutBelowRest() { + enableRpc(2000); + StubTransportHealthChecker checker = newChecker(5000); + + assertThatCode(checker::initializePublic).doesNotThrowAnyException(); + } + + @Test + void initializeFailsWhenRpcTimeoutEqualsRest() { + enableRpc(5000); + StubTransportHealthChecker checker = newChecker(5000); + + assertThatThrownBy(checker::initializePublic) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("RPC request timeout") + .hasMessageContaining("must be <"); + } + + @Test + void initializeFailsWhenRpcTimeoutExceedsRest() { + enableRpc(7000); + StubTransportHealthChecker checker = newChecker(5000); + + assertThatThrownBy(checker::initializePublic) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("RPC request timeout (7000 ms)"); + } + + @Test + void initializeFallsBackToTransportTimeoutAndStillValidates() { + enableRpc(null); + StubTransportHealthChecker checker = newChecker(2000); + + assertThatThrownBy(checker::initializePublic) + .isInstanceOf(IllegalStateException.class); + } + + private StubTransportHealthChecker newChecker(int restTimeoutMs) { + StubTransportHealthChecker checker = new StubTransportHealthChecker(config, target); + ReflectionTestUtils.setField(checker, "entityService", entityService); + ReflectionTestUtils.setField(checker, "restRequestTimeoutMs", restTimeoutMs); + return checker; + } + + private void enableRpc(Integer rpcTimeoutMs) { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + rpc.setRequestTimeoutMs(rpcTimeoutMs); + target.setRpc(rpc); + } + + private static final class StubTransportHealthChecker extends TransportHealthChecker { + StubTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { + super(config, target); + } + + void initializePublic() { + initialize(); + } + + @Override protected TransportType getTransportType() { return TransportType.HTTP; } + @Override protected void initClient() {} + @Override protected void sendTestPayload(String payload) {} + @Override protected void destroyClient() {} + } +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java new file mode 100644 index 0000000000..1c45d16d75 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java @@ -0,0 +1,243 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.transport.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapHandler; +import org.eclipse.californium.core.CoapObserveRelation; +import org.eclipse.californium.core.CoapResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.client.ResourceAccessException; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.data.ServiceFailureException; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.net.SocketTimeoutException; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class CoapTransportHealthCheckerRpcTest { + + private CoapTransportMonitoringConfig config; + private TransportMonitoringTarget target; + private TbClient tbClient; + private CoapClient mockRpcCoapClient; + private CoapObserveRelation mockObserveRelation; + private TestableCoapChecker checker; + + @BeforeEach + void setUp() { + config = new CoapTransportMonitoringConfig(); + ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class); + + DeviceConfig device = new DeviceConfig(); + device.setId(UUID.randomUUID().toString()); + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId("token-coap"); + device.setCredentials(credentials); + + target = new TransportMonitoringTarget(); + target.setBaseUrl("coap://localhost"); + target.setDevice(device); + + tbClient = mock(TbClient.class); + mockRpcCoapClient = mock(CoapClient.class); + mockObserveRelation = mock(CoapObserveRelation.class); + lenient().when(mockRpcCoapClient.observe(any(CoapHandler.class))).thenReturn(mockObserveRelation); + + checker = new TestableCoapChecker(config, target); + // pre-set both clients so initClient() does not call new CoapClient(...) + ReflectionTestUtils.setField(checker, "coapClient", mock(CoapClient.class)); + checker.rpcCoapClient = mockRpcCoapClient; + ReflectionTestUtils.setField(checker, "tbClient", tbClient); + ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class)); + } + + @Test + void doRpcCheckHappyPath() throws Exception { + enableRpc(); + AtomicReference capturedBody = new AtomicReference<>(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenAnswer(inv -> { + JsonNode b = inv.getArgument(1); + capturedBody.set(b); + return b.get("params"); + }); + + checker.doRpcCheck(); + + assertThat(capturedBody.get().get("method").asText()).isEqualTo("monitoringCheck"); + } + + @Test + void doRpcCheckValueMismatchUsesRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientStartsObserveWhenRpcEnabled() throws Exception { + enableRpc(); + + checker.initClient(); + + verify(mockRpcCoapClient).observe(any(CoapHandler.class)); + assertThat(checker.rpcObserveRelation).isSameAs(mockObserveRelation); + } + + @Test + void initClientSkipsObserveWhenRpcDisabled() throws Exception { + checker.initClient(); + + verify(mockRpcCoapClient, never()).observe(any(CoapHandler.class)); + } + + @Test + void initClientObserveIsIdempotent() throws Exception { + enableRpc(); + + checker.initClient(); + checker.initClient(); + + verify(mockRpcCoapClient, org.mockito.Mockito.times(1)).observe(any(CoapHandler.class)); + } + + @Test + void destroyClientCancelsObserveAndShutsDownRpcClient() throws Exception { + enableRpc(); + checker.initClient(); + + checker.destroyClient(); + + verify(mockObserveRelation).proactiveCancel(); + verify(mockRpcCoapClient).shutdown(); + assertThat(checker.rpcObserveRelation).isNull(); + assertThat(checker.rpcCoapClient).isNull(); + } + + @Test + void handleRpcNotificationEchoesParamsBackToResponseEndpoint() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn( + "{\"id\":42,\"method\":\"monitoringCheck\",\"params\":{\"value\":\"uuid-42\"}}"); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isEqualTo("coap://localhost/api/v1/token-coap/rpc/42"); + JsonNode echoed = JacksonUtil.toJsonNode(checker.capturedResponsePayload); + assertThat(echoed.get("value").asText()).isEqualTo("uuid-42"); + } + + @Test + void handleRpcNotificationIgnoresEmptyResponse() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn(""); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isNull(); + } + + @Test + void handleRpcNotificationIgnoresMalformedRpcWithoutId() { + enableRpc(); + CoapResponse response = mock(CoapResponse.class); + when(response.getResponseText()).thenReturn("{\"method\":\"x\"}"); + + checker.handleRpcNotification(response); + + assertThat(checker.capturedResponseUri).isNull(); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + + private static final class TestableCoapChecker extends CoapTransportHealthChecker { + String capturedResponseUri; + String capturedResponsePayload; + + TestableCoapChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) { + super(config, target); + } + + @Override + void postRpcResponse(String uri, String payload) { + capturedResponseUri = uri; + capturedResponsePayload = payload; + } + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java new file mode 100644 index 0000000000..ca3eaa2a1e --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java @@ -0,0 +1,316 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.transport.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.ResourceAccessException; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.data.ServiceFailureException; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.net.SocketTimeoutException; +import java.util.UUID; +import java.util.concurrent.Future; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class HttpTransportHealthCheckerRpcTest { + + private HttpTransportMonitoringConfig config; + private TransportMonitoringTarget target; + private HttpTransportHealthChecker checker; + private TbClient tbClient; + private RestTemplate restTemplate; + + @BeforeEach + void setUp() { + config = new HttpTransportMonitoringConfig(); + ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class); + + DeviceConfig device = new DeviceConfig(); + device.setId(UUID.randomUUID().toString()); + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId("token-abc"); + device.setCredentials(credentials); + + target = new TransportMonitoringTarget(); + target.setBaseUrl("http://localhost:8080"); + target.setDevice(device); + + tbClient = mock(TbClient.class); + restTemplate = mock(RestTemplate.class); + + checker = new HttpTransportHealthChecker(config, target); + ReflectionTestUtils.setField(checker, "restTemplate", restTemplate); + ReflectionTestUtils.setField(checker, "tbClient", tbClient); + ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class)); + } + + @Test + void doRpcCheckHappyPath() throws Exception { + enableRpc(); + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture())) + .thenAnswer(inv -> bodyCaptor.getValue().get("params")); + + checker.doRpcCheck(); + + assertThat(bodyCaptor.getValue().get("method").asText()).isEqualTo("monitoringCheck"); + } + + @Test + void doRpcCheckValueMismatchUsesRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientStartsPollingWhenRpcEnabled() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + try { + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + } finally { + checker.destroyClient(); + } + } + + @Test + void initClientDoesNotStartPollingWhenRpcDisabled() throws Exception { + checker.initClient(); + + // Mockito#after waits for the window then asserts no invocation occurred — + // deterministic on slow CI runners, unlike Thread.sleep + verify(never()). + verify(restTemplate, after(200).never()).getForEntity(any(String.class), eq(JsonNode.class)); + Future future = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(future).isNull(); + } + + @Test + void initClientIsIdempotent() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + Future first = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + checker.initClient(); + Future second = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + + assertThat(second).isSameAs(first); + checker.destroyClient(); + } + + @Test + void destroyClientStopsPolling() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + + checker.destroyClient(); + clearInvocations(restTemplate); + + // Wait the window and verify no further polls fire — replaces brittle + // Thread.sleep + verifyNoMoreInteractions which races on slow runners. + verify(restTemplate, after(200).never()) + .getForEntity(any(String.class), eq(JsonNode.class)); + assertThat(ReflectionTestUtils.getField(checker, "rpcPollFuture")).isNull(); + assertThat(ReflectionTestUtils.getField(checker, "rpcPoller")).isNull(); + } + + @Test + void initClientReschedulesAfterPollFutureDies() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + try { + Future first = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(first).isNotNull(); + first.cancel(true); + verify(restTemplate, timeout(2000).atLeastOnce()) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + + checker.initClient(); + + Future second = (Future) ReflectionTestUtils.getField(checker, "rpcPollFuture"); + assertThat(second).isNotNull().isNotSameAs(first); + assertThat(second.isDone()).isFalse(); + } finally { + checker.destroyClient(); + } + } + + @Test + void pollOnceEchoesParamsWhenRpcArrives() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", 99L); + rpc.put("method", "monitoringCheck"); + rpc.set("params", JacksonUtil.newObjectNode().put("value", "uuid-99")); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/99"), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue().get("value").asText()).isEqualTo("uuid-99"); + } + + @Test + void pollOncePostsEmptyBodyWhenParamsMissing() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", 7L); + rpc.put("method", "monitoringCheck"); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/7"), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue()).isNotNull(); + assertThat(bodyCaptor.getValue().isObject()).isTrue(); + } + + @Test + void pollOnceIgnoresNonNumericId() throws Exception { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("id", "abc"); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(ResponseEntity.ok(rpc)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + + @Test + void pollOnceIsSilentOn408() throws Exception { + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenThrow(HttpClientErrorException.create(HttpStatus.REQUEST_TIMEOUT, "timeout", null, null, null)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + + @Test + void pollOnceSwallowsBodylessOk() throws Exception { + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.pollOnce(); + + verify(restTemplate, never()).postForLocation(any(String.class), any()); + } + + @Test + void pollTaskBackoffDoesNotKillScheduler() throws Exception { + enableRpc(); + when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class))) + .thenThrow(new RuntimeException("boom")) + .thenThrow(new RuntimeException("boom")) + .thenReturn(new ResponseEntity<>(null, HttpStatus.OK)); + + checker.initClient(); + try { + verify(restTemplate, timeout(5000).atLeast(3)) + .getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)); + } finally { + checker.destroyClient(); + } + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java new file mode 100644 index 0000000000..5e6be624d0 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java @@ -0,0 +1,168 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.transport.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.client.ResourceAccessException; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.data.ServiceFailureException; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.net.SocketTimeoutException; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class Lwm2mTransportHealthCheckerRpcTest { + + private Lwm2mTransportMonitoringConfig config; + private TransportMonitoringTarget target; + private Lwm2mTransportHealthChecker checker; + private TbClient tbClient; + + @BeforeEach + void setUp() { + config = new Lwm2mTransportMonitoringConfig(); + ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 6000, int.class); + + DeviceConfig device = new DeviceConfig(); + device.setId(UUID.randomUUID().toString()); + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId("endpoint-1"); + device.setCredentials(credentials); + + target = new TransportMonitoringTarget(); + target.setBaseUrl("coap://localhost:5685"); + target.setDevice(device); + + tbClient = mock(TbClient.class); + checker = new Lwm2mTransportHealthChecker(config, target); + ReflectionTestUtils.setField(checker, "tbClient", tbClient); + ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class)); + } + + @Test + void doRpcCheckHappyPathReadsManufacturerResource() throws Exception { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(new TextNode("Thingsboard")); + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + + checker.doRpcCheck(); + + verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()); + JsonNode body = bodyCaptor.getValue(); + assertThat(body.get("method").asText()).isEqualTo("Read"); + assertThat(body.get("params").get("key").asText()).isEqualTo("/3/0/0"); + assertThat(body.get("timeout").asInt()).isEqualTo(6000); + } + + @Test + void doRpcCheckBlankResponseFailsWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(new TextNode("")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("blank result") + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckNullResponseFailsWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(null); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void doRpcCheckUsesPerTargetTimeoutOverride() throws Exception { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + rpc.setRequestTimeoutMs(12000); + target.setRpc(rpc); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(new TextNode("ok")); + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + + checker.doRpcCheck(); + + verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()); + assertThat(bodyCaptor.getValue().get("timeout").asInt()).isEqualTo(12000); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +} diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java new file mode 100644 index 0000000000..de27c2471d --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java @@ -0,0 +1,205 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.transport.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.client.ResourceAccessException; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.RpcCheckConfig; +import org.thingsboard.monitoring.config.transport.RpcInfo; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.data.ServiceFailureException; +import org.thingsboard.monitoring.service.MonitoringReporter; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class MqttTransportHealthCheckerRpcTest { + + private MqttTransportMonitoringConfig config; + private TransportMonitoringTarget target; + private MqttTransportHealthChecker checker; + private TbClient tbClient; + private MqttClient mqttClient; + + @BeforeEach + void setUp() throws Exception { + config = new MqttTransportMonitoringConfig(); + ReflectionTestUtils.setField(config, "qos", 1); + ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class); + + DeviceConfig device = new DeviceConfig(); + device.setId(UUID.randomUUID().toString()); + device.setName("test-device"); + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId("token-123"); + device.setCredentials(credentials); + + target = new TransportMonitoringTarget(); + target.setBaseUrl("tcp://localhost:1883"); + target.setDevice(device); + + tbClient = mock(TbClient.class); + mqttClient = mock(MqttClient.class); + lenient().when(mqttClient.isConnected()).thenReturn(true); + + checker = new MqttTransportHealthChecker(config, target); + checker.mqttClient = mqttClient; + ReflectionTestUtils.setField(checker, "tbClient", tbClient); + ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class)); + } + + @Test + void doRpcCheckHappyPath() throws Exception { + enableRpc(); + ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(JsonNode.class); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture())) + .thenAnswer(invocation -> { + JsonNode params = bodyCaptor.getValue().get("params"); + return params; + }); + + checker.doRpcCheck(); + + JsonNode sent = bodyCaptor.getValue(); + assertThat(sent.get("method").asText()).isEqualTo("monitoringCheck"); + assertThat(sent.get("timeout").asInt()).isEqualTo(4000); + assertThat(sent.get("params").get("value").asText()).isNotBlank(); + } + + @Test + void doRpcCheckValueMismatchThrowsServiceFailureWithRpcKey() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenReturn(JacksonUtil.newObjectNode().put("value", "wrong")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("RPC echo mismatch") + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckRestExceptionWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new RuntimeException("400 Bad Request")); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasCauseInstanceOf(RuntimeException.class) + .extracting(t -> ((ServiceFailureException) t).getServiceKey()) + .satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX)); + } + + @Test + void doRpcCheckTimeoutWraps() { + enableRpc(); + when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class))) + .thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException())); + + assertThatThrownBy(() -> checker.doRpcCheck()) + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("read timed out"); + } + + @Test + void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception { + // target.rpc not set → isRpcEnabled() == false + + checker.doRpcCheck(); + + verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any()); + } + + @Test + void initClientSubscribesToRpcRequestTopicWhenRpcEnabled() throws Exception { + enableRpc(); + + checker.initClient(); + + verify(mqttClient).subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class)); + } + + @Test + void initClientSkipsSubscriptionWhenRpcDisabled() throws Exception { + checker.initClient(); + + verify(mqttClient, never()).subscribe(anyString(), anyInt(), any(IMqttMessageListener.class)); + } + + @Test + void initClientSubscribesOnlyOnce() throws Exception { + enableRpc(); + + checker.initClient(); + checker.initClient(); + + verify(mqttClient, org.mockito.Mockito.times(1)) + .subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class)); + } + + @Test + void echoRpcRequestPublishesParamsBackToResponseTopic() throws Exception { + enableRpc(); + ObjectNode body = JacksonUtil.newObjectNode(); + body.put("method", "monitoringCheck"); + ObjectNode params = JacksonUtil.newObjectNode().put("value", "uuid-42"); + body.set("params", params); + MqttMessage incoming = new MqttMessage(JacksonUtil.toString(body).getBytes(StandardCharsets.UTF_8)); + + checker.echoRpcRequest("v1/devices/me/rpc/request/77", incoming); + + ArgumentCaptor publishedCaptor = ArgumentCaptor.forClass(MqttMessage.class); + verify(mqttClient).publish(eq("v1/devices/me/rpc/response/77"), publishedCaptor.capture()); + JsonNode echoed = JacksonUtil.toJsonNode(new String(publishedCaptor.getValue().getPayload(), StandardCharsets.UTF_8)); + assertThat(echoed.get("value").asText()).isEqualTo("uuid-42"); + } + + private void enableRpc() { + RpcCheckConfig rpc = new RpcCheckConfig(); + rpc.setEnabled(true); + target.setRpc(rpc); + } + +}