Merge 19e66f25be into 394c6ff1a1

pull/15542/merge
Sergii Matviienko 2 days ago
committed by GitHub
parent
commit
bbdfc416ca
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 556
      monitoring/docs/2026-04-29-rpc-monitoring-design.md
  2. 26
      monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java
  3. 38
      monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java
  4. 5
      monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java
  5. 4
      monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java
  6. 10
      monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java
  7. 72
      monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java
  8. 91
      monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java
  9. 133
      monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java
  10. 34
      monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java
  11. 46
      monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthChecker.java
  12. 80
      monitoring/src/main/resources/README.md
  13. 41
      monitoring/src/main/resources/tb-monitoring.yml
  14. 70
      monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java
  15. 84
      monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java
  16. 200
      monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java
  17. 134
      monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java
  18. 243
      monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java
  19. 316
      monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java
  20. 168
      monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java
  21. 205
      monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java

556
monitoring/docs/2026-04-29-rpc-monitoring-design.md

@@ -0,0 +1,556 @@
# RPC Monitoring — Design
| Status | Implemented (PR for #15541) |
|--------|------------------|
| Author | Sergii Matviienko (`smatvienko@thingsboard.io`) |
| Date | 2026-04-29 |
| Module | `monitoring/` |
| Base | `upstream/rc` (CE) |
## 1. Background
`tb-monitoring` is an external service that probes a running ThingsBoard
deployment from outside the cluster. On startup it provisions every entity
it needs under a dedicated monitoring tenant — devices, profiles, calculated
fields, a rule chain, the latencies asset, and a public dashboard — and then
runs a continuous loop that, for every configured transport target, sends a
telemetry uplink and validates that the value arrives back over a WebSocket
subscription.

What the loop does **not** cover today is the **server-to-device direction**:
a working telemetry uplink does not prove that a two-way RPC issued by the
REST API will actually reach the device through the rule engine plus the
target transport. There are production failure modes where the uplink
remains green but RPC delivery is broken — for example, a stuck rule engine
partition, or a transport microservice that can publish telemetry inwards
but cannot deliver an RPC outwards.

Adding an opt-in RPC companion check to each transport target closes that
gap with a single new round-trip per cycle, reusing every entity and every
transport client the monitoring service already owns.
## 2. Current architecture (`rc`)
### 2.1 Bootstrap
`ThingsboardMonitoringApplication.startMonitoring` runs once on
`ApplicationReadyEvent`:
1. `MonitoringEntityService.checkEntities()` upserts the rule chain for the
monitoring tenant, finds-or-creates the `[Monitoring] Latencies` asset,
finds-or-creates the `[Monitoring] Cloud monitoring` dashboard, assigns
both to the public customer, and remembers the dashboard public link.
2. For each `BaseMonitoringService` bean (transports today; RPC will fold in
here too), `init()` walks its config's targets and calls
`entityService.checkEntities(config, target)` to lazily create the
device, device profile and (when `monitoring.calculated_fields.enabled`)
the calculated field. The resulting `DeviceConfig` (id + access token) is
written back into the target.
3. The scheduler stagger-schedules each `BaseMonitoringService` with
`(monitoring_rate_ms / count) * i` initial delay — so two services on a
10 s rate fire at 0 s and 5 s, not on top of each other.
4. A `:rocket: Monitoring started` info notification is emitted with a
clickable public-dashboard link.
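
The stagger in step 3 can be sketched as follows. This is a minimal illustration rather than the scheduler's actual code: `StaggerSketch`, `initialDelayMs`, and `allDelaysMs` are hypothetical names, and only the formula `(monitoring_rate_ms / count) * i` is taken from the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the real scheduling lives in the monitoring application.
final class StaggerSketch {

    // Initial delay for the service at position `index` out of `count` services.
    static long initialDelayMs(long rateMs, int count, int index) {
        return (rateMs / count) * index;
    }

    // Delays for all services, in scheduling order.
    static List<Long> allDelaysMs(long rateMs, int count) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            delays.add(initialDelayMs(rateMs, count, i));
        }
        return delays;
    }
}
```

With a 10 s rate and two services, `allDelaysMs(10_000, 2)` yields 0 ms and 5000 ms, matching the example in step 3.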
### 2.2 Per-cycle loop (`BaseMonitoringService.runChecks`)
Each tick is a single REST login → single WebSocket connect → single WS
subscription that covers every device this service owns → for each
`BaseHealthChecker` call `check(WsClient)`. Each phase has its own service
key so failures land precisely:
```
LOGIN (MonitoredServiceKey.LOGIN) "Login"
WS_CONNECT (MonitoredServiceKey.WS_CONNECT) "WS Connect"
WS_SUBSCRIBE (MonitoredServiceKey.WS_SUBSCRIBE) "WS Subscribe"
per-target check (TransportInfo, implements ShortNameProvider)
EDQS (optional) (MonitoredServiceKey.EDQS) "*EDQS*"
GENERAL (MonitoredServiceKey.GENERAL) "Monitoring"
```
`BaseHealthChecker.check(WsClient)` is `final` and orchestrates a fixed
template:
```
initClient() // (re)open transport client
sendTestPayload(uuid) // publish telemetry uplink
checkWsUpdates(...) // wait for WS testData=uuid
report ok / failure
```
Each transport implements `initClient`, `sendTestPayload`, `destroyClient`,
plus `getKey` / `getInfo` / `getTransportType`.
### 2.3 Failure attribution & incident grouping (PR #15456, merged)
`MonitoringReporter.serviceFailure(Object serviceKey, Throwable error)`
publishes a `ServiceFailureNotification` whose `getAffectedServices()`
returns `[failing(shortName(serviceKey), failuresCount)]`. `shortName`
delegates to `ShortNameProvider` when the key implements it
(`TransportInfo` already does — `"MQTT"`, `"MQTT Foo"` if a non-default
queue, etc.) and falls back to `serviceKey.toString()` otherwise.
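
The delegation described above could look roughly like this. It is a sketch under assumptions: `ShortNameProviderSketch` stands in for the real `ShortNameProvider` interface, and `ShortNames.shortName` is a hypothetical name for the resolution step inside `MonitoringReporter`.

```java
// Stand-in for org.thingsboard.monitoring.data.notification.ShortNameProvider.
interface ShortNameProviderSketch {
    String getShortName();
}

// Hypothetical holder for the name-resolution rule.
final class ShortNames {

    // Prefer the key's own short name; otherwise fall back to toString().
    static String shortName(Object serviceKey) {
        if (serviceKey instanceof ShortNameProviderSketch) {
            return ((ShortNameProviderSketch) serviceKey).getShortName();
        }
        return serviceKey.toString();
    }
}
```

A plain string key such as `"Login"` passes through unchanged, while a key implementing the interface controls its own label.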
`IncidentManager` (Slack API mode, `monitoring.notifications.incident.*`)
threads alerts that fire within `resolution_timeout_s` (default 90 s) under
a single Slack message. The header tracks failing / high-latency / recovered
services with red / yellow / green circles and updates every minute. After
the quiet window the incident auto-resolves with a final summary.
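
One way to picture the quiet window is the sketch below. It assumes the window restarts on every alert, which the text above does not spell out; `IncidentWindowSketch` and its methods are hypothetical and do not reflect the real `IncidentManager` API.

```java
// Hypothetical model of incident threading; times are in milliseconds.
final class IncidentWindowSketch {

    private final long resolutionTimeoutMs;
    private long lastAlertAtMs;
    private boolean open;

    IncidentWindowSketch(long resolutionTimeoutS) {
        this.resolutionTimeoutMs = resolutionTimeoutS * 1000;
    }

    // Returns true if the alert joins the open incident, false if it starts a new one.
    boolean onAlert(long nowMs) {
        boolean joined = open && !quietFor(nowMs);
        open = true;
        lastAlertAtMs = nowMs;
        return joined;
    }

    // True once no alert has arrived for the whole resolution timeout.
    boolean shouldResolve(long nowMs) {
        return open && quietFor(nowMs);
    }

    void resolve() {
        open = false;
    }

    private boolean quietFor(long nowMs) {
        return nowMs - lastAlertAtMs >= resolutionTimeoutMs;
    }
}
```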
Any new service key fed through `MonitoringReporter` automatically lights up
in the incident header — no extra wiring needed beyond producing a stable,
unique short name.
### 2.4 Current flow
```text
Bootstrap (once) Per-cycle (each service, staggered)
──────────────── ──────────────────────────────────
entityService.checkEntities() Login (LOGIN)
rule chain WS connect (WS_CONNECT)
latencies asset WS subscribe (WS_SUBSCRIBE)
public dashboard For each target (TransportInfo):
│ initClient
┌───────────┘ publish telemetry → wait WS
▼ report request, wsUpdate
TransportsMonitoringService.init() EDQS (optional)
provisions devices + creds report latencies
schedule each service with stagger report GENERAL ok
send :rocket: started notification
┌───────────────────────────┐
│ MonitoringReporter │
│ .serviceFailure(key, e) │
│ .serviceIsOk(key) │
└────────────┬──────────────┘
┌───────────────────────────┐
│ NotificationService │
│ Slack webhook (legacy) │
│ IncidentManager (rc) │
│ thread + auto-resolve │
└───────────────────────────┘
```
```mermaid
flowchart TB
subgraph Bootstrap[Bootstrap once]
EntityCheck["entityService.checkEntities()<br/>rule chain · asset · dashboard"]
Init["BaseMonitoringService.init()<br/>provisions devices + credentials"]
Stagger["scheduleWithFixedDelay,<br/>initialDelay = rate / N × index"]
Started[":rocket: Monitoring started<br/>InfoNotification w/ public link"]
end
subgraph Loop["Per cycle"]
direction TB
Login["Login (LOGIN)"]
WsConn["WS connect (WS_CONNECT)"]
WsSub["WS subscribe (WS_SUBSCRIBE)"]
PerTgt{{"For each transport target"}}
Tgt["initClient → publish telemetry<br/>→ wait WS testData=uuid<br/>(serviceKey = TransportInfo)"]
Edqs["EDQS query (EDQS)"]
GenOk["report GENERAL ok + latencies"]
end
subgraph Notify[Notification path]
Reporter[MonitoringReporter]
Notif[NotificationService]
Inc[IncidentManager<br/>thread + auto-resolve]
Slack[Slack channel]
Reporter --> Notif --> Inc --> Slack
end
EntityCheck --> Init --> Stagger --> Started --> Loop
Login --> WsConn --> WsSub --> PerTgt --> Tgt --> Edqs --> GenOk
Tgt -. on failure .-> Reporter
```
```text
Monitoring app ThingsBoard MQTT transport Echo device
────────────── ─────────── ────────────── ───────────
│ │ │ │
├─ login ──────────────►│ │ │
├─ WS connect ─────────►│ │ │
├─ WS subscribe ───────►│ │ │
│ │
│ ── For each transport target ── │
├─ MQTT publish v1/.../telemetry {testData:uuid} ─────────────────────►│
│◄── WS update {testData:uuid} ──────────────────│ │
│ │
├─ EDQS query (optional) │
└─ report latencies, GENERAL ok │
```
## 3. The gap
The check above proves a one-way path: REST login → WS subscribe →
device-to-cloud uplink. It does **not** exercise the cloud-to-device path
that production traffic relies on, namely:
```
REST API → rule engine partition → transport microservice → device → response
```
A failure anywhere along that chain (rule engine partition stuck on a
specific `queue`, a transport microservice unable to publish back, an MQTT
broker that accepts publishes but drops subscriptions) currently goes
undetected by `tb-monitoring`. Adding an RPC companion check to each
transport target closes the loop in both directions.
## 4. Proposed feature
**Core idea.** RPC is an opt-in *companion check* on each existing
transport target — same loop, same auto-provisioned device, same transport
client, one extra round-trip when enabled.

The shape mirrors the way the `monitoring.calculated_fields.enabled` flag
extends the existing telemetry check today: a flag tightens the assertion
performed within the same cycle on the same device — it does not spawn a
parallel monitoring service.
### 4.1 Configuration
A new optional `rpc:` sub-block on each `TransportMonitoringTarget`:
```yaml
monitoring:
transports:
mqtt:
enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}'
qos: 1
request_timeout_ms: 4000
targets:
- base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}'
queue: Main
rpc:
enabled: '${MQTT_RPC_MONITORING_ENABLED:false}'
request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}'
- base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}'
queue: Main
rpc:
enabled: true # secure variant gets RPC for free
```
Properties of this shape:
- No new top-level config block — RPC is part of the transport target it
exercises.
- The RPC sub-block carries no URL, no credentials, no QoS. Those are
inherited from the parent target. Secure transports (`ssl://`, client
certs) Just Work.
- A target without `rpc:` (or with `rpc.enabled: false`) keeps today's
behaviour identically — pure addition.
- Multiple targets per transport (already supported via `targets[N]`) cover
partition fan-out automatically; each target gets its own RPC sub-check
if enabled.
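
The "absent means disabled" and "inherit from the parent target" semantics above can be sketched as below. `RpcCheckConfigSketch` and `TargetSketch` are simplified, hypothetical stand-ins for the config classes; the sketch assumes the timeout falls back to the transport-level `request_timeout_ms` when the `rpc:` block does not set one.

```java
// Simplified stand-in for the rpc: sub-block.
final class RpcCheckConfigSketch {
    final boolean enabled;
    final Integer requestTimeoutMs; // null means "inherit from the transport"

    RpcCheckConfigSketch(boolean enabled, Integer requestTimeoutMs) {
        this.enabled = enabled;
        this.requestTimeoutMs = requestTimeoutMs;
    }
}

// Simplified stand-in for a transport target.
final class TargetSketch {
    final RpcCheckConfigSketch rpc; // null when the YAML has no rpc: block

    TargetSketch(RpcCheckConfigSketch rpc) {
        this.rpc = rpc;
    }

    // A missing block and enabled=false both leave the RPC check off.
    boolean isRpcEnabled() {
        return rpc != null && rpc.enabled;
    }

    // Per-target override wins; otherwise use the transport-level timeout.
    int rpcTimeoutMs(int transportRequestTimeoutMs) {
        Integer perTarget = rpc != null ? rpc.requestTimeoutMs : null;
        return perTarget != null ? perTarget : transportRequestTimeoutMs;
    }
}
```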
### 4.2 Java surface
```
BaseHealthChecker
+ protected void doRpcCheck(...) // default: no-op
// check(WsClient) stays final; reporter / stopWatch stay private
TransportMonitoringTarget
+ RpcCheckConfig rpc // nullable; defaults to disabled
config.transport.RpcCheckConfig // new
- boolean enabled
- Integer requestTimeoutMs // optional override
config.transport.RpcInfo
// ShortNameProvider wrapper that returns "<TransportInfo.shortName> RPC"
// so failures appear as a distinct row in the IncidentManager header.
MqttTransportHealthChecker
+ extends initClient() // when rpc.enabled, also subscribe
// v1/devices/me/rpc/request/+
// on the existing MqttClient and
// echo params back as the response
+ overrides doRpcCheck() // POST /api/rpc/twoway/{deviceId},
// validate the echoed value,
// report rpcRoundTrip latency
```
`BaseHealthChecker.check(WsClient)` is extended once at the end of its
existing template:
```
initClient()
sendTestPayload(uuid)
checkWsUpdates(...)
if (rpcCheckEnabled()) doRpcCheck(); // new, after telemetry validates
report ok / failure
```
A telemetry-uplink failure short-circuits the RPC sub-check (the existing
try/catch around `sendTestPayload` already does this). The RPC sub-check
reports its own dedicated failure key so the incident header tells apart
"device can publish telemetry" from "rule engine can deliver an RPC".
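
The short-circuit and the separate failure key can be illustrated with a stripped-down template method. This is a sketch only: `HealthCheckerSketch` and `FailureSketch` are hypothetical stand-ins for `BaseHealthChecker` and `ServiceFailureException`, and reports are collected in a list instead of going through `MonitoringReporter`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServiceFailureException: carries the failing service's name.
class FailureSketch extends Exception {
    final String serviceName;

    FailureSketch(String serviceName, String message) {
        super(message);
        this.serviceName = serviceName;
    }
}

abstract class HealthCheckerSketch {
    final List<String> reports = new ArrayList<>();

    // Fixed template: uplink first, RPC hook second, then report.
    final void check() {
        try {
            sendTestPayload();   // an exception here skips doRpcCheck()
            doRpcCheck();        // no-op unless a transport overrides it
            reports.add("ok " + name());
        } catch (FailureSketch e) {
            reports.add("failure " + e.serviceName); // dedicated key, e.g. "MQTT RPC"
        } catch (Exception e) {
            reports.add("failure " + name());
        }
    }

    abstract String name();

    abstract void sendTestPayload() throws Exception;

    void doRpcCheck() throws Exception {
        // default: nothing to check
    }
}
```

A subclass whose `sendTestPayload` throws never reaches `doRpcCheck`, so only the transport's own failure is reported for that cycle.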
### 4.3 Latencies and failure keys
- New latency: `<key>RpcRoundTrip` (e.g. `mqttRpcRoundTrip`,
`mqttFooRpcRoundTrip` for non-default queues), reported alongside the
existing `<key>Request` and `<key>WsUpdate` keys for the same target.
- New failure service key: an `RpcInfo(TransportInfo)` instance whose
`getShortName()` returns `"<transport.shortName> RPC"` (e.g.
`"MQTT RPC"`, `"MQTT Foo RPC"`). The incident header therefore shows
rows like `:red_circle: MQTT RPC (3)` next to `:red_circle: MQTT (1)`.
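
As a small illustration of the two naming rules, assuming the target key (`mqtt`, `mqttFoo`) and the transport short name (`MQTT`, `MQTT Foo`) are already available as strings. `RpcKeysSketch` is a hypothetical helper, not a class in the module.

```java
// Hypothetical helper showing how the RPC latency key and failure label are derived.
final class RpcKeysSketch {

    // Latency key: the target key followed by "RpcRoundTrip".
    static String rpcRoundTripKey(String targetKey) {
        return String.format("%sRpcRoundTrip", targetKey);
    }

    // Failure label: the transport short name followed by " RPC".
    static String rpcShortName(String transportShortName) {
        return transportShortName + " RPC";
    }
}
```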
### 4.4 Incident manager integration
Because failure attribution flows through the existing
`MonitoringReporter` API, no changes are needed in `IncidentManager`,
`SlackIncidentTransport`, or the incident YAML. An MQTT RPC failure is
just another `ServiceFailureNotification` whose `AffectedService` carries
`name = "MQTT RPC"`. Cases:
| Scenario | Incident header reflects |
|----------|---------------------------|
| MQTT transport down (uplink fails) | `:red_circle: MQTT (n)`. RPC short-circuits (no separate row). |
| MQTT uplink fine, RPC delivery broken | `:red_circle: MQTT RPC (n)`. MQTT row stays green. |
| Both broken | Two rows: `:red_circle: MQTT (n), :red_circle: MQTT RPC (m)`. |
| RPC recovers first | MQTT RPC row turns `:large_green_circle:`, MQTT stays red until uplink recovers. |
### 4.6 LwM2M Read-vs-echo asymmetry
Three of the four transports use the same value-echo round-trip:
```
POST /api/rpc/twoway/{deviceId} {method:"monitoringCheck", params:{value:<uuid>}}
↓ rule engine + transport
device echoes params back, value == uuid
```
LwM2M is the exception. There is no echo handler to add on the monitoring
side because the existing `Lwm2mClient` already implements the standard
Object 3 (Device) Read pathway: `Lwm2mClient.read(server, /3/0/0)` returns
the value that was last set via `send()` (which is what the telemetry uplink
phase does immediately before). So the LwM2M companion check uses a
server-initiated Read instead of an echo:
```
POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}, timeout:6000}
↓ rule engine + LwM2M Read
device returns the resource value
```
The assertion is therefore weaker — non-blank instead of value-equal —
because the resource value is not under the monitoring service's control on
the wire (the rule-engine + Leshan registration path round-trips first).
This is sufficient as a liveness signal for the cloud-to-device LwM2M Read
pathway and matches what the existing fixtures (`device_profile.json`,
`resource.json`) already expose. If a stronger assertion is wanted later,
a Write+Read pair against a custom resource on the test model would close
the gap, but that is out of scope for this PR.

The `request_timeout_ms` default is `6000` ms for LwM2M (vs `4000` ms for
the other three transports) because the LwM2M Read traverses one extra hop
(Leshan registration → device CoAP loop → response).
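
The difference in assertion strength between the echo transports and LwM2M can be sketched as two predicates. `RpcAssertionsSketch` and both method names are hypothetical; the real checks live in the per-transport `doRpcCheck` implementations.

```java
// Hypothetical predicates contrasting the two assertion styles.
final class RpcAssertionsSketch {

    // MQTT/HTTP/CoAP: the device must echo exactly the value that was sent.
    static boolean echoMatches(String sent, String received) {
        return sent != null && sent.equals(received);
    }

    // LwM2M: any non-blank resource value counts as a successful Read.
    static boolean readSucceeded(String resourceValue) {
        return resourceValue != null && !resourceValue.isBlank();
    }
}
```

The second predicate accepts any value the device returns, which is why it serves as a liveness signal rather than an end-to-end value check.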
### 4.5 Proposed flow
```text
Bootstrap (once) Per-cycle (each transport target)
──────────────── ────────────────────────────────
entityService.checkEntities() Login → WS connect → WS subscribe
rule chain For each target:
latencies asset initClient
public dashboard • MQTT/HTTP/CoAP as today
│ • IF rpc.enabled, also subscribe
┌───────────┘ v1/.../rpc/request/+ on the
▼ SAME client
Transports.init publish telemetry → wait WS
provisions devices + creds testData=uuid
used by both telemetry & IF rpc.enabled: doRpcCheck()
RPC checks POST /api/rpc/twoway/{deviceId}
schedule services with stagger validate echo on same client
send :rocket: started report mqttRpcRoundTrip
report request, wsUpdate
EDQS (optional), GENERAL ok
Failure path (incident grouping unchanged):
serviceFailure(transportInfo, e) → :red_circle: MQTT (n)
serviceFailure(rpcInfo(transportInfo), e) → :red_circle: MQTT RPC (n)
```
```mermaid
flowchart TB
subgraph Bootstrap[Bootstrap once]
EntityCheck["entityService.checkEntities()<br/>rule chain · asset · dashboard"]
Init["Transports.init()<br/>provisions devices + credentials<br/>used by BOTH checks"]
Stagger["scheduleWithFixedDelay (staggered)"]
end
subgraph Loop["Per cycle / per transport target"]
direction TB
Login["Login → WS connect → WS subscribe"]
Tx["initClient<br/>(if rpc.enabled, also subscribe<br/>v1/.../rpc/request/+ on same client)"]
UpL["Telemetry uplink + WS validation<br/>report mqttRequest, mqttWsUpdate"]
RpcOpt{rpc.enabled?}
DoRpc["doRpcCheck()<br/>POST /api/rpc/twoway → echo on same MQTT<br/>report mqttRpcRoundTrip<br/>(failure key = MQTT RPC)"]
Skip[skip RPC]
Edqs[EDQS · GENERAL ok]
end
subgraph Notify[Failure attribution]
R[MonitoringReporter] --> N[NotificationService] --> Inc[IncidentManager]
Inc -->|":red_circle: MQTT (n)"| Slack[Slack thread]
Inc -->|":red_circle: MQTT RPC (n)"| Slack
end
EntityCheck --> Init --> Stagger --> Loop
Login --> Tx --> UpL --> RpcOpt
RpcOpt -- yes --> DoRpc --> Edqs
RpcOpt -- no --> Skip --> Edqs
UpL -. on failure .-> R
DoRpc -. on failure .-> R
```
```text
Monitoring app ThingsBoard MQTT transport Echo device
────────────── ─────────── (incl. ssl://) (same Paho
client)
│ │
├─ MQTT connect (URL = transport target URL, │
│ creds = auto-provisioned device token) ─────────────────────────►│
│ │
├─ subscribe v1/.../rpc/request/+ (only if rpc.enabled) ────────────►│
│ │
├─ login → WS connect → WS subscribe ──►│ │
│ │
════╪═══════════ Telemetry check (unchanged from rc) ════════════════════
├─ MQTT publish v1/.../telemetry {testData:uuid} ───────────────────►│
│◄── WS update {testData:uuid} ──────────────────│ │
│ │
════╪═══════════ Companion RPC check (only if rpc.enabled) ══════════════
├─ POST /api/rpc/twoway/{deviceId} ─────────────►│ │
│ {method:monitoringCheck, rule engine + transport
│ params:{value:uuid2}, │ │
│ timeout:requestTimeoutMs} │ │
│ ├──────────►│ │
│ ├─ pub ►│
│ │◄─pub─│
│◄────── 200 OK {value:uuid2} ───────────────────────────────│ │
│ validate echo → report mqttRpcRoundTrip │
NOTE: ONE MqttClient covers both the telemetry uplink and the RPC echo.
Failure of telemetry uplink short-circuits the RPC sub-check.
```
```mermaid
sequenceDiagram
autonumber
participant Mon as Monitoring app
participant TB as ThingsBoard REST
participant TR as MQTT transport (incl. ssl://)
participant Dev as Echo device<br/>(same Paho client)
Note over Mon,Dev: One MQTT session, used for both checks
Mon->>TR: connect(URL = target URL, creds = provisioned token)
Mon->>TR: subscribe v1/devices/me/rpc/request/+ (only if rpc.enabled)
Mon->>TB: REST login → WS connect → WS subscribe
rect rgb(245,245,255)
Note right of Mon: Existing telemetry check (unchanged)
Mon->>TR: publish v1/devices/me/telemetry {testData:uuid}
TR-->>Mon: WS update {testData:uuid}
end
rect rgb(245,255,245)
Note right of Mon: New companion RPC check
Mon->>TB: POST /api/rpc/twoway/{deviceId}<br/>{method,params:{value:uuid2}}
TB->>TR: route through rule engine partition + transport
TR->>Dev: publish v1/devices/me/rpc/request/{id}
Dev-->>TR: publish v1/devices/me/rpc/response/{id} {value:uuid2}
TR-->>TB: response
TB-->>Mon: 200 OK {value:uuid2}
Mon->>Mon: validate, report mqttRpcRoundTrip
end
Note over Mon: On failure → MonitoringReporter.serviceFailure(rpcInfo, e)<br/>→ IncidentManager threads ":red_circle: MQTT RPC (n)"
```
## 5. Implementation phases
Each phase is an independently reviewable commit landing in the same PR:
1. **Land this design paper.** No code changes.
2. **Add the per-target hook.** Introduce `RpcCheckConfig`,
`TransportMonitoringTarget.rpc`, `RpcInfo`, and the no-op `doRpcCheck`
on `BaseHealthChecker`. Wire `BaseHealthChecker.check` to invoke
`doRpcCheck` after WS validation. Tests at base level.
3. **MQTT implementation.** Extend `MqttTransportHealthChecker.initClient`
to subscribe to `v1/devices/me/rpc/request/+` on the existing Paho client
when `rpc.enabled`. Implement the value-echo `doRpcCheck` once on
`TransportHealthChecker` so HTTP and CoAP can inherit it. Unit tests
cover happy path, value mismatch, REST exception, response timeout,
RPC-disabled (no subscription), and idempotent subscribe.
4. **HTTP implementation.** Extend `HttpTransportHealthChecker.initClient`
to spawn an idempotent daemon long-poll thread on
`GET /api/v1/{token}/rpc?timeout=1000`; on each 200 response, POST the
echoed params back to `/api/v1/{token}/rpc/{id}`. The thread is
lifecycle-bound to `initClient` / `destroyClient`. Tests cover the
inherited `doRpcCheck` shape plus poll-thread start/stop/idempotency
and `pollOnce` behaviour for 200/408/204.
5. **CoAP implementation.** Extend `CoapTransportHealthChecker.initClient`
to open a CoAP OBSERVE on `coap://host/api/v1/{token}/rpc` via a
second `CoapClient`; on each notification echo the params back to
`coap://host/api/v1/{token}/rpc/{id}`. `destroyClient` cancels the
observe and shuts down the rpc client. Tests cover the inherited
`doRpcCheck` shape, observe lifecycle, and notification handler
parsing.
6. **LwM2M implementation.** Override `doRpcCheck` only — no
`initClient` changes — to issue
`POST /api/rpc/twoway/{deviceId} {method:"Read", params:{key:"/3/0/0"}}`
and assert non-blank, per §4.6.
7. **Documentation polish.** Add `monitoring/src/main/resources/README.md`
with the per-transport behaviour table, the new YAML keys, the
secure-MQTT example, and a small troubleshooting note on
`rpc.request_timeout_ms` ordering vs `monitoring.rest.request_timeout_ms`.
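
For phase 4, the decision made after each long-poll response might be sketched as below. The paths come from the phase description; everything else is an assumption: `HttpRpcPollSketch`, the `Action` enum, treating 204 and 408 as "nothing pending, poll again", and treating any other status as a failure are illustrative choices, not the behaviour of the real `pollOnce`.

```java
// Hypothetical decision table for one long-poll iteration.
final class HttpRpcPollSketch {

    enum Action { ECHO, POLL_AGAIN, FAIL }

    // Assumed mapping: 200 carries an RPC request; 204/408 mean no request arrived in time.
    static Action onPollStatus(int httpStatus) {
        switch (httpStatus) {
            case 200:
                return Action.ECHO;
            case 204:
            case 408:
                return Action.POLL_AGAIN;
            default:
                return Action.FAIL;
        }
    }

    // Long-poll endpoint for pending RPC requests.
    static String pollPath(String token, int timeoutMs) {
        return "/api/v1/" + token + "/rpc?timeout=" + timeoutMs;
    }

    // Endpoint the echoed params are POSTed back to.
    static String replyPath(String token, int requestId) {
        return "/api/v1/" + token + "/rpc/" + requestId;
    }
}
```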
## 6. Open decisions
Listed for reviewer push-back before implementation began. All four
transports ship in this PR; the list below records the decisions that
survived review.
1. **All four transports in scope.** Hook + MQTT + HTTP + CoAP + LwM2M ship
together so the IncidentManager header gains a coherent set of
`:red_circle: <transport> RPC` rows in one go. The hook itself is
generic enough that adding more transports later (SNMP, custom) is
just another override.
2. **Two-way RPC only.** Round-trip value validation is the strongest
signal; one-way RPC would only confirm the request reached the device,
which the telemetry uplink check already covers from the other
direction.
3. **RPC failure key shape.** Use `RpcInfo(TransportInfo)` implementing
`ShortNameProvider` returning `"<transport.shortName> RPC"`. Alternative:
a string constant like `"MQTT RPC"`. Decision: the wrapper, so
non-default queues (`"MQTT Foo RPC"`) are handled with no extra code.
4. **Latency suffix.** Reuse the transport target's `getKey()` derivation
(`mqtt`, `mqttFoo`) and append `RpcRoundTrip` — no separate per-target
label. This lines up with `<key>Request` / `<key>WsUpdate` already used
for the telemetry side and means the existing latencies dashboard
widget needs no schema changes.
5. **Echo handler placement (MQTT/HTTP/CoAP).** Each transport's echo
path lives inline in its `initClient` rather than in a separate helper
class. Echo logic is small (a handful of lines per transport) and is
only meaningful in the context of one connected client.
6. **LwM2M Read instead of echo.** See §4.6. The asymmetry is documented
in `monitoring/src/main/resources/README.md` so operators understand
why the LwM2M assertion is non-blank rather than uuid-equal.
## 7. Code references (`rc`)
- `BaseHealthChecker.check(WsClient)`: `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java:70`
- `BaseMonitoringService.runChecks`: `monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java:114`
- `MonitoringEntityService.checkEntities`: `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java:95`
- `MonitoringReporter.serviceFailure`: `monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java:107`
- `IncidentManager.sendAlert`: `monitoring/src/main/java/org/thingsboard/monitoring/notification/incident/IncidentManager.java:70`
- `AffectedService` record: `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/AffectedService.java:18`
- `ShortNameProvider` interface: `monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ShortNameProvider.java:18`
- `TransportInfo.getShortName`: `monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java:27`
- `RestClient.handleTwoWayDeviceRPCRequest`: `rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java:2495`

26
monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcCheckConfig.java

@@ -0,0 +1,26 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.transport;

import lombok.Data;

@Data
public class RpcCheckConfig {

    private boolean enabled;
    private Integer requestTimeoutMs;

}

38
monitoring/src/main/java/org/thingsboard/monitoring/config/transport/RpcInfo.java

@@ -0,0 +1,38 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.transport;

import lombok.Data;
import org.thingsboard.monitoring.data.notification.ShortNameProvider;

@Data
public class RpcInfo implements ShortNameProvider {

    public static final String RPC_SUFFIX = " RPC";

    private final TransportInfo transportInfo;

    @Override
    public String getShortName() {
        return transportInfo.getShortName() + RPC_SUFFIX;
    }

    @Override
    public String toString() {
        return transportInfo.toString() + RPC_SUFFIX;
    }

}

5
monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java

@@ -30,6 +30,7 @@ public class TransportMonitoringTarget implements MonitoringTarget {
     private String queue;
     private boolean checkDomainIps;
     private String namePrefix;
+    private RpcCheckConfig rpc;
     @Override
     public UUID getDeviceId() {
@@ -44,4 +45,8 @@ public class TransportMonitoringTarget implements MonitoringTarget {
         return Strings.nullToEmpty(namePrefix);
     }
+    public boolean isRpcEnabled() {
+        return rpc != null && rpc.isEnabled();
+    }
 }

4
monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java

@@ -30,4 +30,8 @@ public class Latencies {
         return String.format("%sWsUpdate", key);
     }
+    public static String rpcRoundTrip(String key) {
+        return String.format("%sRpcRoundTrip", key);
+    }
 }

10
monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java

@@ -54,6 +54,10 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
     @Value("${monitoring.check_timeout_ms}")
     private int resultCheckTimeoutMs;
+    protected final void reportRpcLatency(long latencyNanos) {
+        reporter.reportLatency(Latencies.rpcRoundTrip(getKey()), latencyNanos);
+    }
     @Getter
     private final Map<String, BaseHealthChecker<C, T>> associates = new HashMap<>();
@@ -88,6 +92,8 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
             log.trace("[{}] Waiting for WS update", info);
             checkWsUpdates(wsClient, testValue);
+            doRpcCheck();
             reporter.serviceIsOk(info);
         } catch (ServiceFailureException e) {
             reporter.serviceFailure(e.getServiceKey(), e);
@@ -130,6 +136,10 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
     protected abstract void sendTestPayload(String payload) throws Exception;
+    protected void doRpcCheck() throws Exception {
+        // no-op; transports opt in by overriding when target.isRpcEnabled()
+    }
     @PreDestroy
     protected abstract void destroyClient() throws Exception;

72
monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java

@@ -15,15 +15,24 @@
  */
 package org.thingsboard.monitoring.service.transport;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.thingsboard.common.util.JacksonUtil;
+import org.thingsboard.monitoring.client.TbClient;
+import org.thingsboard.monitoring.config.transport.RpcInfo;
 import org.thingsboard.monitoring.config.transport.TransportInfo;
 import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig;
 import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
 import org.thingsboard.monitoring.config.transport.TransportType;
+import org.thingsboard.monitoring.data.ServiceFailureException;
 import org.thingsboard.monitoring.service.BaseHealthChecker;
+import org.thingsboard.server.common.data.id.DeviceId;
+import java.util.UUID;
 @Slf4j
 public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> {
@@ -31,13 +40,76 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
     @Value("${monitoring.calculated_fields.enabled:true}")
     private boolean calculatedFieldsMonitoringEnabled;
+    @Value("${monitoring.rest.request_timeout_ms}")
+    private int restRequestTimeoutMs;
+    @Autowired
+    protected TbClient tbClient;
     public TransportHealthChecker(C config, TransportMonitoringTarget target) {
         super(config, target);
     }
+    protected RpcInfo getRpcInfo() {
+        return new RpcInfo(new TransportInfo(getTransportType(), target));
+    }
+    protected int getRpcTimeoutMs() {
+        Integer perTarget = target.getRpc() != null ? target.getRpc().getRequestTimeoutMs() : null;
+        return perTarget != null ? perTarget : config.getRequestTimeoutMs();
+    }
+    /**
+     * Default RPC sub-check: send a uuid via two-way RPC and assert the device echoes it back.
+     *
+     * <p>Declared {@code protected} in this package; per-transport impls in
+     * {@code org.thingsboard.monitoring.service.transport.impl} expose narrow {@code @VisibleForTesting}
+     * delegating overrides so that same-package unit tests can drive {@code doRpcCheck()} directly
+     * without resorting to {@code ReflectionTestUtils.invokeMethod(...)}.
+     */
+    @Override
+    protected void doRpcCheck() throws Exception {
+        if (!target.isRpcEnabled()) {
+            return;
+        }
+        RpcInfo rpcInfo = getRpcInfo();
+        String testValue = UUID.randomUUID().toString();
+        ObjectNode body = JacksonUtil.newObjectNode();
+        body.put("method", "monitoringCheck");
+        body.set("params", JacksonUtil.newObjectNode().put("value", testValue));
+        body.put("timeout", getRpcTimeoutMs());
+        long start = System.nanoTime();
+        JsonNode response;
+        try {
+            response = tbClient.handleTwoWayDeviceRPCRequest(new DeviceId(target.getDeviceId()), body);
+        } catch (Throwable e) {
+            throw new ServiceFailureException(rpcInfo, e);
+        }
+        String actual = response == null ? null : response.path("value").asText(null);
if (!testValue.equals(actual)) {
throw new ServiceFailureException(rpcInfo,
"RPC echo mismatch: expected " + testValue + " but got " + actual);
}
reportRpcLatency(System.nanoTime() - start);
}
@Override @Override
protected void initialize() { protected void initialize() {
entityService.checkEntities(config, target); entityService.checkEntities(config, target);
if (target.isRpcEnabled()) {
int rpcTimeoutMs = getRpcTimeoutMs();
if (rpcTimeoutMs >= restRequestTimeoutMs) {
String transportName = getTransportType().name().toLowerCase();
throw new IllegalStateException("RPC request timeout (" + rpcTimeoutMs + " ms) for "
+ getTransportType() + " target " + target.getBaseUrl()
+ " must be < monitoring.rest.request_timeout_ms (" + restRequestTimeoutMs
+ " ms); otherwise tbClient times out before TB times out the RPC, producing false negatives."
+ " Either raise REST_REQUEST_TIMEOUT_MS above " + rpcTimeoutMs
+ " ms or lower monitoring.transports." + transportName + ".targets[*].rpc.request_timeout_ms"
+ " (env: " + getTransportType() + "_RPC_REQUEST_TIMEOUT_MS).");
}
}
} }
@Override @Override

91
monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthChecker.java

@ -15,8 +15,12 @@
*/ */
package org.thingsboard.monitoring.service.transport.impl; package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient; import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapHandler;
import org.eclipse.californium.core.CoapObserveRelation;
import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.MediaTypeRegistry;
@ -24,6 +28,7 @@ import org.eclipse.californium.elements.config.SystemConfig;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.config.transport.TransportType;
@ -41,6 +46,10 @@ public class CoapTransportHealthChecker extends TransportHealthChecker<CoapTrans
} }
private CoapClient coapClient; private CoapClient coapClient;
@VisibleForTesting
CoapClient rpcCoapClient;
@VisibleForTesting
CoapObserveRelation rpcObserveRelation;
protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) { protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) {
super(config, target); super(config, target);
@ -55,6 +64,74 @@ public class CoapTransportHealthChecker extends TransportHealthChecker<CoapTrans
coapClient.setTimeout((long) config.getRequestTimeoutMs()); coapClient.setTimeout((long) config.getRequestTimeoutMs());
log.debug("Initialized CoAP client for URI {}", uri); log.debug("Initialized CoAP client for URI {}", uri);
} }
if (target.isRpcEnabled() && rpcObserveRelation == null) {
String accessToken = target.getDevice().getCredentials().getCredentialsId();
String rpcUri = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc";
if (rpcCoapClient == null) {
rpcCoapClient = new CoapClient(rpcUri);
rpcCoapClient.setTimeout((long) config.getRequestTimeoutMs());
}
rpcObserveRelation = rpcCoapClient.observe(new CoapHandler() {
@Override
public void onLoad(CoapResponse response) {
handleRpcNotification(response);
}
@Override
public void onError() {
// Sustained observe failure surfaces indirectly: doRpcCheck()'s
// server-side RPC send will time out without an echo from this
// device, which is the actual signal we want to report.
log.debug("CoAP RPC observe failed");
}
});
log.debug("Started CoAP RPC observe on {}", rpcUri);
}
}
@VisibleForTesting
void handleRpcNotification(CoapResponse response) {
try {
String body = response == null ? null : response.getResponseText();
if (body == null || body.isEmpty()) {
return;
}
JsonNode rpc = JacksonUtil.toJsonNode(body);
JsonNode idNode = rpc == null ? null : rpc.get("id");
if (idNode == null || !idNode.isNumber()) {
log.debug("CoAP RPC notification missing or non-numeric id: {}", body);
return;
}
JsonNode params = rpc.get("params");
String accessToken = target.getDevice().getCredentials().getCredentialsId();
String responseUri = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc/" + idNode.asLong();
String payload = params == null ? "{}" : JacksonUtil.toString(params);
postRpcResponse(responseUri, payload);
} catch (Exception e) {
log.warn("CoAP RPC echo failed: {}", e.getMessage());
}
}
// Allocates a fresh CoapClient per echo because the observe relation owns
// rpcCoapClient and reusing it via setURI(...) would race with incoming
// observe notifications on the same client. The cost is one short-lived
// client per RPC (rare event). Construction is inside the try block so a
// future ctor change that opens a socket can't leak it on init failure —
// the null-guarded shutdown() in finally still runs.
@VisibleForTesting
void postRpcResponse(String uri, String payload) {
CoapClient client = null;
try {
client = new CoapClient(uri);
client.setTimeout((long) config.getRequestTimeoutMs());
client.post(payload, MediaTypeRegistry.APPLICATION_JSON);
} catch (Exception e) {
log.debug("CoAP RPC response post failed for {}: {}", uri, e.getMessage());
} finally {
if (client != null) {
client.shutdown();
}
}
} }
@Override @Override
@ -66,8 +143,22 @@ public class CoapTransportHealthChecker extends TransportHealthChecker<CoapTrans
} }
} }
@VisibleForTesting
@Override
protected void doRpcCheck() throws Exception {
super.doRpcCheck();
}
@Override @Override
protected void destroyClient() throws Exception { protected void destroyClient() throws Exception {
if (rpcObserveRelation != null) {
rpcObserveRelation.proactiveCancel();
rpcObserveRelation = null;
}
if (rpcCoapClient != null) {
rpcCoapClient.shutdown();
rpcCoapClient = null;
}
if (coapClient != null) { if (coapClient != null) {
coapClient.shutdown(); coapClient.shutdown();
coapClient = null; coapClient = null;

133
monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthChecker.java

@ -15,25 +15,58 @@
*/ */
package org.thingsboard.monitoring.service.transport.impl; package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j @Slf4j
public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTransportMonitoringConfig> { public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTransportMonitoringConfig> {
static final long POLL_TIMEOUT_MS = 1000L;
private static final long POLL_READ_TIMEOUT_SLACK_MS = 1000L;
// Inter-poll delay. The effective rate is dominated by the long-poll's
// server-side wait window (POLL_TIMEOUT_MS). This value only acts as a
// rate-limiter when the server returns immediately (204, bodyless 200,
// 408) — without it, the loop would tight-spin in those cases.
private static final long POLL_INTERVAL_MS = POLL_TIMEOUT_MS / 4;
private static final long POLL_BACKOFF_INITIAL_MS = 500L;
private static final long POLL_BACKOFF_MAX_MS = 5_000L;
// Must exceed POLL_TIMEOUT_MS + POLL_READ_TIMEOUT_SLACK_MS so that destroyClient()
// can wait out the worst-case in-flight read. The default JDK HttpURLConnection
// does not honor Thread.interrupt() on a blocking read, so cancel(true) only
// unblocks the poller once the read timeout fires. If POLL_READ_TIMEOUT_SLACK_MS
// is bumped, this value must move with it.
private static final long SHUTDOWN_TIMEOUT_MS = 5_000L;
private static final AtomicInteger POOL_COUNTER = new AtomicInteger();
private RestTemplate restTemplate; private RestTemplate restTemplate;
private ScheduledExecutorService rpcPoller;
private Future<?> rpcPollFuture;
private long backoffMs;
protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) { protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) {
super(config, target); super(config, target);
@ -44,10 +77,83 @@ public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTrans
if (restTemplate == null) { if (restTemplate == null) {
restTemplate = new RestTemplateBuilder() restTemplate = new RestTemplateBuilder()
.setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) .setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs()))
.setReadTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) .setReadTimeout(Duration.ofMillis(POLL_TIMEOUT_MS + POLL_READ_TIMEOUT_SLACK_MS))
.build(); .build();
log.debug("Initialized HTTP client"); log.debug("Initialized HTTP client");
} }
if (target.isRpcEnabled() && (rpcPollFuture == null || rpcPollFuture.isDone())) {
if (rpcPoller == null || rpcPoller.isShutdown()) {
rpcPoller = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory());
}
backoffMs = 0L;
rpcPollFuture = rpcPoller.scheduleWithFixedDelay(this::pollTask, 0L, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
log.debug("Started HTTP RPC poll loop for device {}", target.getDeviceId());
}
}
private ThreadFactory daemonThreadFactory() {
String name = "http-rpc-poll-" + POOL_COUNTER.incrementAndGet() + "-" + target.getDeviceId();
return r -> {
Thread t = new Thread(r, name);
t.setDaemon(true);
return t;
};
}
@VisibleForTesting
void pollTask() {
try {
pollOnce();
backoffMs = 0L;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("HTTP RPC poll error: {}", e.getMessage());
try {
Thread.sleep(nextBackoffMs());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
private long nextBackoffMs() {
long base = backoffMs == 0L ? POLL_BACKOFF_INITIAL_MS : Math.min(backoffMs * 2L, POLL_BACKOFF_MAX_MS);
backoffMs = base;
long jitter = ThreadLocalRandom.current().nextLong(0L, Math.max(1L, base / 2L));
return base + jitter;
}
@VisibleForTesting
void pollOnce() throws InterruptedException {
String accessToken = target.getDevice().getCredentials().getCredentialsId();
// POLL_TIMEOUT_MS is sent to the server as the long-poll wait window.
// The RestTemplate read timeout above is sized to POLL_TIMEOUT_MS + slack so
// a slow server still terminates within bounded time and destroyClient() can
// unblock the poller via Future#cancel(true).
String pollUrl = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc?timeout=" + POLL_TIMEOUT_MS;
ResponseEntity<JsonNode> poll;
try {
poll = restTemplate.getForEntity(pollUrl, JsonNode.class);
} catch (HttpStatusCodeException e) {
if (e.getStatusCode() == HttpStatus.REQUEST_TIMEOUT || e.getStatusCode() == HttpStatus.NO_CONTENT) {
return;
}
throw e;
}
if (poll.getStatusCode() != HttpStatus.OK || poll.getBody() == null) {
return;
}
JsonNode rpc = poll.getBody();
JsonNode idNode = rpc.get("id");
JsonNode params = rpc.get("params");
if (idNode == null || !idNode.isNumber()) {
log.debug("HTTP RPC poll response missing or non-numeric id: {}", rpc);
return;
}
String responseUrl = target.getBaseUrl() + "/api/v1/" + accessToken + "/rpc/" + idNode.asLong();
JsonNode body = params == null ? JacksonUtil.newObjectNode() : params;
restTemplate.postForLocation(responseUrl, body);
} }
@Override @Override
@ -56,8 +162,31 @@ public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTrans
restTemplate.postForObject(target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry", payload, String.class); restTemplate.postForObject(target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry", payload, String.class);
} }
@VisibleForTesting
@Override
protected void doRpcCheck() throws Exception {
super.doRpcCheck();
}
@Override @Override
protected void destroyClient() throws Exception {} protected void destroyClient() throws Exception {
if (rpcPollFuture != null) {
rpcPollFuture.cancel(true);
rpcPollFuture = null;
}
if (rpcPoller != null) {
rpcPoller.shutdownNow();
try {
if (!rpcPoller.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
log.warn("HTTP RPC poller did not terminate within {} ms", SHUTDOWN_TIMEOUT_MS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
rpcPoller = null;
log.debug("Stopped HTTP RPC poller");
}
}
@Override @Override
protected TransportType getTransportType() { protected TransportType getTransportType() {

34
monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthChecker.java

@ -15,21 +15,29 @@
*/ */
package org.thingsboard.monitoring.service.transport.impl; package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.Lwm2mClient; import org.thingsboard.monitoring.client.Lwm2mClient;
import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.RpcInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
import org.thingsboard.server.common.data.id.DeviceId;
@Service @Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j @Slf4j
public class Lwm2mTransportHealthChecker extends TransportHealthChecker<Lwm2mTransportMonitoringConfig> { public class Lwm2mTransportHealthChecker extends TransportHealthChecker<Lwm2mTransportMonitoringConfig> {
private static final String READ_RESOURCE_PATH = "/3/0/0";
private Lwm2mClient lwm2mClient; private Lwm2mClient lwm2mClient;
protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, TransportMonitoringTarget target) { protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, TransportMonitoringTarget target) {
@ -56,6 +64,32 @@ public class Lwm2mTransportHealthChecker extends TransportHealthChecker<Lwm2mTra
return testValue; return testValue;
} }
@Override
protected void doRpcCheck() throws Exception {
if (!target.isRpcEnabled()) {
return;
}
RpcInfo rpcInfo = getRpcInfo();
ObjectNode body = JacksonUtil.newObjectNode();
body.put("method", "Read");
body.set("params", JacksonUtil.newObjectNode().put("key", READ_RESOURCE_PATH));
body.put("timeout", getRpcTimeoutMs());
long start = System.nanoTime();
JsonNode response;
try {
response = tbClient.handleTwoWayDeviceRPCRequest(new DeviceId(target.getDeviceId()), body);
} catch (Throwable e) {
throw new ServiceFailureException(rpcInfo, e);
}
String result = response == null ? null : response.asText(null);
if (result == null || result.isBlank()) {
throw new ServiceFailureException(rpcInfo,
"LwM2M RPC Read on " + READ_RESOURCE_PATH + " returned blank result");
}
reportRpcLatency(System.nanoTime() - start);
}
@Override @Override
protected void destroyClient() throws Exception { protected void destroyClient() throws Exception {
if (lwm2mClient != null) { if (lwm2mClient != null) {

46
monitoring/src/main/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthChecker.java

@ -15,7 +15,10 @@
*/ */
package org.thingsboard.monitoring.service.transport.impl; package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
@ -25,19 +28,26 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
import java.nio.charset.StandardCharsets;
@Component @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j @Slf4j
public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTransportMonitoringConfig> { public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTransportMonitoringConfig> {
private MqttClient mqttClient; @VisibleForTesting
MqttClient mqttClient;
private boolean rpcSubscribed;
private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry";
private static final String DEVICE_RPC_REQUEST_SUB_TOPIC = "v1/devices/me/rpc/request/+";
private static final String DEVICE_RPC_RESPONSE_TOPIC_PREFIX = "v1/devices/me/rpc/response/";
protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, TransportMonitoringTarget target) { protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, TransportMonitoringTarget target) {
super(config, target); super(config, target);
@ -59,6 +69,33 @@ public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTrans
throw result.getException(); throw result.getException();
} }
log.debug("Initialized MQTT client for URI {}", mqttClient.getServerURI()); log.debug("Initialized MQTT client for URI {}", mqttClient.getServerURI());
rpcSubscribed = false;
}
if (target.isRpcEnabled() && !rpcSubscribed) {
mqttClient.subscribe(DEVICE_RPC_REQUEST_SUB_TOPIC, config.getQos(), this::echoRpcRequest);
rpcSubscribed = true;
log.debug("Subscribed for RPC requests on {}", DEVICE_RPC_REQUEST_SUB_TOPIC);
}
}
// Registered as IMqttMessageListener; must not propagate exceptions to Paho's
// callback dispatcher. A leaked exception (malformed payload, broken JSON,
// transient publish failure) can disconnect the session or leave the listener
// in a bad state, masking the very telemetry uplink the rest of this checker
// depends on. Match CoapTransportHealthChecker.handleRpcNotification: log + skip.
@VisibleForTesting
void echoRpcRequest(String topic, MqttMessage request) {
try {
String requestId = StringUtils.substringAfterLast(topic, "/");
JsonNode body = JacksonUtil.toJsonNode(new String(request.getPayload(), StandardCharsets.UTF_8));
JsonNode params = body == null ? null : body.get("params");
byte[] responsePayload = (params == null ? "{}" : JacksonUtil.toString(params))
.getBytes(StandardCharsets.UTF_8);
MqttMessage response = new MqttMessage(responsePayload);
response.setQos(config.getQos());
mqttClient.publish(DEVICE_RPC_RESPONSE_TOPIC_PREFIX + requestId, response);
} catch (Exception e) {
log.warn("MQTT RPC echo failed for {}: {}", topic, e.getMessage());
} }
} }
@ -70,11 +107,18 @@ public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTrans
mqttClient.publish(DEVICE_TELEMETRY_TOPIC, message); mqttClient.publish(DEVICE_TELEMETRY_TOPIC, message);
} }
@VisibleForTesting
@Override
protected void doRpcCheck() throws Exception {
super.doRpcCheck();
}
@Override @Override
protected void destroyClient() throws Exception { protected void destroyClient() throws Exception {
if (mqttClient != null) { if (mqttClient != null) {
mqttClient.disconnect(); mqttClient.disconnect();
mqttClient = null; mqttClient = null;
rpcSubscribed = false;
log.info("Disconnected MQTT client"); log.info("Disconnected MQTT client");
} }
} }

80
monitoring/src/main/resources/README.md

@ -0,0 +1,80 @@
# tb-monitoring — RPC companion check
Each `monitoring.transports.{mqtt,http,coap,lwm2m}.targets[N]` accepts an optional
`rpc:` sub-block. When `rpc.enabled` is `true`, the monitoring loop runs an
extra round-trip on the same target per cycle, after the existing telemetry
uplink + WebSocket validation has succeeded.
```yaml
monitoring:
  transports:
    mqtt:
      request_timeout_ms: 4000
      targets:
        - base_url: 'tcp://${monitoring.domain}:1883'
          queue: 'Main'
          rpc:
            enabled: true
            request_timeout_ms: 4000
        - base_url: 'ssl://${monitoring.domain}:8883' # secure variant
          queue: 'Main'
          rpc:
            enabled: true # inherits ssl + creds
```
## YAML keys
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `rpc.enabled` | bool | `false` | Opt in to the per-target RPC sub-check. |
| `rpc.request_timeout_ms` | int | transport `request_timeout_ms` | Sent through to ThingsBoard as the RPC `timeout`. Must be `< monitoring.rest.request_timeout_ms`. |
For LwM2M the default is `6000` ms because the Read goes through the rule engine
and the Leshan registration; the other transports default to `4000` ms. The default
`monitoring.rest.request_timeout_ms` is `8000` ms, which leaves a 2 s margin above
LwM2M. If you raise any per-target `rpc.request_timeout_ms` to a value near or above
the REST timeout, raise `REST_REQUEST_TIMEOUT_MS` first, or `initialize()` will
fail fast at startup.
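
The ordering rule can be sketched in isolation. The class below is a hypothetical stand-alone helper, not part of the monitoring module; it only mirrors the fallback (per-target value, else the transport-level `request_timeout_ms`) and the strict `<` comparison that `TransportHealthChecker.initialize()` applies in this change.

```java
/** Hypothetical helper that mirrors the startup check; the name and shape are assumptions. */
final class RpcTimeoutGuard {

    private RpcTimeoutGuard() {}

    /** Per-target rpc.request_timeout_ms if set, otherwise the transport-level request_timeout_ms. */
    static int effectiveRpcTimeoutMs(Integer perTargetMs, int transportMs) {
        return perTargetMs != null ? perTargetMs : transportMs;
    }

    /** Rejects any RPC timeout that is not strictly below the REST client timeout. */
    static void validate(int rpcTimeoutMs, int restTimeoutMs) {
        if (rpcTimeoutMs >= restTimeoutMs) {
            throw new IllegalStateException("RPC request timeout (" + rpcTimeoutMs
                    + " ms) must be < monitoring.rest.request_timeout_ms (" + restTimeoutMs + " ms)");
        }
    }
}
```

With the shipped defaults, `validate(6000, 8000)` passes for LwM2M, while `validate(8000, 8000)` throws because the comparison is strict.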
## Per-transport behaviour
| Transport | Device side | Cloud side | Assertion |
|-----------|-------------|------------|-----------|
| MQTT | Same Paho client subscribes `v1/devices/me/rpc/request/+` and publishes echoed `params` to `v1/devices/me/rpc/response/{id}` | `POST /api/rpc/twoway/{deviceId}` `{method:"monitoringCheck", params:{value:<uuid>}}` | echoed `value == uuid` |
| HTTP | Daemon long-poll thread per target: `GET /api/v1/{token}/rpc?timeout=1000`; on 200, `POST /api/v1/{token}/rpc/{id}` with the params | same as MQTT | echoed `value == uuid` |
| CoAP | Separate `CoapClient` opens an OBSERVE on `coap://host/api/v1/{token}/rpc`; on each notification, posts echoed params to `coap://host/api/v1/{token}/rpc/{id}` | same as MQTT | echoed `value == uuid` |
| LwM2M | None — the existing `Lwm2mClient` already serves `/3/0/0` via the standard LwM2M Read | `POST /api/rpc/twoway/{deviceId}` `{method:"Read", params:{key:"/3/0/0"}}` | response is non-blank |
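
For the MQTT row, the device side has to turn a request topic into the matching response topic. The sketch below shows that mapping on its own, using only the two topic prefixes from the table. `RpcTopics` is a hypothetical name, and the validation is stricter than the checker in this change, which simply takes the text after the last `/`.

```java
/** Hypothetical helper; only the two topic prefixes come from the README table. */
final class RpcTopics {

    static final String REQUEST_PREFIX = "v1/devices/me/rpc/request/";
    static final String RESPONSE_PREFIX = "v1/devices/me/rpc/response/";

    private RpcTopics() {}

    /** Maps v1/devices/me/rpc/request/{id} to v1/devices/me/rpc/response/{id}. */
    static String responseTopicFor(String requestTopic) {
        if (requestTopic == null || !requestTopic.startsWith(REQUEST_PREFIX)) {
            throw new IllegalArgumentException("Not an RPC request topic: " + requestTopic);
        }
        String requestId = requestTopic.substring(REQUEST_PREFIX.length());
        if (requestId.isEmpty() || requestId.contains("/")) {
            throw new IllegalArgumentException("Missing or malformed request id: " + requestTopic);
        }
        return RESPONSE_PREFIX + requestId;
    }
}
```

The HTTP and CoAP rows follow the same idea, with the request `id` taken from the JSON body and appended to `/api/v1/{token}/rpc/` instead of being parsed from a topic.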
## Failure attribution
RPC failures use a dedicated `RpcInfo` service key whose `getShortName()`
returns `"<transport.shortName> RPC"` (`"MQTT RPC"`, `"MQTT Foo RPC"` for
non-default queues, etc.). The IncidentManager header therefore distinguishes
`":red_circle: MQTT (n)"` from `":red_circle: MQTT RPC (m)"`. A telemetry
uplink failure short-circuits the RPC sub-check for that cycle, so a broken
transport will not double-count against both rows.
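
As an illustration of the naming rule, the sketch below reproduces the two examples given above. It is not the real `RpcInfo`: the class name is hypothetical, and it assumes that `Main` is the default queue and that the queue name is inserted verbatim for any other queue.

```java
/** Hypothetical stand-in for the short-name rule; the real RpcInfo may differ. */
final class RpcShortNames {

    /** Assumption: the default queue is called "Main", as in the sample config. */
    static final String DEFAULT_QUEUE = "Main";

    private RpcShortNames() {}

    /** "MQTT" + "Main" gives "MQTT RPC"; "MQTT" + "Foo" gives "MQTT Foo RPC". */
    static String shortName(String transport, String queue) {
        boolean defaultQueue = queue == null || DEFAULT_QUEUE.equals(queue);
        return transport + (defaultQueue ? "" : " " + queue) + " RPC";
    }
}
```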
## Latencies
When the RPC check succeeds, the round-trip is reported under
`<key>RpcRoundTrip` (e.g. `mqttRpcRoundTrip`, `mqttFooRpcRoundTrip` for
non-default queues), alongside the existing `<key>Request` and `<key>WsUpdate`
keys for the same target. No latencies dashboard schema change is needed.
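
The key pattern can be sketched the same way. The helper below is hypothetical and only reproduces the two example keys above, assuming that `Main` is the default queue and that the base key is the lower-cased transport name followed by the queue name for non-default queues.

```java
/** Hypothetical key builder; the real key derivation lives in the monitoring module. */
final class RpcLatencyKeys {

    /** Assumption: the default queue is called "Main", as in the sample config. */
    static final String DEFAULT_QUEUE = "Main";

    private RpcLatencyKeys() {}

    /** "MQTT" + "Main" gives "mqttRpcRoundTrip"; "MQTT" + "Foo" gives "mqttFooRpcRoundTrip". */
    static String roundTripKey(String transport, String queue) {
        boolean defaultQueue = queue == null || DEFAULT_QUEUE.equals(queue);
        String base = transport.toLowerCase(java.util.Locale.ROOT) + (defaultQueue ? "" : queue);
        return base + "RpcRoundTrip";
    }
}
```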
## Troubleshooting
- **`rpc.request_timeout_ms` ordering**: the RPC HTTP call goes through the
same REST client, which is bounded by `monitoring.rest.request_timeout_ms`. Keep
`rpc.request_timeout_ms` strictly smaller so that ThingsBoard's own RPC timeout
fires first; otherwise the REST call fails with `ResourceAccessException`
before the RPC has a chance to time out cleanly.
- **HTTP polling thread leaks**: the long-poll thread is a daemon thread whose
lifecycle is bound to `initClient` / `destroyClient`. If you see leaked threads,
check whether a custom shutdown hook is bypassing `@PreDestroy`.
- **CoAP OBSERVE behind NAT**: the OBSERVE client uses the same Californium
configuration as the telemetry client; if the telemetry check works but the
RPC observe never delivers, check NAT/firewall on the CoAP UDP path.
- **LwM2M Read on `/3/0/0` returns blank**: the monitoring `Lwm2mClient`
serves resource `0` from whatever was last sent via telemetry, so a blank
result usually means telemetry never reached the device this cycle. Confirm
the telemetry row is green first.

41
monitoring/src/main/resources/tb-monitoring.yml

@ -24,8 +24,10 @@ monitoring:
username: '${REST_AUTH_USERNAME:tenant@thingsboard.org}' username: '${REST_AUTH_USERNAME:tenant@thingsboard.org}'
# Authentication password # Authentication password
password: '${REST_AUTH_PASSWORD:tenant}' password: '${REST_AUTH_PASSWORD:tenant}'
# REST request timeout in milliseconds # REST request timeout in milliseconds. Must be greater than every per-target
request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:5000}' # rpc.request_timeout_ms — see TransportHealthChecker.initialize() fail-fast.
# Default leaves a 2 s margin above the slowest built-in default (LwM2M 6000 ms).
request_timeout_ms: '${REST_REQUEST_TIMEOUT_MS:8000}'
ws: ws:
# WebSocket url, wss://DOMAIN by default # WebSocket url, wss://DOMAIN by default
base_url: '${WS_BASE_URL:wss://${monitoring.domain}}' base_url: '${WS_BASE_URL:wss://${monitoring.domain}}'
@ -59,6 +61,21 @@ monitoring:
check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name # Prefix for the target device name
name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}' name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}'
# Optional companion RPC check on this target.
# Issues POST /api/rpc/twoway/{deviceId} with method=monitoringCheck and a uuid,
# validates the device echoes the uuid back over the same MQTT session, and
# reports the round-trip under the <key>RpcRoundTrip latency.
# Failures appear as ":red_circle: MQTT RPC" in the IncidentManager header.
rpc:
enabled: '${MQTT_RPC_MONITORING_ENABLED:false}'
# RPC request timeout in ms; must be < monitoring.rest.request_timeout_ms.
# If unset, falls back to monitoring.transports.mqtt.request_timeout_ms.
request_timeout_ms: '${MQTT_RPC_REQUEST_TIMEOUT_MS:4000}'
# Secure MQTT example with RPC enabled by default:
# - base_url: '${MQTT_TRANSPORT_SSL_BASE_URL:ssl://${monitoring.domain}:8883}'
# queue: 'Main'
# rpc:
# enabled: true
# To add more targets, use following environment variables:
# monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc.
@@ -76,6 +93,11 @@ monitoring:
check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${COAP_TRANSPORT_TARGET_NAME_PREFIX:}'
# Optional companion RPC check. Opens a CoAP OBSERVE on /api/v1/{token}/rpc on
# a separate CoapClient and echoes incoming RPC params back to /api/v1/{token}/rpc/{id}.
rpc:
enabled: '${COAP_RPC_MONITORING_ENABLED:false}'
request_timeout_ms: '${COAP_RPC_REQUEST_TIMEOUT_MS:4000}'
# To add more targets, use following environment variables:
# monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc.
@@ -93,6 +115,12 @@ monitoring:
check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${HTTP_TRANSPORT_TARGET_NAME_PREFIX:}'
# Optional companion RPC check. Spawns a daemon long-poll thread per target on
# GET /api/v1/{token}/rpc?timeout=1000 and POSTs the echoed params back to
# /api/v1/{token}/rpc/{id} when an RPC arrives.
rpc:
enabled: '${HTTP_RPC_MONITORING_ENABLED:false}'
request_timeout_ms: '${HTTP_RPC_REQUEST_TIMEOUT_MS:4000}'
# To add more targets, use following environment variables:
# monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc.
@@ -110,6 +138,15 @@ monitoring:
check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${LWM2M_TRANSPORT_TARGET_NAME_PREFIX:}'
# Optional companion RPC check. Issues a server-initiated LwM2M Read on /3/0/0
# via /api/rpc/twoway/{deviceId} with method=Read; asserts the response is
# non-blank. The "value matches uuid" pattern used by other transports does not
# apply here because the Read is read-only — see design doc §4.6.
rpc:
enabled: '${LWM2M_RPC_MONITORING_ENABLED:false}'
# LwM2M Read goes through rule engine + Leshan registration, so default to
# 6000 ms; tune per environment.
request_timeout_ms: '${LWM2M_RPC_REQUEST_TIMEOUT_MS:6000}'
# To add more targets, use following environment variables:
# monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.
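
The timeout rule stated in these comments (each `rpc.request_timeout_ms` must be strictly below `monitoring.rest.request_timeout_ms`, with a fallback to the transport-level `request_timeout_ms` when the RPC value is unset) can be sketched as a small fail-fast check. This is a minimal sketch under stated assumptions, not the PR's `TransportHealthChecker.initialize()`: the class name `RpcTimeoutCheck`, its method names, and the exact message wording are hypothetical.

```java
// Hypothetical sketch; names and message wording are illustrative, not taken from the PR.
final class RpcTimeoutCheck {

    /** Returns the per-target RPC timeout, or the transport-level timeout when it is unset. */
    static int effectiveRpcTimeoutMs(Integer rpcTimeoutMs, int transportTimeoutMs) {
        return rpcTimeoutMs != null ? rpcTimeoutMs : transportTimeoutMs;
    }

    /** Fails fast unless the effective RPC timeout is strictly below the REST timeout. */
    static void validate(Integer rpcTimeoutMs, int transportTimeoutMs, int restTimeoutMs) {
        int effective = effectiveRpcTimeoutMs(rpcTimeoutMs, transportTimeoutMs);
        if (effective >= restTimeoutMs) {
            throw new IllegalStateException("RPC request timeout (" + effective
                    + " ms) must be < REST request timeout (" + restTimeoutMs + " ms)");
        }
    }

    public static void main(String[] args) {
        validate(4000, 4000, 8000); // 4000 ms RPC default against the 8000 ms REST default: passes
        validate(6000, 4000, 8000); // LwM2M 6000 ms default: passes with a 2 s margin
        try {
            validate(null, 3000, 2000); // unset RPC timeout falls back to 3000 ms and is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The strict `>=` comparison mirrors the intent that an RPC timeout equal to the REST timeout is also rejected, since the REST client could otherwise give up at the same moment the RPC is due to expire.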

70
monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcCheckConfigTest.java

@@ -0,0 +1,70 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.transport;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class RpcCheckConfigTest {
@Test
void defaultsAreDisabledAndUnset() {
RpcCheckConfig config = new RpcCheckConfig();
assertThat(config.isEnabled()).isFalse();
assertThat(config.getRequestTimeoutMs()).isNull();
}
@Test
void settersRoundTrip() {
RpcCheckConfig config = new RpcCheckConfig();
config.setEnabled(true);
config.setRequestTimeoutMs(7500);
assertThat(config.isEnabled()).isTrue();
assertThat(config.getRequestTimeoutMs()).isEqualTo(7500);
}
@Test
void targetIsRpcEnabledFalseWhenSubBlockMissing() {
TransportMonitoringTarget target = new TransportMonitoringTarget();
target.setBaseUrl("tcp://host:1883");
assertThat(target.isRpcEnabled()).isFalse();
}
@Test
void targetIsRpcEnabledFalseWhenDisabled() {
TransportMonitoringTarget target = new TransportMonitoringTarget();
target.setBaseUrl("tcp://host:1883");
target.setRpc(new RpcCheckConfig());
assertThat(target.isRpcEnabled()).isFalse();
}
@Test
void targetIsRpcEnabledTrueWhenEnabled() {
TransportMonitoringTarget target = new TransportMonitoringTarget();
target.setBaseUrl("tcp://host:1883");
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
target.setRpc(rpc);
assertThat(target.isRpcEnabled()).isTrue();
}
}

84
monitoring/src/test/java/org/thingsboard/monitoring/config/transport/RpcInfoTest.java

@@ -0,0 +1,84 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.config.transport;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class RpcInfoTest {
@Test
void shortNameAppendsRpcForDefaultQueue() {
RpcInfo rpc = new RpcInfo(transportInfo("Main"));
assertThat(rpc.getShortName()).isEqualTo("MQTT RPC");
}
@Test
void shortNameAppendsRpcForCustomQueue() {
RpcInfo rpc = new RpcInfo(transportInfo("Foo"));
assertThat(rpc.getShortName()).isEqualTo("MQTT Foo RPC");
}
@Test
void toStringIncludesUrlAndRpcMarker() {
RpcInfo rpc = new RpcInfo(transportInfo("Main"));
assertThat(rpc.toString())
.contains("*MQTT*")
.contains("(tcp://host:1883)")
.endsWith(RpcInfo.RPC_SUFFIX);
}
@Test
void equalsAndHashCodeAreStableAcrossInstances() {
RpcInfo a = new RpcInfo(transportInfo("Main"));
RpcInfo b = new RpcInfo(transportInfo("Main"));
assertThat(a).isEqualTo(b);
assertThat(a.hashCode()).isEqualTo(b.hashCode());
}
@Test
void differentQueuesProduceDistinctKeys() {
RpcInfo a = new RpcInfo(transportInfo("Main"));
RpcInfo b = new RpcInfo(transportInfo("Foo"));
assertThat(a).isNotEqualTo(b);
}
// IncidentManager dedups by toString() — RpcInfo for a target must produce a
// different incident row than TransportInfo for the same target so the RPC
// row and the telemetry row don't collapse into one.
@Test
void rpcInfoIncidentKeyDiffersFromTelemetryInfoForSameTarget() {
TransportInfo telemetry = transportInfo("Main");
RpcInfo rpc = new RpcInfo(telemetry);
assertThat(rpc.toString()).isNotEqualTo(telemetry.toString());
assertThat(rpc).isNotEqualTo(telemetry);
}
private static TransportInfo transportInfo(String queue) {
TransportMonitoringTarget target = new TransportMonitoringTarget();
target.setBaseUrl("tcp://host:1883");
target.setQueue(queue);
return new TransportInfo(TransportType.MQTT, target);
}
}
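
The naming contract these tests describe (the RPC key reuses the telemetry key's name, appends a suffix, and must never compare equal to the telemetry key) can be sketched with two small value classes. This is a sketch under stated assumptions: `TransportKey` and `RpcKey` are hypothetical stand-ins for `TransportInfo` and `RpcInfo`, and the string formats and the `" RPC"` suffix value are guesses consistent with the assertions above, not the PR's actual implementation.

```java
import java.util.Objects;

// Hypothetical stand-in for TransportInfo; field names and string formats are assumptions.
final class TransportKey {
    final String type;
    final String baseUrl;
    final String queue;

    TransportKey(String type, String baseUrl, String queue) {
        this.type = type;
        this.baseUrl = baseUrl;
        this.queue = queue;
    }

    // The default queue "Main" is left out of the short name.
    String shortName() {
        return "Main".equals(queue) ? type : type + " " + queue;
    }

    @Override
    public String toString() {
        return "*" + type + "* (" + baseUrl + ")" + ("Main".equals(queue) ? "" : " _" + queue + "_");
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TransportKey)) return false;
        TransportKey other = (TransportKey) o;
        return type.equals(other.type) && baseUrl.equals(other.baseUrl) && queue.equals(other.queue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, baseUrl, queue);
    }
}

// Hypothetical stand-in for RpcInfo: wraps the telemetry key and appends a suffix.
final class RpcKey {
    static final String RPC_SUFFIX = " RPC"; // assumed value

    final TransportKey transport;

    RpcKey(TransportKey transport) {
        this.transport = transport;
    }

    String shortName() {
        return transport.shortName() + RPC_SUFFIX;
    }

    // The suffix keeps the RPC incident row distinct from the telemetry row.
    @Override
    public String toString() {
        return transport + RPC_SUFFIX;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof RpcKey && transport.equals(((RpcKey) o).transport);
    }

    @Override
    public int hashCode() {
        return Objects.hash(transport, RPC_SUFFIX);
    }
}
```

Because the wrapper type differs and the suffix changes `toString()`, a consumer that deduplicates by either equality or string form would keep the RPC row and the telemetry row separate.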

200
monitoring/src/test/java/org/thingsboard/monitoring/service/BaseHealthCheckerRpcHookTest.java

@@ -0,0 +1,200 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.monitoring.client.WsClient;
import org.thingsboard.monitoring.config.MonitoringConfig;
import org.thingsboard.monitoring.config.MonitoringTarget;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.util.TbStopWatch;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class BaseHealthCheckerRpcHookTest {
private MonitoringReporter reporter;
private WsClient wsClient;
private TestTarget target;
private TestChecker checker;
@BeforeEach
void setUp() {
reporter = mock(MonitoringReporter.class);
wsClient = mock(WsClient.class);
target = new TestTarget(UUID.randomUUID());
checker = new TestChecker(new TestConfig(), target);
ReflectionTestUtils.setField(checker, "info", "test-info");
ReflectionTestUtils.setField(checker, "reporter", reporter);
ReflectionTestUtils.setField(checker, "stopWatch", new TbStopWatch());
ReflectionTestUtils.setField(checker, "resultCheckTimeoutMs", 1000);
}
@Test
void rpcCheckRunsAfterSuccessfulTelemetryAndWsUpdate() {
stubWsUpdateEchoesPayload();
checker.check(wsClient);
assertThat(checker.rpcCheckCalled).isTrue();
verify(reporter).serviceIsOk("test-info");
}
@Test
void rpcCheckIsNotRunWhenTelemetryPublishFails() {
checker.failTelemetry = true;
checker.check(wsClient);
assertThat(checker.rpcCheckCalled).isFalse();
verify(reporter).serviceFailure(eq("test-info"), any(Throwable.class));
verify(reporter, never()).serviceIsOk(any());
}
@Test
void rpcCheckIsNotRunWhenWsUpdateMissing() {
when(wsClient.getLatest(any())).thenReturn(Collections.emptyMap());
checker.check(wsClient);
assertThat(checker.rpcCheckCalled).isFalse();
verify(reporter).serviceFailure(eq("test-info"), any(Throwable.class));
verify(reporter, never()).serviceIsOk(any());
}
@Test
void rpcFailureRoutesThroughDedicatedServiceKey() {
stubWsUpdateEchoesPayload();
Object rpcKey = new Object() {
@Override public String toString() { return "test-info RPC"; }
};
checker.rpcServiceKey = rpcKey;
checker.rpcFailure = new IOException("rpc failed");
checker.check(wsClient);
assertThat(checker.rpcCheckCalled).isTrue();
verify(reporter).serviceFailure(eq(rpcKey), any(Throwable.class));
verify(reporter, never()).serviceIsOk(any());
}
@Test
void defaultDoRpcCheckIsNoOpAndDoesNotPreventOk() {
TestChecker noOpRpc = new TestChecker(new TestConfig(), target);
ReflectionTestUtils.setField(noOpRpc, "info", "noop-info");
ReflectionTestUtils.setField(noOpRpc, "reporter", reporter);
ReflectionTestUtils.setField(noOpRpc, "stopWatch", new TbStopWatch());
ReflectionTestUtils.setField(noOpRpc, "resultCheckTimeoutMs", 1000);
noOpRpc.skipRpcOverride = true;
when(wsClient.getLatest(any())).thenAnswer(latestEcho(noOpRpc));
noOpRpc.check(wsClient);
verify(reporter).serviceIsOk("noop-info");
}
private void stubWsUpdateEchoesPayload() {
when(wsClient.getLatest(any())).thenAnswer(latestEcho(checker));
}
private static org.mockito.stubbing.Answer<Map<String, String>> latestEcho(TestChecker checker) {
return (InvocationOnMock invocation) -> {
String value = checker.capturedTestValue;
return value == null ? Collections.emptyMap() : Map.of("testData", value);
};
}
private static final class TestConfig implements MonitoringConfig<TestTarget> {
@Override
public List<TestTarget> getTargets() {
return Collections.emptyList();
}
}
private static final class TestTarget implements MonitoringTarget {
private final UUID deviceId;
TestTarget(UUID deviceId) {
this.deviceId = deviceId;
}
@Override public UUID getDeviceId() { return deviceId; }
@Override public String getBaseUrl() { return "test://target"; }
@Override public boolean isCheckDomainIps() { return false; }
}
private static class TestChecker extends BaseHealthChecker<TestConfig, TestTarget> {
String capturedTestValue;
boolean failTelemetry;
boolean rpcCheckCalled;
boolean skipRpcOverride;
Object rpcServiceKey;
Throwable rpcFailure;
TestChecker(TestConfig config, TestTarget target) {
super(config, target);
}
@Override protected void initialize() { }
@Override protected void initClient() { }
@Override protected void destroyClient() { }
@Override protected Object getInfo() { return "test-info"; }
@Override protected String getKey() { return "test"; }
@Override protected boolean isCfMonitoringEnabled() { return false; }
@Override
protected String createTestPayload(String testValue) {
capturedTestValue = testValue;
return testValue;
}
@Override
protected void sendTestPayload(String payload) throws Exception {
if (failTelemetry) {
throw new IOException("telemetry publish failed");
}
}
@Override
protected void doRpcCheck() throws Exception {
if (skipRpcOverride) {
super.doRpcCheck();
return;
}
rpcCheckCalled = true;
if (rpcFailure != null) {
throw new ServiceFailureException(rpcServiceKey, rpcFailure);
}
}
}
}
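
The ordering these tests pin down (the RPC hook runs only after telemetry and the WebSocket update succeed, an RPC failure is reported under its own service key, and the OK report fires only when every step passes) can be sketched as follows. This is a minimal sketch, not the PR's `BaseHealthChecker.check()`: `CheckFlowSketch`, `KeyedFailure`, `Step`, and the string events standing in for `MonitoringReporter` calls are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the check ordering; not the PR's BaseHealthChecker.
final class CheckFlowSketch {

    /** Carries the key under which a failure should be reported. */
    static final class KeyedFailure extends Exception {
        final Object serviceKey;

        KeyedFailure(Object serviceKey, Throwable cause) {
            super(cause);
            this.serviceKey = serviceKey;
        }
    }

    /** One stage of the check; may fail with any exception. */
    interface Step {
        void run() throws Exception;
    }

    /** Runs telemetry, then the RPC hook, and returns the events a reporter would receive. */
    static List<String> check(Object info, Step telemetry, Step rpcHook) {
        List<String> events = new ArrayList<>();
        try {
            telemetry.run(); // publish and wait for the WebSocket update
            rpcHook.run();   // reached only when telemetry succeeded; may be a no-op
            events.add("ok:" + info);
        } catch (KeyedFailure e) {
            events.add("failure:" + e.serviceKey); // reported under the dedicated key
        } catch (Exception e) {
            events.add("failure:" + info);         // reported under the checker's own key
        }
        return events;
    }
}
```

Passing a no-op lambda as `rpcHook` corresponds to a checker that does not override the RPC hook: the flow still ends with a single OK event.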

134
monitoring/src/test/java/org/thingsboard/monitoring/service/transport/TransportHealthCheckerInitializeTest.java

@@ -0,0 +1,134 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service.transport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.RpcCheckConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.service.MonitoringEntityService;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
class TransportHealthCheckerInitializeTest {
private HttpTransportMonitoringConfig config;
private TransportMonitoringTarget target;
private MonitoringEntityService entityService;
@BeforeEach
void setUp() {
config = new HttpTransportMonitoringConfig();
ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 3000, int.class);
DeviceConfig device = new DeviceConfig();
device.setId(UUID.randomUUID().toString());
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId("tok");
device.setCredentials(credentials);
target = new TransportMonitoringTarget();
target.setBaseUrl("http://localhost:8080");
target.setDevice(device);
entityService = mock(MonitoringEntityService.class);
doNothing().when(entityService).checkEntities(config, target);
}
@Test
void initializePassesWhenRpcDisabled() {
StubTransportHealthChecker checker = newChecker(5000);
assertThatCode(checker::initializePublic).doesNotThrowAnyException();
}
@Test
void initializePassesWhenRpcTimeoutBelowRest() {
enableRpc(2000);
StubTransportHealthChecker checker = newChecker(5000);
assertThatCode(checker::initializePublic).doesNotThrowAnyException();
}
@Test
void initializeFailsWhenRpcTimeoutEqualsRest() {
enableRpc(5000);
StubTransportHealthChecker checker = newChecker(5000);
assertThatThrownBy(checker::initializePublic)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RPC request timeout")
.hasMessageContaining("must be <");
}
@Test
void initializeFailsWhenRpcTimeoutExceedsRest() {
enableRpc(7000);
StubTransportHealthChecker checker = newChecker(5000);
assertThatThrownBy(checker::initializePublic)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RPC request timeout (7000 ms)");
}
@Test
void initializeFallsBackToTransportTimeoutAndStillValidates() {
enableRpc(null);
StubTransportHealthChecker checker = newChecker(2000);
assertThatThrownBy(checker::initializePublic)
.isInstanceOf(IllegalStateException.class);
}
private StubTransportHealthChecker newChecker(int restTimeoutMs) {
StubTransportHealthChecker checker = new StubTransportHealthChecker(config, target);
ReflectionTestUtils.setField(checker, "entityService", entityService);
ReflectionTestUtils.setField(checker, "restRequestTimeoutMs", restTimeoutMs);
return checker;
}
private void enableRpc(Integer rpcTimeoutMs) {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
rpc.setRequestTimeoutMs(rpcTimeoutMs);
target.setRpc(rpc);
}
private static final class StubTransportHealthChecker extends TransportHealthChecker<HttpTransportMonitoringConfig> {
StubTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) {
super(config, target);
}
void initializePublic() {
initialize();
}
@Override protected TransportType getTransportType() { return TransportType.HTTP; }
@Override protected void initClient() {}
@Override protected void sendTestPayload(String payload) {}
@Override protected void destroyClient() {}
}
}

243
monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/CoapTransportHealthCheckerRpcTest.java

@@ -0,0 +1,243 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapHandler;
import org.eclipse.californium.core.CoapObserveRelation;
import org.eclipse.californium.core.CoapResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.ResourceAccessException;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.RpcCheckConfig;
import org.thingsboard.monitoring.config.transport.RpcInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.net.SocketTimeoutException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class CoapTransportHealthCheckerRpcTest {
private CoapTransportMonitoringConfig config;
private TransportMonitoringTarget target;
private TbClient tbClient;
private CoapClient mockRpcCoapClient;
private CoapObserveRelation mockObserveRelation;
private TestableCoapChecker checker;
@BeforeEach
void setUp() {
config = new CoapTransportMonitoringConfig();
ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class);
DeviceConfig device = new DeviceConfig();
device.setId(UUID.randomUUID().toString());
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId("token-coap");
device.setCredentials(credentials);
target = new TransportMonitoringTarget();
target.setBaseUrl("coap://localhost");
target.setDevice(device);
tbClient = mock(TbClient.class);
mockRpcCoapClient = mock(CoapClient.class);
mockObserveRelation = mock(CoapObserveRelation.class);
lenient().when(mockRpcCoapClient.observe(any(CoapHandler.class))).thenReturn(mockObserveRelation);
checker = new TestableCoapChecker(config, target);
// pre-set both clients so initClient() does not call new CoapClient(...)
ReflectionTestUtils.setField(checker, "coapClient", mock(CoapClient.class));
checker.rpcCoapClient = mockRpcCoapClient;
ReflectionTestUtils.setField(checker, "tbClient", tbClient);
ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class));
}
@Test
void doRpcCheckHappyPath() throws Exception {
enableRpc();
AtomicReference<JsonNode> capturedBody = new AtomicReference<>();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenAnswer(inv -> {
JsonNode b = inv.getArgument(1);
capturedBody.set(b);
return b.get("params");
});
checker.doRpcCheck();
assertThat(capturedBody.get().get("method").asText()).isEqualTo("monitoringCheck");
}
@Test
void doRpcCheckValueMismatchUsesRpcKey() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(JacksonUtil.newObjectNode().put("value", "wrong"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckRestExceptionWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new RuntimeException("400 Bad Request"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasCauseInstanceOf(RuntimeException.class);
}
@Test
void doRpcCheckTimeoutWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException()));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("read timed out");
}
@Test
void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception {
checker.doRpcCheck();
verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any());
}
@Test
void initClientStartsObserveWhenRpcEnabled() throws Exception {
enableRpc();
checker.initClient();
verify(mockRpcCoapClient).observe(any(CoapHandler.class));
assertThat(checker.rpcObserveRelation).isSameAs(mockObserveRelation);
}
@Test
void initClientSkipsObserveWhenRpcDisabled() throws Exception {
checker.initClient();
verify(mockRpcCoapClient, never()).observe(any(CoapHandler.class));
}
@Test
void initClientObserveIsIdempotent() throws Exception {
enableRpc();
checker.initClient();
checker.initClient();
verify(mockRpcCoapClient, org.mockito.Mockito.times(1)).observe(any(CoapHandler.class));
}
@Test
void destroyClientCancelsObserveAndShutsDownRpcClient() throws Exception {
enableRpc();
checker.initClient();
checker.destroyClient();
verify(mockObserveRelation).proactiveCancel();
verify(mockRpcCoapClient).shutdown();
assertThat(checker.rpcObserveRelation).isNull();
assertThat(checker.rpcCoapClient).isNull();
}
@Test
void handleRpcNotificationEchoesParamsBackToResponseEndpoint() {
enableRpc();
CoapResponse response = mock(CoapResponse.class);
when(response.getResponseText()).thenReturn(
"{\"id\":42,\"method\":\"monitoringCheck\",\"params\":{\"value\":\"uuid-42\"}}");
checker.handleRpcNotification(response);
assertThat(checker.capturedResponseUri).isEqualTo("coap://localhost/api/v1/token-coap/rpc/42");
JsonNode echoed = JacksonUtil.toJsonNode(checker.capturedResponsePayload);
assertThat(echoed.get("value").asText()).isEqualTo("uuid-42");
}
@Test
void handleRpcNotificationIgnoresEmptyResponse() {
enableRpc();
CoapResponse response = mock(CoapResponse.class);
when(response.getResponseText()).thenReturn("");
checker.handleRpcNotification(response);
assertThat(checker.capturedResponseUri).isNull();
}
@Test
void handleRpcNotificationIgnoresMalformedRpcWithoutId() {
enableRpc();
CoapResponse response = mock(CoapResponse.class);
when(response.getResponseText()).thenReturn("{\"method\":\"x\"}");
checker.handleRpcNotification(response);
assertThat(checker.capturedResponseUri).isNull();
}
private void enableRpc() {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
target.setRpc(rpc);
}
private static final class TestableCoapChecker extends CoapTransportHealthChecker {
String capturedResponseUri;
String capturedResponsePayload;
TestableCoapChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) {
super(config, target);
}
@Override
void postRpcResponse(String uri, String payload) {
capturedResponseUri = uri;
capturedResponsePayload = payload;
}
}
}

316
monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/HttpTransportHealthCheckerRpcTest.java

@@ -0,0 +1,316 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.RpcCheckConfig;
import org.thingsboard.monitoring.config.transport.RpcInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.net.SocketTimeoutException;
import java.util.UUID;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class HttpTransportHealthCheckerRpcTest {
private HttpTransportMonitoringConfig config;
private TransportMonitoringTarget target;
private HttpTransportHealthChecker checker;
private TbClient tbClient;
private RestTemplate restTemplate;
@BeforeEach
void setUp() {
config = new HttpTransportMonitoringConfig();
ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class);
DeviceConfig device = new DeviceConfig();
device.setId(UUID.randomUUID().toString());
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId("token-abc");
device.setCredentials(credentials);
target = new TransportMonitoringTarget();
target.setBaseUrl("http://localhost:8080");
target.setDevice(device);
tbClient = mock(TbClient.class);
restTemplate = mock(RestTemplate.class);
checker = new HttpTransportHealthChecker(config, target);
ReflectionTestUtils.setField(checker, "restTemplate", restTemplate);
ReflectionTestUtils.setField(checker, "tbClient", tbClient);
ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class));
}
@Test
void doRpcCheckHappyPath() throws Exception {
enableRpc();
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()))
.thenAnswer(inv -> bodyCaptor.getValue().get("params"));
checker.doRpcCheck();
assertThat(bodyCaptor.getValue().get("method").asText()).isEqualTo("monitoringCheck");
}
@Test
void doRpcCheckValueMismatchUsesRpcKey() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(JacksonUtil.newObjectNode().put("value", "wrong"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckRestExceptionWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new RuntimeException("400 Bad Request"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasCauseInstanceOf(RuntimeException.class);
}
@Test
void doRpcCheckTimeoutWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException()));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("read timed out");
}
@Test
void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception {
checker.doRpcCheck();
verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any());
}
@Test
void initClientStartsPollingWhenRpcEnabled() throws Exception {
enableRpc();
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.initClient();
try {
verify(restTemplate, timeout(2000).atLeastOnce())
.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class));
} finally {
checker.destroyClient();
}
}
@Test
void initClientDoesNotStartPollingWhenRpcDisabled() throws Exception {
checker.initClient();
// Mockito#after waits out the whole window and then asserts that no invocation
// occurred, which avoids a hand-rolled Thread.sleep + verify(never()).
verify(restTemplate, after(200).never()).getForEntity(any(String.class), eq(JsonNode.class));
Future<?> future = (Future<?>) ReflectionTestUtils.getField(checker, "rpcPollFuture");
assertThat(future).isNull();
}
@Test
void initClientIsIdempotent() throws Exception {
enableRpc();
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.initClient();
try {
Future<?> first = (Future<?>) ReflectionTestUtils.getField(checker, "rpcPollFuture");
checker.initClient();
Future<?> second = (Future<?>) ReflectionTestUtils.getField(checker, "rpcPollFuture");
assertThat(second).isSameAs(first);
} finally {
// Stop the poller even when the assertion fails, so it cannot leak into other tests.
checker.destroyClient();
}
}
@Test
void destroyClientStopsPolling() throws Exception {
enableRpc();
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.initClient();
verify(restTemplate, timeout(2000).atLeastOnce())
.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class));
checker.destroyClient();
clearInvocations(restTemplate);
// Wait the window and verify no further polls fire — replaces brittle
// Thread.sleep + verifyNoMoreInteractions which races on slow runners.
verify(restTemplate, after(200).never())
.getForEntity(any(String.class), eq(JsonNode.class));
assertThat(ReflectionTestUtils.getField(checker, "rpcPollFuture")).isNull();
assertThat(ReflectionTestUtils.getField(checker, "rpcPoller")).isNull();
}
@Test
void initClientReschedulesAfterPollFutureDies() throws Exception {
enableRpc();
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.initClient();
try {
Future<?> first = (Future<?>) ReflectionTestUtils.getField(checker, "rpcPollFuture");
assertThat(first).isNotNull();
first.cancel(true);
verify(restTemplate, timeout(2000).atLeastOnce())
.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class));
checker.initClient();
Future<?> second = (Future<?>) ReflectionTestUtils.getField(checker, "rpcPollFuture");
assertThat(second).isNotNull().isNotSameAs(first);
assertThat(second.isDone()).isFalse();
} finally {
checker.destroyClient();
}
}
@Test
void pollOnceEchoesParamsWhenRpcArrives() throws Exception {
ObjectNode rpc = JacksonUtil.newObjectNode();
rpc.put("id", 99L);
rpc.put("method", "monitoringCheck");
rpc.set("params", JacksonUtil.newObjectNode().put("value", "uuid-99"));
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(ResponseEntity.ok(rpc));
checker.pollOnce();
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/99"), bodyCaptor.capture());
assertThat(bodyCaptor.getValue().get("value").asText()).isEqualTo("uuid-99");
}
@Test
void pollOncePostsEmptyBodyWhenParamsMissing() throws Exception {
ObjectNode rpc = JacksonUtil.newObjectNode();
rpc.put("id", 7L);
rpc.put("method", "monitoringCheck");
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(ResponseEntity.ok(rpc));
checker.pollOnce();
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
verify(restTemplate).postForLocation(eq("http://localhost:8080/api/v1/token-abc/rpc/7"), bodyCaptor.capture());
assertThat(bodyCaptor.getValue()).isNotNull();
assertThat(bodyCaptor.getValue().isObject()).isTrue();
}
@Test
void pollOnceIgnoresNonNumericId() throws Exception {
ObjectNode rpc = JacksonUtil.newObjectNode();
rpc.put("id", "abc");
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(ResponseEntity.ok(rpc));
checker.pollOnce();
verify(restTemplate, never()).postForLocation(any(String.class), any());
}
@Test
void pollOnceIsSilentOn408() throws Exception {
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenThrow(HttpClientErrorException.create(HttpStatus.REQUEST_TIMEOUT, "timeout", null, null, null));
checker.pollOnce();
verify(restTemplate, never()).postForLocation(any(String.class), any());
}
@Test
void pollOnceSwallowsBodylessOk() throws Exception {
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.pollOnce();
verify(restTemplate, never()).postForLocation(any(String.class), any());
}
@Test
void pollTaskBackoffDoesNotKillScheduler() throws Exception {
enableRpc();
when(restTemplate.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class)))
.thenThrow(new RuntimeException("boom"))
.thenThrow(new RuntimeException("boom"))
.thenReturn(new ResponseEntity<>(null, HttpStatus.OK));
checker.initClient();
try {
verify(restTemplate, timeout(5000).atLeast(3))
.getForEntity(contains("/rpc?timeout="), eq(JsonNode.class));
} finally {
checker.destroyClient();
}
}
private void enableRpc() {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
target.setRpc(rpc);
}
}

168
monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/Lwm2mTransportHealthCheckerRpcTest.java

@ -0,0 +1,168 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.ResourceAccessException;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.RpcCheckConfig;
import org.thingsboard.monitoring.config.transport.RpcInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.net.SocketTimeoutException;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class Lwm2mTransportHealthCheckerRpcTest {
private Lwm2mTransportMonitoringConfig config;
private TransportMonitoringTarget target;
private Lwm2mTransportHealthChecker checker;
private TbClient tbClient;
@BeforeEach
void setUp() {
config = new Lwm2mTransportMonitoringConfig();
ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 6000, int.class);
DeviceConfig device = new DeviceConfig();
device.setId(UUID.randomUUID().toString());
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId("endpoint-1");
device.setCredentials(credentials);
target = new TransportMonitoringTarget();
target.setBaseUrl("coap://localhost:5685");
target.setDevice(device);
tbClient = mock(TbClient.class);
checker = new Lwm2mTransportHealthChecker(config, target);
ReflectionTestUtils.setField(checker, "tbClient", tbClient);
ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class));
}
@Test
void doRpcCheckHappyPathReadsManufacturerResource() throws Exception {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(new TextNode("Thingsboard"));
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
checker.doRpcCheck();
verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture());
JsonNode body = bodyCaptor.getValue();
assertThat(body.get("method").asText()).isEqualTo("Read");
assertThat(body.get("params").get("key").asText()).isEqualTo("/3/0/0");
assertThat(body.get("timeout").asInt()).isEqualTo(6000);
}
@Test
void doRpcCheckBlankResponseFailsWithRpcKey() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(new TextNode(""));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("blank result")
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckNullResponseFailsWithRpcKey() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(null);
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckRestExceptionWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new RuntimeException("400 Bad Request"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasCauseInstanceOf(RuntimeException.class);
}
@Test
void doRpcCheckTimeoutWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException()));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("read timed out");
}
@Test
void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception {
checker.doRpcCheck();
verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any());
}
@Test
void doRpcCheckUsesPerTargetTimeoutOverride() throws Exception {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
rpc.setRequestTimeoutMs(12000);
target.setRpc(rpc);
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(new TextNode("ok"));
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
checker.doRpcCheck();
verify(tbClient).handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture());
assertThat(bodyCaptor.getValue().get("timeout").asInt()).isEqualTo(12000);
}
private void enableRpc() {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
target.setRpc(rpc);
}
}

205
monitoring/src/test/java/org/thingsboard/monitoring/service/transport/impl/MqttTransportHealthCheckerRpcTest.java

@ -0,0 +1,205 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service.transport.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.ResourceAccessException;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.RpcCheckConfig;
import org.thingsboard.monitoring.config.transport.RpcInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.MonitoringReporter;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class MqttTransportHealthCheckerRpcTest {
private MqttTransportMonitoringConfig config;
private TransportMonitoringTarget target;
private MqttTransportHealthChecker checker;
private TbClient tbClient;
private MqttClient mqttClient;
@BeforeEach
void setUp() throws Exception {
config = new MqttTransportMonitoringConfig();
ReflectionTestUtils.setField(config, "qos", 1);
ReflectionTestUtils.invokeSetterMethod(config, "setRequestTimeoutMs", 4000, int.class);
DeviceConfig device = new DeviceConfig();
device.setId(UUID.randomUUID().toString());
device.setName("test-device");
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId("token-123");
device.setCredentials(credentials);
target = new TransportMonitoringTarget();
target.setBaseUrl("tcp://localhost:1883");
target.setDevice(device);
tbClient = mock(TbClient.class);
mqttClient = mock(MqttClient.class);
lenient().when(mqttClient.isConnected()).thenReturn(true);
checker = new MqttTransportHealthChecker(config, target);
checker.mqttClient = mqttClient;
ReflectionTestUtils.setField(checker, "tbClient", tbClient);
ReflectionTestUtils.setField(checker, "reporter", mock(MonitoringReporter.class));
}
@Test
void doRpcCheckHappyPath() throws Exception {
enableRpc();
ArgumentCaptor<JsonNode> bodyCaptor = ArgumentCaptor.forClass(JsonNode.class);
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), bodyCaptor.capture()))
.thenAnswer(invocation -> bodyCaptor.getValue().get("params"));
checker.doRpcCheck();
JsonNode sent = bodyCaptor.getValue();
assertThat(sent.get("method").asText()).isEqualTo("monitoringCheck");
assertThat(sent.get("timeout").asInt()).isEqualTo(4000);
assertThat(sent.get("params").get("value").asText()).isNotBlank();
}
@Test
void doRpcCheckValueMismatchThrowsServiceFailureWithRpcKey() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenReturn(JacksonUtil.newObjectNode().put("value", "wrong"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("RPC echo mismatch")
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckRestExceptionWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new RuntimeException("400 Bad Request"));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasCauseInstanceOf(RuntimeException.class)
.extracting(t -> ((ServiceFailureException) t).getServiceKey())
.satisfies(key -> assertThat(key.toString()).endsWith(RpcInfo.RPC_SUFFIX));
}
@Test
void doRpcCheckTimeoutWraps() {
enableRpc();
when(tbClient.handleTwoWayDeviceRPCRequest(any(DeviceId.class), any(JsonNode.class)))
.thenThrow(new ResourceAccessException("read timed out", new SocketTimeoutException()));
assertThatThrownBy(() -> checker.doRpcCheck())
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("read timed out");
}
@Test
void doRpcCheckIsNoOpWhenRpcDisabled() throws Exception {
// target.rpc not set → isRpcEnabled() == false
checker.doRpcCheck();
verify(tbClient, never()).handleTwoWayDeviceRPCRequest(any(), any());
}
@Test
void initClientSubscribesToRpcRequestTopicWhenRpcEnabled() throws Exception {
enableRpc();
checker.initClient();
verify(mqttClient).subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class));
}
@Test
void initClientSkipsSubscriptionWhenRpcDisabled() throws Exception {
checker.initClient();
verify(mqttClient, never()).subscribe(anyString(), anyInt(), any(IMqttMessageListener.class));
}
@Test
void initClientSubscribesOnlyOnce() throws Exception {
enableRpc();
checker.initClient();
checker.initClient();
// verify(mock) defaults to times(1), so a second subscribe call would fail this check.
verify(mqttClient)
.subscribe(eq("v1/devices/me/rpc/request/+"), eq(1), any(IMqttMessageListener.class));
}
@Test
void echoRpcRequestPublishesParamsBackToResponseTopic() throws Exception {
enableRpc();
ObjectNode body = JacksonUtil.newObjectNode();
body.put("method", "monitoringCheck");
ObjectNode params = JacksonUtil.newObjectNode().put("value", "uuid-42");
body.set("params", params);
MqttMessage incoming = new MqttMessage(JacksonUtil.toString(body).getBytes(StandardCharsets.UTF_8));
checker.echoRpcRequest("v1/devices/me/rpc/request/77", incoming);
ArgumentCaptor<MqttMessage> publishedCaptor = ArgumentCaptor.forClass(MqttMessage.class);
verify(mqttClient).publish(eq("v1/devices/me/rpc/response/77"), publishedCaptor.capture());
JsonNode echoed = JacksonUtil.toJsonNode(new String(publishedCaptor.getValue().getPayload(), StandardCharsets.UTF_8));
assertThat(echoed.get("value").asText()).isEqualTo("uuid-42");
}
private void enableRpc() {
RpcCheckConfig rpc = new RpcCheckConfig();
rpc.setEnabled(true);
target.setRpc(rpc);
}
}