From cc35e71e553c8ed7b6701ae524f89e810d250e35 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 5 Mar 2026 17:16:20 +0100 Subject: [PATCH] fix: await RPC re-queuing after session close on delivery timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the MQTT transport closes the session on RPC delivery timeout, the server re-queues the RPC asynchronously. The test was asserting the RPC status immediately after the client disconnect latch fired, before the server-side re-queuing had completed — so the status was still SENT instead of the expected QUEUED. Replace the direct doGet assertion with an Awaitility poll that waits up to DEFAULT_WAIT_TIMEOUT_SECONDS for the status to become QUEUED. Fixes flaky: MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest Co-Authored-By: Claude Sonnet 4.6 --- ...nsportSessionOnRpcDeliveryTimeoutIntegrationTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java index 23aa5133c5..1c9aa40a40 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; @@ -97,7 +98,13 @@ public class MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest exte callback.getDisconnectLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getReturnCode()).isEqualTo(MqttReturnCode.RETURN_CODE_ADMINISTRITIVE_ACTION); - Rpc persistedRpc = doGet("/api/rpc/persistent/" + response.get("rpcId").asText(), Rpc.class); + // The server re-queues the RPC asynchronously after closing the session. + // Poll until the status transitions from SENT to QUEUED. + String rpcId = response.get("rpcId").asText(); + Rpc persistedRpc = await("RPC re-queued after session close") + .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(() -> doGet("/api/rpc/persistent/" + rpcId, Rpc.class), + rpc -> RpcStatus.QUEUED.equals(rpc.getStatus())); assertThat(persistedRpc).isNotNull(); assertThat(persistedRpc.getStatus()).isEqualTo(RpcStatus.QUEUED); assertThat(persistedRpc.getResponse()).isInstanceOf(NullNode.class);