Browse Source

Merge pull request #15161 from smatvienko-tb/fix/mqtt-rpc-delivery-timeout-requeue-race

fix: Mqtt test await RPC re-queuing after session close on delivery timeout
pull/15180/head
Viacheslav Klimov 3 months ago
committed by GitHub
parent
commit
c4bfb397d5
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 9
      application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java

9
application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java

@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
@ -97,7 +98,13 @@ public class MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest exte
callback.getDisconnectLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(callback.getReturnCode()).isEqualTo(MqttReturnCode.RETURN_CODE_ADMINISTRITIVE_ACTION);
Rpc persistedRpc = doGet("/api/rpc/persistent/" + response.get("rpcId").asText(), Rpc.class);
// The server re-queues the RPC asynchronously after closing the session.
// Poll until the status transitions from SENT to QUEUED.
String rpcId = response.get("rpcId").asText();
Rpc persistedRpc = await("RPC re-queued after session close")
.atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.until(() -> doGet("/api/rpc/persistent/" + rpcId, Rpc.class),
rpc -> RpcStatus.QUEUED.equals(rpc.getStatus()));
assertThat(persistedRpc).isNotNull();
assertThat(persistedRpc.getStatus()).isEqualTo(RpcStatus.QUEUED);
assertThat(persistedRpc.getResponse()).isInstanceOf(NullNode.class);

Loading…
Cancel
Save