|
|
|
@ -26,6 +26,7 @@ import io.netty.buffer.Unpooled; |
|
|
|
import io.netty.handler.codec.mqtt.MqttQoS; |
|
|
|
import lombok.Data; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.awaitility.Awaitility; |
|
|
|
import org.testng.annotations.AfterMethod; |
|
|
|
import org.testng.annotations.BeforeMethod; |
|
|
|
import org.testng.annotations.Test; |
|
|
|
@ -39,9 +40,11 @@ import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.DeviceProfile; |
|
|
|
import org.thingsboard.server.common.data.DeviceProfileProvisionType; |
|
|
|
import org.thingsboard.server.common.data.StringUtils; |
|
|
|
import org.thingsboard.server.common.data.id.RpcId; |
|
|
|
import org.thingsboard.server.common.data.id.RuleChainId; |
|
|
|
import org.thingsboard.server.common.data.page.PageData; |
|
|
|
import org.thingsboard.server.common.data.page.PageLink; |
|
|
|
import org.thingsboard.server.common.data.rpc.Rpc; |
|
|
|
import org.thingsboard.server.common.data.rule.NodeConnectionInfo; |
|
|
|
import org.thingsboard.server.common.data.rule.RuleChain; |
|
|
|
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|
|
|
@ -59,6 +62,7 @@ import java.util.Arrays; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Random; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ArrayBlockingQueue; |
|
|
|
import java.util.concurrent.BlockingQueue; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
@ -66,6 +70,7 @@ import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
import static org.testng.Assert.assertNotNull; |
|
|
|
import static org.testng.Assert.fail; |
|
|
|
import static org.thingsboard.server.common.data.DataConstants.DEVICE; |
|
|
|
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; |
|
|
|
@ -293,6 +298,60 @@ public class MqttClientTest extends AbstractContainerTest { |
|
|
|
assertThat(serverResponse).isEqualTo(mapper.readTree(clientResponse.toString())); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void serverSidePersistedRpc() throws Exception { |
|
|
|
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); |
|
|
|
|
|
|
|
MqttMessageListener listener = new MqttMessageListener(); |
|
|
|
MqttClient mqttClient = getMqttClient(deviceCredentials, listener); |
|
|
|
mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); |
|
|
|
|
|
|
|
// Wait until subscription is processed
|
|
|
|
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); |
|
|
|
|
|
|
|
// Send an RPC from the server
|
|
|
|
JsonObject serverRpcPayload = new JsonObject(); |
|
|
|
serverRpcPayload.addProperty("method", "getValue"); |
|
|
|
serverRpcPayload.addProperty("params", true); |
|
|
|
serverRpcPayload.addProperty("persistent", true); |
|
|
|
|
|
|
|
JsonNode persistentRpcId = testRestClient.postServerSideRpc(device.getId(), mapper.readTree(serverRpcPayload.toString())); |
|
|
|
|
|
|
|
assertNotNull(persistentRpcId); |
|
|
|
|
|
|
|
RpcId rpcId = new RpcId(UUID.fromString(persistentRpcId.get("rpcId").asText())); |
|
|
|
|
|
|
|
// Wait for RPC call from the server and send the response
|
|
|
|
MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); |
|
|
|
|
|
|
|
assertThat(Objects.requireNonNull(requestFromServer).getMessage()).isEqualTo("{\"method\":\"getValue\",\"params\":true}"); |
|
|
|
|
|
|
|
Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length())); |
|
|
|
JsonObject clientResponse = new JsonObject(); |
|
|
|
clientResponse.addProperty("response", "someResponse"); |
|
|
|
// Send a response to the server's RPC request
|
|
|
|
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); |
|
|
|
|
|
|
|
PageLink pageLink = new PageLink(10); |
|
|
|
|
|
|
|
Awaitility.await() |
|
|
|
.pollInterval(500, TimeUnit.MILLISECONDS) |
|
|
|
.atMost(5 * timeoutMultiplier, TimeUnit.SECONDS) |
|
|
|
.until(() -> { |
|
|
|
PageData<Rpc> rpcByDevice = testRestClient.getPersistedRpcByDevice(device.getId(), pageLink); |
|
|
|
for (Rpc rpc : rpcByDevice.getData()) { |
|
|
|
if (rpc.getId().equals(rpcId)) { |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
return false; |
|
|
|
}); |
|
|
|
|
|
|
|
Rpc persistentRpc = testRestClient.getPersistedRpc(rpcId); |
|
|
|
|
|
|
|
assertThat(persistentRpc.getResponse()).isEqualTo(mapper.readTree(clientResponse.toString())); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void clientSideRpc() throws Exception { |
|
|
|
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); |
|
|
|
|