diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java index deb0574704..9fbdff3fa5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java @@ -52,6 +52,7 @@ import static org.junit.Assert.assertNotNull; @TestPropertySource(properties = { "transport.mqtt.enabled=true", + "js.evaluator=mock", }) @Slf4j public abstract class AbstractMqttIntegrationTest extends AbstractTransportIntegrationTest { @@ -83,22 +84,6 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg } } - protected MqttAsyncClient getMqttAsyncClient(String accessToken) throws MqttException { - String clientId = MqttAsyncClient.generateClientId(); - MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence()); - - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(accessToken); - client.connect(options).waitForCompletion(); - return client; - } - - protected void publishMqttMsg(MqttAsyncClient client, byte[] payload, String topic) throws MqttException { - MqttMessage message = new MqttMessage(); - message.setPayload(payload); - client.publish(topic, message); - } - protected DeviceProfile createMqttDeviceProfile(MqttTestConfigProperties config) throws Exception { TransportPayloadType transportPayloadType = config.getTransportPayloadType(); if (transportPayloadType == null) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java index c8c5b2560c..18815d1c38 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java @@ -28,22 +28,27 @@ import java.util.concurrent.CountDownLatch; @Data public class MqttTestCallback implements MqttCallback { - private final CountDownLatch subscribeLatch; - private final CountDownLatch deliveryLatch; - private int qoS; - private byte[] payloadBytes; - private String awaitSubTopic; - private boolean pubAckReceived; + protected CountDownLatch subscribeLatch; + protected final CountDownLatch deliveryLatch; + protected int qoS; + protected byte[] payloadBytes; + protected String awaitSubTopic; + protected boolean pubAckReceived; - public MqttTestCallback(String awaitSubTopic) { + public MqttTestCallback() { this.subscribeLatch = new CountDownLatch(1); this.deliveryLatch = new CountDownLatch(1); - this.awaitSubTopic = awaitSubTopic; } - public MqttTestCallback() { + public MqttTestCallback(int subscribeCount) { + this.subscribeLatch = new CountDownLatch(subscribeCount); + this.deliveryLatch = new CountDownLatch(1); + } + + public MqttTestCallback(String awaitSubTopic) { this.subscribeLatch = new CountDownLatch(1); this.deliveryLatch = new CountDownLatch(1); + this.awaitSubTopic = awaitSubTopic; } @Override @@ -60,12 +65,16 @@ public class MqttTestCallback implements MqttCallback { payloadBytes = mqttMessage.getPayload(); subscribeLatch.countDown(); } else { - log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); - if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); - payloadBytes = mqttMessage.getPayload(); - subscribeLatch.countDown(); - } + messageArrivedOnAwaitSubTopic(requestTopic, mqttMessage); + } + } + + protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + if (awaitSubTopic.equals(requestTopic)) { + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java index 8afa41b160..96846f8678 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java @@ -104,6 +104,14 @@ public class MqttTestClient { return client.subscribe(topic, qoS.value()); } + public void enableManualAcks() { + client.setManualAcks(true); + } + + public void messageArrivedComplete(MqttMessage mqttMessage) throws MqttException { + client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); + } + private MqttAsyncClient createClient(String clientId) throws MqttException { if (StringUtils.isEmpty(clientId)) { clientId = MqttAsyncClient.generateClientId(); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java index 261f904e2d..556af34371 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java @@ -60,9 +60,6 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEW import static org.thingsboard.server.common.data.query.EntityKeyType.CLIENT_ATTRIBUTE; import static org.thingsboard.server.common.data.query.EntityKeyType.SHARED_ATTRIBUTE; -@TestPropertySource(properties = { - "js.evaluator=mock", -}) @Slf4j public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqttIntegrationTest { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 33256bbf2f..0abd0f1d6c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -26,33 +26,34 @@ import com.squareup.wire.schema.internal.parser.ProtoFileElement; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.junit.Assert; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.MqttTestClient; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.BASE_DEVICE_API_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.BASE_DEVICE_API_TOPIC_V2; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_RPC_TOPIC; @Slf4j public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractMqttIntegrationTest { @@ -76,81 +77,93 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM protected static final Long asyncContextTimeoutToUseRpcPlugin = 10000L; protected void processOneWayRpcTest(String rpcSubTopic) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(accessToken); - - CountDownLatch latch = new CountDownLatch(1); - TestOneWayMqttCallback callback = new TestOneWayMqttCallback(client, latch); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + MqttTestCallback callback = new MqttTestCallback(rpcSubTopic.replace("+", "0")); client.setCallback(callback); - - client.subscribe(rpcSubTopic, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); + client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_MOST_ONCE); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; - String deviceId = savedDevice.getId().getId().toString(); - String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); - Assert.assertTrue(StringUtils.isEmpty(result)); - latch.await(3, TimeUnit.SECONDS); + String result = doPostAsync("/api/rpc/oneway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); + assertTrue(StringUtils.isEmpty(result)); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + DeviceTransportType deviceTransportType = deviceProfile.getTransportType(); + if (deviceTransportType.equals(DeviceTransportType.MQTT)) { + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); + assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); + MqttDeviceProfileTransportConfiguration configuration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; + TransportPayloadType transportPayloadType = configuration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); + if (transportPayloadType.equals(TransportPayloadType.PROTOBUF)) { + // TODO: add correct validation of proto requests to device + assertTrue(callback.getPayloadBytes().length > 0); + } else { + assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); + } + } else { + assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); + } assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + client.disconnect(); } protected void processJsonOneWayRpcTestGateway(String deviceName) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); String payload = "{\"device\":\"" + deviceName + "\"}"; byte[] payloadBytes = payload.getBytes(); validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); + client.disconnect(); } protected void processJsonTwoWayRpcTest(String rpcSubTopic) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(accessToken); - client.subscribe(rpcSubTopic, 1); - - CountDownLatch latch = new CountDownLatch(1); - TestJsonMqttCallback callback = new TestJsonMqttCallback(client, latch); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_LEAST_ONCE); + MqttTestRpcJsonCallback callback = new MqttTestRpcJsonCallback(client, rpcSubTopic.replace("+", "0")); client.setCallback(callback); - - Thread.sleep(1000); - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; - String deviceId = savedDevice.getId().getId().toString(); - - String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - String expected = "{\"value1\":\"A\",\"value2\":\"B\"}"; - latch.await(3, TimeUnit.SECONDS); - Assert.assertEquals(expected, result); + String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); + assertEquals("{\"value1\":\"A\",\"value2\":\"B\"}", actualRpcResponse); + client.disconnect(); } protected void processProtoTwoWayRpcTest(String rpcSubTopic) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(accessToken); - client.subscribe(rpcSubTopic, 1); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_LEAST_ONCE); - CountDownLatch latch = new CountDownLatch(1); - TestProtoMqttCallback callback = new TestProtoMqttCallback(client, latch); + MqttTestRpcProtoCallback callback = new MqttTestRpcProtoCallback(client, rpcSubTopic.replace("+", "0")); client.setCallback(callback); - Thread.sleep(1000); - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - String expected = "{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}"; - latch.await(3, TimeUnit.SECONDS); - Assert.assertEquals(expected, result); + String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + // TODO: add correct validation of proto requests to device + assertTrue(callback.getPayloadBytes().length > 0); + assertEquals("{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}", actualRpcResponse); + client.disconnect(); } protected void processProtoTwoWayRpcTestGateway(String deviceName) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName); byte[] payloadBytes = connectMsgProto.toByteArray(); validateProtoTwoWayRpcGatewayResponse(deviceName, client, payloadBytes); + client.disconnect(); } protected void processProtoOneWayRpcTestGateway(String deviceName) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName); byte[] payloadBytes = connectMsgProto.toByteArray(); validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); + client.disconnect(); } private TransportApiProtos.ConnectMsg getConnectProto(String deviceName) { @@ -175,29 +188,30 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); } - MqttAsyncClient client = getMqttAsyncClient(accessToken); - client.setManualAcks(true); - CountDownLatch latch = new CountDownLatch(10); - TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + client.enableManualAcks(); + MqttTestSequenceCallback callback = new MqttTestSequenceCallback(client, 10, result); client.setCallback(callback); - client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1); + client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE); - latch.await(10, TimeUnit.SECONDS); - Assert.assertEquals(expected, result); + callback.getSubscribeLatch().await(10, TimeUnit.SECONDS); + assertEquals(expected, result); } protected void processJsonTwoWayRpcTestGateway(String deviceName) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); String payload = "{\"device\":\"" + deviceName + "\"}"; byte[] payloadBytes = payload.getBytes(); validateJsonTwoWayRpcGatewayResponse(deviceName, client, payloadBytes); + client.disconnect(); } - protected void validateOneWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { - publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); - + protected void validateOneWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { + client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); Device savedDevice = doExecuteWithRetriesAndInterval( () -> getDeviceByName(deviceName), 20, @@ -205,24 +219,45 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - CountDownLatch latch = new CountDownLatch(1); - TestOneWayMqttCallback callback = new TestOneWayMqttCallback(client, latch); + MqttTestCallback callback = new MqttTestCallback(GATEWAY_RPC_TOPIC); client.setCallback(callback); - - client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); - + client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); - Assert.assertTrue(StringUtils.isEmpty(result)); - latch.await(3, TimeUnit.SECONDS); + assertTrue(StringUtils.isEmpty(result)); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + DeviceTransportType deviceTransportType = deviceProfile.getTransportType(); + if (deviceTransportType.equals(DeviceTransportType.MQTT)) { + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); + assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); + MqttDeviceProfileTransportConfiguration configuration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; + TransportPayloadType transportPayloadType = configuration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); + if (transportPayloadType.equals(TransportPayloadType.PROTOBUF)) { + // TODO: add correct validation of proto requests to device + assertTrue(callback.getPayloadBytes().length > 0); + } else { + JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); + assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); + } + } else { + JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); + assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); + } assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); } - protected void validateJsonTwoWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { - publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); + private JsonNode getExpectedGatewayJsonRequestData(String deviceName, String requestStr) { + ObjectNode deviceData = (ObjectNode) JacksonUtil.toJsonNode(requestStr); + deviceData.put("id", 0); + ObjectNode expectedRequest = JacksonUtil.newObjectNode(); + expectedRequest.put("device", deviceName); + expectedRequest.set("data", deviceData); + return expectedRequest; + } + + protected void validateJsonTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { + client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); Device savedDevice = doExecuteWithRetriesAndInterval( () -> getDeviceByName(deviceName), @@ -231,25 +266,21 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - CountDownLatch latch = new CountDownLatch(1); - TestJsonMqttCallback callback = new TestJsonMqttCallback(client, latch); + MqttTestRpcJsonCallback callback = new MqttTestRpcJsonCallback(client, GATEWAY_RPC_TOPIC); client.setCallback(callback); - - client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); + client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; String deviceId = savedDevice.getId().getId().toString(); - String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); - String expected = "{\"success\":true}"; - assertEquals(expected, result); + String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + log.warn("request payload: {}", JacksonUtil.fromBytes(callback.getPayloadBytes())); + assertEquals("{\"success\":true}", actualRpcResponse); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); } - protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { - publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); + protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { + client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); Device savedDevice = doExecuteWithRetriesAndInterval( () -> getDeviceByName(deviceName), @@ -258,20 +289,15 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - CountDownLatch latch = new CountDownLatch(1); - TestProtoMqttCallback callback = new TestProtoMqttCallback(client, latch); + MqttTestRpcProtoCallback callback = new MqttTestRpcProtoCallback(client, GATEWAY_RPC_TOPIC); client.setCallback(callback); - - client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); + client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; String deviceId = savedDevice.getId().getId().toString(); - String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); - String expected = "{\"success\":true}"; - assertEquals(expected, result); + String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + assertEquals("{\"success\":true}", actualRpcResponse); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); } @@ -279,132 +305,82 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class); } - protected MqttMessage processJsonMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { - MqttMessage message = new MqttMessage(); - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { - message.setPayload(DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8)); + protected byte[] processJsonMessageArrived(String requestTopic, MqttMessage mqttMessage) { + if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { + return DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8); } else { JsonNode requestMsgNode = JacksonUtil.toJsonNode(new String(mqttMessage.getPayload(), StandardCharset.UTF_8)); String deviceName = requestMsgNode.get("device").asText(); int requestId = requestMsgNode.get("data").get("id").asInt(); - message.setPayload(("{\"device\": \"" + deviceName + "\", \"id\": " + requestId + ", \"data\": {\"success\": true}}").getBytes(StandardCharset.UTF_8)); - } - return message; - } - - protected class TestOneWayMqttCallback implements MqttCallback { - - private final MqttAsyncClient client; - private final CountDownLatch latch; - private Integer qoS; - - TestOneWayMqttCallback(MqttAsyncClient client, CountDownLatch latch) { - this.client = client; - this.latch = latch; - } - - int getQoS() { - return qoS; - } - - @Override - public void connectionLost(Throwable throwable) { - } - - @Override - public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); - qoS = mqttMessage.getQos(); - latch.countDown(); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - + String response = "{\"device\": \"" + deviceName + "\", \"id\": " + requestId + ", \"data\": {\"success\": true}}"; + return response.getBytes(StandardCharset.UTF_8); } } - protected class TestJsonMqttCallback implements MqttCallback { + protected class MqttTestRpcJsonCallback extends MqttTestCallback { - private final MqttAsyncClient client; - private final CountDownLatch latch; - private Integer qoS; + private final MqttTestClient client; - TestJsonMqttCallback(MqttAsyncClient client, CountDownLatch latch) { + public MqttTestRpcJsonCallback(MqttTestClient client, String awaitSubTopic) { + super(awaitSubTopic); this.client = client; - this.latch = latch; - } - - int getQoS() { - return qoS; - } - - @Override - public void connectionLost(Throwable throwable) { } @Override - public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); - String responseTopic; - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { - responseTopic = requestTopic.replace("req", "res"); - } else { - responseTopic = requestTopic.replace("request", "response"); + protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + if (awaitSubTopic.equals(requestTopic)) { + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + String responseTopic; + if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { + responseTopic = requestTopic.replace("req", "res"); + } else { + responseTopic = requestTopic.replace("request", "response"); + } + try { + client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); + } catch (MqttException e) { + log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); + } + subscribeLatch.countDown(); } - qoS = mqttMessage.getQos(); - client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); - latch.countDown(); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - } } - protected class TestProtoMqttCallback implements MqttCallback { + protected class MqttTestRpcProtoCallback extends MqttTestCallback { - private final MqttAsyncClient client; - private final CountDownLatch latch; - private Integer qoS; + private final MqttTestClient client; - TestProtoMqttCallback(MqttAsyncClient client, CountDownLatch latch) { + public MqttTestRpcProtoCallback(MqttTestClient client, String awaitSubTopic) { + super(awaitSubTopic); this.client = client; - this.latch = latch; - } - - int getQoS() { - return qoS; } @Override - public void connectionLost(Throwable throwable) { - } - - @Override - public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); - String responseTopic; - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { - responseTopic = requestTopic.replace("req", "res"); - } else { - responseTopic = requestTopic.replace("request", "response"); + protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + if (awaitSubTopic.equals(requestTopic)) { + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + String responseTopic; + if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { + responseTopic = requestTopic.replace("req", "res"); + } else { + responseTopic = requestTopic.replace("request", "response"); + } + try { + client.publish(responseTopic, processProtoMessageArrived(requestTopic, mqttMessage)); + } catch (Exception e) { + log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); + } + subscribeLatch.countDown(); } - qoS = mqttMessage.getQos(); - client.publish(responseTopic, processProtoMessageArrived(requestTopic, mqttMessage)); - latch.countDown(); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - } } - protected MqttMessage processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { - MqttMessage message = new MqttMessage(); - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { + protected byte[] processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { + if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration(); ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA); DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA); @@ -428,9 +404,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM DynamicMessage rpcResponseMsg = rpcResponseBuilder .setField(rpcResponseMsgDescriptor.findFieldByName("payload"), DEVICE_RESPONSE) .build(); - message.setPayload(rpcResponseMsg.toByteArray()); + return rpcResponseMsg.toByteArray(); } catch (InvalidProtocolBufferException e) { - log.warn("Command Response Ack Error, Invalid response received: ", e); + throw new RuntimeException("Command Response Ack Error, Invalid response received: ", e); } } else { TransportApiProtos.GatewayDeviceRpcRequestMsg msg = TransportApiProtos.GatewayDeviceRpcRequestMsg.parseFrom(mqttMessage.getPayload()); @@ -441,9 +417,8 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM .setId(requestId) .setData("{\"success\": true}") .build(); - message.setPayload(gatewayRpcResponseMsg.toByteArray()); + return gatewayRpcResponseMsg.toByteArray(); } - return message; } private ProtoTransportPayloadConfiguration getProtoTransportPayloadConfiguration() { @@ -455,37 +430,30 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; } - protected class TestSequenceMqttCallback implements MqttCallback { + protected class MqttTestSequenceCallback extends MqttTestCallback { - private final MqttAsyncClient client; - private final CountDownLatch latch; + private final MqttTestClient client; private final List expected; - TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List expected) { + MqttTestSequenceCallback(MqttTestClient client, int subscribeCount, List expected) { + super(subscribeCount); this.client = client; - this.latch = latch; this.expected = expected; } @Override - public void connectionLost(Throwable throwable) { - } - - @Override - public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); expected.add(new String(mqttMessage.getPayload())); String responseTopic = requestTopic.replace("request", "response"); - var qoS = mqttMessage.getQos(); - - client.messageArrivedComplete(mqttMessage.getId(), qoS); - client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); - latch.countDown(); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - + qoS = mqttMessage.getQos(); + try { + client.messageArrivedComplete(mqttMessage); + client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); + } catch (MqttException e) { + log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); + } + subscribeLatch.countDown(); } } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcBackwardCompatibilityIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcBackwardCompatibilityIntegrationTest.java index 465063004e..5fb123666f 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcBackwardCompatibilityIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcBackwardCompatibilityIntegrationTest.java @@ -16,13 +16,16 @@ package org.thingsboard.server.transport.mqtt.rpc; import lombok.extern.slf4j.Slf4j; -import org.junit.After; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; + @Slf4j @DaoSqlTest public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { @@ -37,7 +40,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); processBeforeTest(configProperties); - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test @@ -50,7 +53,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); super.processBeforeTest(configProperties); - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test @@ -63,7 +66,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); super.processBeforeTest(configProperties); - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test @@ -75,7 +78,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .enableCompatibilityWithJsonPayloadFormat(true) .build(); super.processBeforeTest(configProperties); - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test @@ -88,7 +91,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); super.processBeforeTest(configProperties); - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test @@ -99,7 +102,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .rpcRequestProtoSchema(RPC_REQUEST_PROTO_SCHEMA) .build(); super.processBeforeTest(configProperties); - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test @@ -112,7 +115,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); super.processBeforeTest(configProperties); - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test @@ -125,7 +128,7 @@ public class MqttServerSideRpcBackwardCompatibilityIntegrationTest extends Abstr .useJsonPayloadFormatForDefaultDownlinkTopics(true) .build(); super.processBeforeTest(configProperties); - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcDefaultIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcDefaultIntegrationTest.java index 542843eb2c..95e3b7fde3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcDefaultIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcDefaultIntegrationTest.java @@ -20,12 +20,14 @@ import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; @Slf4j @DaoSqlTest @@ -80,32 +82,32 @@ public class MqttServerSideRpcDefaultIntegrationTest extends AbstractMqttServerS @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcJsonIntegrationTest.java index 2fc95e5f6e..b3c3a994d1 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcJsonIntegrationTest.java @@ -16,14 +16,17 @@ package org.thingsboard.server.transport.mqtt.rpc; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestClient; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; + @Slf4j @DaoSqlTest public class MqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { @@ -40,32 +43,32 @@ public class MqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSide @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception { - processJsonTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); + processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test @@ -79,10 +82,12 @@ public class MqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSide } protected void processJsonOneWayRpcTestGateway(String deviceName) throws Exception { - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); String payload = "{\"device\": \"" + deviceName + "\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}"; byte[] payloadBytes = payload.getBytes(); validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); + client.disconnect(); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcProtoIntegrationTest.java index dcd06f5a52..e888f49ff4 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/MqttServerSideRpcProtoIntegrationTest.java @@ -19,10 +19,13 @@ import lombok.extern.slf4j.Slf4j; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; + @Slf4j @DaoSqlTest public class MqttServerSideRpcProtoIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { @@ -40,32 +43,32 @@ public class MqttServerSideRpcProtoIntegrationTest extends AbstractMqttServerSid @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttOneWayRpcOnShortProtoTopic() throws Exception { - processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); + processOneWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); } @Test public void testServerMqttTwoWayRpcOnShortProtoTopic() throws Exception { - processProtoTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); + processProtoTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test