|
|
|
@ -26,33 +26,34 @@ import com.squareup.wire.schema.internal.parser.ProtoFileElement; |
|
|
|
import io.netty.handler.codec.mqtt.MqttQoS; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttAsyncClient; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttCallback; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage; |
|
|
|
import org.junit.Assert; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.DeviceTransportType; |
|
|
|
import org.thingsboard.server.common.data.TransportPayloadType; |
|
|
|
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; |
|
|
|
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; |
|
|
|
import org.thingsboard.server.common.data.device.profile.MqttTopics; |
|
|
|
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; |
|
|
|
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; |
|
|
|
import org.thingsboard.server.gen.transport.TransportApiProtos; |
|
|
|
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; |
|
|
|
import org.thingsboard.server.transport.mqtt.MqttTestCallback; |
|
|
|
import org.thingsboard.server.transport.mqtt.MqttTestClient; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.List; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
import static org.junit.Assert.assertNotNull; |
|
|
|
import static org.junit.Assert.assertTrue; |
|
|
|
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|
|
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.BASE_DEVICE_API_TOPIC; |
|
|
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.BASE_DEVICE_API_TOPIC_V2; |
|
|
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; |
|
|
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; |
|
|
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_RPC_TOPIC; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractMqttIntegrationTest { |
|
|
|
@ -76,81 +77,93 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
protected static final Long asyncContextTimeoutToUseRpcPlugin = 10000L; |
|
|
|
|
|
|
|
protected void processOneWayRpcTest(String rpcSubTopic) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(accessToken); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestOneWayMqttCallback callback = new TestOneWayMqttCallback(client, latch); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(accessToken); |
|
|
|
MqttTestCallback callback = new MqttTestCallback(rpcSubTopic.replace("+", "0")); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
client.subscribe(rpcSubTopic, MqttQoS.AT_MOST_ONCE.value()); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_MOST_ONCE); |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
Assert.assertTrue(StringUtils.isEmpty(result)); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
String result = doPostAsync("/api/rpc/oneway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); |
|
|
|
assertTrue(StringUtils.isEmpty(result)); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
DeviceTransportType deviceTransportType = deviceProfile.getTransportType(); |
|
|
|
if (deviceTransportType.equals(DeviceTransportType.MQTT)) { |
|
|
|
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); |
|
|
|
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); |
|
|
|
MqttDeviceProfileTransportConfiguration configuration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; |
|
|
|
TransportPayloadType transportPayloadType = configuration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); |
|
|
|
if (transportPayloadType.equals(TransportPayloadType.PROTOBUF)) { |
|
|
|
// TODO: add correct validation of proto requests to device
|
|
|
|
assertTrue(callback.getPayloadBytes().length > 0); |
|
|
|
} else { |
|
|
|
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
} |
|
|
|
} else { |
|
|
|
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
} |
|
|
|
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processJsonOneWayRpcTestGateway(String deviceName) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(gatewayAccessToken); |
|
|
|
String payload = "{\"device\":\"" + deviceName + "\"}"; |
|
|
|
byte[] payloadBytes = payload.getBytes(); |
|
|
|
validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processJsonTwoWayRpcTest(String rpcSubTopic) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(accessToken); |
|
|
|
client.subscribe(rpcSubTopic, 1); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestJsonMqttCallback callback = new TestJsonMqttCallback(client, latch); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(accessToken); |
|
|
|
client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_LEAST_ONCE); |
|
|
|
MqttTestRpcJsonCallback callback = new MqttTestRpcJsonCallback(client, rpcSubTopic.replace("+", "0")); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
|
|
|
|
String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
String expected = "{\"value1\":\"A\",\"value2\":\"B\"}"; |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
Assert.assertEquals(expected, result); |
|
|
|
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
assertEquals("{\"value1\":\"A\",\"value2\":\"B\"}", actualRpcResponse); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processProtoTwoWayRpcTest(String rpcSubTopic) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(accessToken); |
|
|
|
client.subscribe(rpcSubTopic, 1); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(accessToken); |
|
|
|
client.subscribeAndWait(rpcSubTopic, MqttQoS.AT_LEAST_ONCE); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestProtoMqttCallback callback = new TestProtoMqttCallback(client, latch); |
|
|
|
MqttTestRpcProtoCallback callback = new MqttTestRpcProtoCallback(client, rpcSubTopic.replace("+", "0")); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
|
|
|
|
String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
String expected = "{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}"; |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
Assert.assertEquals(expected, result); |
|
|
|
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
// TODO: add correct validation of proto requests to device
|
|
|
|
assertTrue(callback.getPayloadBytes().length > 0); |
|
|
|
assertEquals("{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}", actualRpcResponse); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processProtoTwoWayRpcTestGateway(String deviceName) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(gatewayAccessToken); |
|
|
|
TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName); |
|
|
|
byte[] payloadBytes = connectMsgProto.toByteArray(); |
|
|
|
validateProtoTwoWayRpcGatewayResponse(deviceName, client, payloadBytes); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processProtoOneWayRpcTestGateway(String deviceName) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(gatewayAccessToken); |
|
|
|
TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName); |
|
|
|
byte[] payloadBytes = connectMsgProto.toByteArray(); |
|
|
|
validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
private TransportApiProtos.ConnectMsg getConnectProto(String deviceName) { |
|
|
|
@ -175,29 +188,30 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); |
|
|
|
} |
|
|
|
|
|
|
|
MqttAsyncClient client = getMqttAsyncClient(accessToken); |
|
|
|
client.setManualAcks(true); |
|
|
|
CountDownLatch latch = new CountDownLatch(10); |
|
|
|
TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(accessToken); |
|
|
|
client.enableManualAcks(); |
|
|
|
MqttTestSequenceCallback callback = new MqttTestSequenceCallback(client, 10, result); |
|
|
|
client.setCallback(callback); |
|
|
|
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1); |
|
|
|
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE); |
|
|
|
|
|
|
|
latch.await(10, TimeUnit.SECONDS); |
|
|
|
Assert.assertEquals(expected, result); |
|
|
|
callback.getSubscribeLatch().await(10, TimeUnit.SECONDS); |
|
|
|
assertEquals(expected, result); |
|
|
|
} |
|
|
|
|
|
|
|
protected void processJsonTwoWayRpcTestGateway(String deviceName) throws Exception { |
|
|
|
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); |
|
|
|
MqttTestClient client = new MqttTestClient(); |
|
|
|
client.connectAndWait(gatewayAccessToken); |
|
|
|
|
|
|
|
String payload = "{\"device\":\"" + deviceName + "\"}"; |
|
|
|
byte[] payloadBytes = payload.getBytes(); |
|
|
|
|
|
|
|
validateJsonTwoWayRpcGatewayResponse(deviceName, client, payloadBytes); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateOneWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { |
|
|
|
publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); |
|
|
|
|
|
|
|
protected void validateOneWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { |
|
|
|
client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); |
|
|
|
Device savedDevice = doExecuteWithRetriesAndInterval( |
|
|
|
() -> getDeviceByName(deviceName), |
|
|
|
20, |
|
|
|
@ -205,24 +219,45 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
); |
|
|
|
assertNotNull(savedDevice); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestOneWayMqttCallback callback = new TestOneWayMqttCallback(client, latch); |
|
|
|
MqttTestCallback callback = new MqttTestCallback(GATEWAY_RPC_TOPIC); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
|
|
|
|
client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); |
|
|
|
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
Assert.assertTrue(StringUtils.isEmpty(result)); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
assertTrue(StringUtils.isEmpty(result)); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
DeviceTransportType deviceTransportType = deviceProfile.getTransportType(); |
|
|
|
if (deviceTransportType.equals(DeviceTransportType.MQTT)) { |
|
|
|
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); |
|
|
|
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); |
|
|
|
MqttDeviceProfileTransportConfiguration configuration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; |
|
|
|
TransportPayloadType transportPayloadType = configuration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); |
|
|
|
if (transportPayloadType.equals(TransportPayloadType.PROTOBUF)) { |
|
|
|
// TODO: add correct validation of proto requests to device
|
|
|
|
assertTrue(callback.getPayloadBytes().length > 0); |
|
|
|
} else { |
|
|
|
JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); |
|
|
|
assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
} |
|
|
|
} else { |
|
|
|
JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); |
|
|
|
assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
} |
|
|
|
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateJsonTwoWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { |
|
|
|
publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); |
|
|
|
private JsonNode getExpectedGatewayJsonRequestData(String deviceName, String requestStr) { |
|
|
|
ObjectNode deviceData = (ObjectNode) JacksonUtil.toJsonNode(requestStr); |
|
|
|
deviceData.put("id", 0); |
|
|
|
ObjectNode expectedRequest = JacksonUtil.newObjectNode(); |
|
|
|
expectedRequest.put("device", deviceName); |
|
|
|
expectedRequest.set("data", deviceData); |
|
|
|
return expectedRequest; |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateJsonTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { |
|
|
|
client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); |
|
|
|
|
|
|
|
Device savedDevice = doExecuteWithRetriesAndInterval( |
|
|
|
() -> getDeviceByName(deviceName), |
|
|
|
@ -231,25 +266,21 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
); |
|
|
|
assertNotNull(savedDevice); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestJsonMqttCallback callback = new TestJsonMqttCallback(client, latch); |
|
|
|
MqttTestRpcJsonCallback callback = new MqttTestRpcJsonCallback(client, GATEWAY_RPC_TOPIC); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
String expected = "{\"success\":true}"; |
|
|
|
assertEquals(expected, result); |
|
|
|
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
log.warn("request payload: {}", JacksonUtil.fromBytes(callback.getPayloadBytes())); |
|
|
|
assertEquals("{\"success\":true}", actualRpcResponse); |
|
|
|
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception { |
|
|
|
publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); |
|
|
|
protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { |
|
|
|
client.publish(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); |
|
|
|
|
|
|
|
Device savedDevice = doExecuteWithRetriesAndInterval( |
|
|
|
() -> getDeviceByName(deviceName), |
|
|
|
@ -258,20 +289,15 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
); |
|
|
|
assertNotNull(savedDevice); |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
TestProtoMqttCallback callback = new TestProtoMqttCallback(client, latch); |
|
|
|
MqttTestRpcProtoCallback callback = new MqttTestRpcProtoCallback(client, GATEWAY_RPC_TOPIC); |
|
|
|
client.setCallback(callback); |
|
|
|
|
|
|
|
client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value()); |
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
String result = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
String expected = "{\"success\":true}"; |
|
|
|
assertEquals(expected, result); |
|
|
|
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); |
|
|
|
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); |
|
|
|
assertEquals("{\"success\":true}", actualRpcResponse); |
|
|
|
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); |
|
|
|
} |
|
|
|
|
|
|
|
@ -279,132 +305,82 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
return doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class); |
|
|
|
} |
|
|
|
|
|
|
|
protected MqttMessage processJsonMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { |
|
|
|
MqttMessage message = new MqttMessage(); |
|
|
|
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
message.setPayload(DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8)); |
|
|
|
protected byte[] processJsonMessageArrived(String requestTopic, MqttMessage mqttMessage) { |
|
|
|
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
return DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8); |
|
|
|
} else { |
|
|
|
JsonNode requestMsgNode = JacksonUtil.toJsonNode(new String(mqttMessage.getPayload(), StandardCharset.UTF_8)); |
|
|
|
String deviceName = requestMsgNode.get("device").asText(); |
|
|
|
int requestId = requestMsgNode.get("data").get("id").asInt(); |
|
|
|
message.setPayload(("{\"device\": \"" + deviceName + "\", \"id\": " + requestId + ", \"data\": {\"success\": true}}").getBytes(StandardCharset.UTF_8)); |
|
|
|
} |
|
|
|
return message; |
|
|
|
} |
|
|
|
|
|
|
|
protected class TestOneWayMqttCallback implements MqttCallback { |
|
|
|
|
|
|
|
private final MqttAsyncClient client; |
|
|
|
private final CountDownLatch latch; |
|
|
|
private Integer qoS; |
|
|
|
|
|
|
|
TestOneWayMqttCallback(MqttAsyncClient client, CountDownLatch latch) { |
|
|
|
this.client = client; |
|
|
|
this.latch = latch; |
|
|
|
} |
|
|
|
|
|
|
|
int getQoS() { |
|
|
|
return qoS; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable throwable) { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { |
|
|
|
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); |
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
latch.countDown(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
|
|
|
|
|
|
|
String response = "{\"device\": \"" + deviceName + "\", \"id\": " + requestId + ", \"data\": {\"success\": true}}"; |
|
|
|
return response.getBytes(StandardCharset.UTF_8); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
protected class TestJsonMqttCallback implements MqttCallback { |
|
|
|
protected class MqttTestRpcJsonCallback extends MqttTestCallback { |
|
|
|
|
|
|
|
private final MqttAsyncClient client; |
|
|
|
private final CountDownLatch latch; |
|
|
|
private Integer qoS; |
|
|
|
private final MqttTestClient client; |
|
|
|
|
|
|
|
TestJsonMqttCallback(MqttAsyncClient client, CountDownLatch latch) { |
|
|
|
public MqttTestRpcJsonCallback(MqttTestClient client, String awaitSubTopic) { |
|
|
|
super(awaitSubTopic); |
|
|
|
this.client = client; |
|
|
|
this.latch = latch; |
|
|
|
} |
|
|
|
|
|
|
|
int getQoS() { |
|
|
|
return qoS; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable throwable) { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { |
|
|
|
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); |
|
|
|
String responseTopic; |
|
|
|
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
responseTopic = requestTopic.replace("req", "res"); |
|
|
|
} else { |
|
|
|
responseTopic = requestTopic.replace("request", "response"); |
|
|
|
protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { |
|
|
|
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); |
|
|
|
if (awaitSubTopic.equals(requestTopic)) { |
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
payloadBytes = mqttMessage.getPayload(); |
|
|
|
String responseTopic; |
|
|
|
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
responseTopic = requestTopic.replace("req", "res"); |
|
|
|
} else { |
|
|
|
responseTopic = requestTopic.replace("request", "response"); |
|
|
|
} |
|
|
|
try { |
|
|
|
client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); |
|
|
|
} catch (MqttException e) { |
|
|
|
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); |
|
|
|
} |
|
|
|
subscribeLatch.countDown(); |
|
|
|
} |
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); |
|
|
|
latch.countDown(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
protected class TestProtoMqttCallback implements MqttCallback { |
|
|
|
protected class MqttTestRpcProtoCallback extends MqttTestCallback { |
|
|
|
|
|
|
|
private final MqttAsyncClient client; |
|
|
|
private final CountDownLatch latch; |
|
|
|
private Integer qoS; |
|
|
|
private final MqttTestClient client; |
|
|
|
|
|
|
|
TestProtoMqttCallback(MqttAsyncClient client, CountDownLatch latch) { |
|
|
|
public MqttTestRpcProtoCallback(MqttTestClient client, String awaitSubTopic) { |
|
|
|
super(awaitSubTopic); |
|
|
|
this.client = client; |
|
|
|
this.latch = latch; |
|
|
|
} |
|
|
|
|
|
|
|
int getQoS() { |
|
|
|
return qoS; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable throwable) { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { |
|
|
|
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); |
|
|
|
String responseTopic; |
|
|
|
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
responseTopic = requestTopic.replace("req", "res"); |
|
|
|
} else { |
|
|
|
responseTopic = requestTopic.replace("request", "response"); |
|
|
|
protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { |
|
|
|
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); |
|
|
|
if (awaitSubTopic.equals(requestTopic)) { |
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
payloadBytes = mqttMessage.getPayload(); |
|
|
|
String responseTopic; |
|
|
|
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
responseTopic = requestTopic.replace("req", "res"); |
|
|
|
} else { |
|
|
|
responseTopic = requestTopic.replace("request", "response"); |
|
|
|
} |
|
|
|
try { |
|
|
|
client.publish(responseTopic, processProtoMessageArrived(requestTopic, mqttMessage)); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); |
|
|
|
} |
|
|
|
subscribeLatch.countDown(); |
|
|
|
} |
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
client.publish(responseTopic, processProtoMessageArrived(requestTopic, mqttMessage)); |
|
|
|
latch.countDown(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
protected MqttMessage processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { |
|
|
|
MqttMessage message = new MqttMessage(); |
|
|
|
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
protected byte[] processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { |
|
|
|
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { |
|
|
|
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration(); |
|
|
|
ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA); |
|
|
|
DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA); |
|
|
|
@ -428,9 +404,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
DynamicMessage rpcResponseMsg = rpcResponseBuilder |
|
|
|
.setField(rpcResponseMsgDescriptor.findFieldByName("payload"), DEVICE_RESPONSE) |
|
|
|
.build(); |
|
|
|
message.setPayload(rpcResponseMsg.toByteArray()); |
|
|
|
return rpcResponseMsg.toByteArray(); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
log.warn("Command Response Ack Error, Invalid response received: ", e); |
|
|
|
throw new RuntimeException("Command Response Ack Error, Invalid response received: ", e); |
|
|
|
} |
|
|
|
} else { |
|
|
|
TransportApiProtos.GatewayDeviceRpcRequestMsg msg = TransportApiProtos.GatewayDeviceRpcRequestMsg.parseFrom(mqttMessage.getPayload()); |
|
|
|
@ -441,9 +417,8 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
.setId(requestId) |
|
|
|
.setData("{\"success\": true}") |
|
|
|
.build(); |
|
|
|
message.setPayload(gatewayRpcResponseMsg.toByteArray()); |
|
|
|
return gatewayRpcResponseMsg.toByteArray(); |
|
|
|
} |
|
|
|
return message; |
|
|
|
} |
|
|
|
|
|
|
|
private ProtoTransportPayloadConfiguration getProtoTransportPayloadConfiguration() { |
|
|
|
@ -455,37 +430,30 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM |
|
|
|
return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; |
|
|
|
} |
|
|
|
|
|
|
|
protected class TestSequenceMqttCallback implements MqttCallback { |
|
|
|
protected class MqttTestSequenceCallback extends MqttTestCallback { |
|
|
|
|
|
|
|
private final MqttAsyncClient client; |
|
|
|
private final CountDownLatch latch; |
|
|
|
private final MqttTestClient client; |
|
|
|
private final List<String> expected; |
|
|
|
|
|
|
|
TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) { |
|
|
|
MqttTestSequenceCallback(MqttTestClient client, int subscribeCount, List<String> expected) { |
|
|
|
super(subscribeCount); |
|
|
|
this.client = client; |
|
|
|
this.latch = latch; |
|
|
|
this.expected = expected; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable throwable) { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { |
|
|
|
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) { |
|
|
|
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); |
|
|
|
expected.add(new String(mqttMessage.getPayload())); |
|
|
|
String responseTopic = requestTopic.replace("request", "response"); |
|
|
|
var qoS = mqttMessage.getQos(); |
|
|
|
|
|
|
|
client.messageArrivedComplete(mqttMessage.getId(), qoS); |
|
|
|
client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); |
|
|
|
latch.countDown(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
|
|
|
|
|
|
|
qoS = mqttMessage.getQos(); |
|
|
|
try { |
|
|
|
client.messageArrivedComplete(mqttMessage); |
|
|
|
client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); |
|
|
|
} catch (MqttException e) { |
|
|
|
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); |
|
|
|
} |
|
|
|
subscribeLatch.countDown(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|