diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java index 23aa5133c5..1c9aa40a40 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; @@ -97,7 +98,13 @@ public class MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest exte callback.getDisconnectLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getReturnCode()).isEqualTo(MqttReturnCode.RETURN_CODE_ADMINISTRITIVE_ACTION); - Rpc persistedRpc = doGet("/api/rpc/persistent/" + response.get("rpcId").asText(), Rpc.class); + // The server re-queues the RPC asynchronously after closing the session. + // Poll until the status transitions from SENT to QUEUED. + String rpcId = response.get("rpcId").asText(); + Rpc persistedRpc = await("RPC re-queued after session close") + .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(() -> doGet("/api/rpc/persistent/" + rpcId, Rpc.class), + rpc -> RpcStatus.QUEUED.equals(rpc.getStatus())); assertThat(persistedRpc).isNotNull(); assertThat(persistedRpc.getStatus()).isEqualTo(RpcStatus.QUEUED); assertThat(persistedRpc.getResponse()).isInstanceOf(NullNode.class);