|
|
|
@ -15,21 +15,23 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.mqtt.rpc; |
|
|
|
|
|
|
|
import java.util.Arrays; |
|
|
|
|
|
|
|
import com.datastax.driver.core.utils.UUIDs; |
|
|
|
import com.fasterxml.jackson.core.type.TypeReference; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.eclipse.paho.client.mqttv3.*; |
|
|
|
import org.junit.*; |
|
|
|
import org.springframework.http.HttpStatus; |
|
|
|
import org.springframework.web.client.HttpClientErrorException; |
|
|
|
import org.thingsboard.server.actors.plugin.PluginProcessingContext; |
|
|
|
import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.Tenant; |
|
|
|
import org.thingsboard.server.common.data.User; |
|
|
|
import org.thingsboard.server.common.data.page.TextPageData; |
|
|
|
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|
|
|
import org.thingsboard.server.common.data.security.Authority; |
|
|
|
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|
|
|
import org.thingsboard.server.controller.AbstractControllerTest; |
|
|
|
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|
|
|
|
|
|
|
import java.util.UUID; |
|
|
|
|
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
import static org.junit.Assert.assertNotNull; |
|
|
|
@ -42,15 +44,19 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. |
|
|
|
public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractControllerTest { |
|
|
|
|
|
|
|
private static final String MQTT_URL = "tcp://localhost:1883"; |
|
|
|
private static final String FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED = "HttpClientErrorException expected, but not encountered"; |
|
|
|
private static final Long TIME_TO_HANDLE_REQUEST = 500L; |
|
|
|
|
|
|
|
private Tenant savedTenant; |
|
|
|
private User tenantAdmin; |
|
|
|
private Long asyncContextTimeoutToUseRpcPlugin; |
|
|
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
public void beforeTest() throws Exception { |
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
asyncContextTimeoutToUseRpcPlugin = getAsyncContextTimeoutToUseRpcPlugin(); |
|
|
|
|
|
|
|
Tenant tenant = new Tenant(); |
|
|
|
tenant.setTitle("My tenant"); |
|
|
|
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|
|
|
@ -70,8 +76,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
public void afterTest() throws Exception { |
|
|
|
loginSysAdmin(); |
|
|
|
if (savedTenant != null) { |
|
|
|
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) |
|
|
|
.andExpect(status().isOk()); |
|
|
|
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -102,7 +107,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
|
|
|
|
public void testServerMqttOneWayRpcDeviceOffline() throws Exception { |
|
|
|
Device device = new Device(); |
|
|
|
device.setName("Test One-Way Server-Side RPC Device Offline"); |
|
|
|
@ -115,29 +119,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
try { |
|
|
|
doPost("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(408)); |
|
|
|
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); |
|
|
|
} catch (HttpClientErrorException e) { |
|
|
|
log.error(e.getMessage(), e); |
|
|
|
Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode()); |
|
|
|
Assert.assertEquals("408 null", e.getMessage()); |
|
|
|
} |
|
|
|
|
|
|
|
doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), |
|
|
|
asyncContextTimeoutToUseRpcPlugin); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
|
|
|
|
public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception { |
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; |
|
|
|
String nonExistentDeviceId = UUID.randomUUID().toString(); |
|
|
|
try { |
|
|
|
doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400)); |
|
|
|
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); |
|
|
|
} catch (HttpClientErrorException e) { |
|
|
|
log.error(e.getMessage(), e); |
|
|
|
Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode()); |
|
|
|
Assert.assertEquals("400 null", e.getMessage()); |
|
|
|
} |
|
|
|
String nonExistentDeviceId = UUIDs.timeBased().toString(); |
|
|
|
|
|
|
|
String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, |
|
|
|
status().isNotFound()); |
|
|
|
Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@ -168,7 +162,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
|
|
|
|
public void testServerMqttTwoWayRpcDeviceOffline() throws Exception { |
|
|
|
Device device = new Device(); |
|
|
|
device.setName("Test Two-Way Server-Side RPC Device Offline"); |
|
|
|
@ -181,29 +174,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
|
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; |
|
|
|
String deviceId = savedDevice.getId().getId().toString(); |
|
|
|
try { |
|
|
|
doPost("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(408)); |
|
|
|
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); |
|
|
|
} catch (HttpClientErrorException e) { |
|
|
|
log.error(e.getMessage(), e); |
|
|
|
Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode()); |
|
|
|
Assert.assertEquals("408 null", e.getMessage()); |
|
|
|
} |
|
|
|
|
|
|
|
doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), |
|
|
|
asyncContextTimeoutToUseRpcPlugin); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
|
|
|
|
public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception { |
|
|
|
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; |
|
|
|
String nonExistentDeviceId = UUID.randomUUID().toString(); |
|
|
|
try { |
|
|
|
doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400)); |
|
|
|
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); |
|
|
|
} catch (HttpClientErrorException e) { |
|
|
|
log.error(e.getMessage(), e); |
|
|
|
Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode()); |
|
|
|
Assert.assertEquals("400 null", e.getMessage()); |
|
|
|
} |
|
|
|
String nonExistentDeviceId = UUIDs.timeBased().toString(); |
|
|
|
|
|
|
|
String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class, |
|
|
|
status().isNotFound()); |
|
|
|
Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result); |
|
|
|
} |
|
|
|
|
|
|
|
private Device getSavedDevice(Device device) throws Exception { |
|
|
|
@ -214,6 +197,13 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
return doGet("/api/device/" + savedDevice.getId().getId().toString() + "/credentials", DeviceCredentials.class); |
|
|
|
} |
|
|
|
|
|
|
|
private Long getAsyncContextTimeoutToUseRpcPlugin() throws Exception { |
|
|
|
TextPageData<PluginMetaData> plugins = doGetTyped("/api/plugin/system?limit=1&textSearch=system rpc plugin", |
|
|
|
new TypeReference<TextPageData<PluginMetaData>>(){}); |
|
|
|
Long systemRpcPluginTimeout = plugins.getData().iterator().next().getConfiguration().get("defaultTimeout").asLong(); |
|
|
|
return systemRpcPluginTimeout + TIME_TO_HANDLE_REQUEST; |
|
|
|
} |
|
|
|
|
|
|
|
private static class TestMqttCallback implements MqttCallback { |
|
|
|
|
|
|
|
private final MqttAsyncClient client; |
|
|
|
@ -228,10 +218,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { |
|
|
|
log.info("Message Arrived: " + mqttMessage.getPayload().toString()); |
|
|
|
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); |
|
|
|
MqttMessage message = new MqttMessage(); |
|
|
|
String responseTopic = requestTopic.replace("request", "response"); |
|
|
|
message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes()); |
|
|
|
message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); |
|
|
|
client.publish(responseTopic, message); |
|
|
|
} |
|
|
|
|
|
|
|
|