diff --git a/msa/integration-tests/README.md b/msa/integration-tests/README.md index 5ae6354740..93df2bf77f 100644 --- a/msa/integration-tests/README.md +++ b/msa/integration-tests/README.md @@ -15,4 +15,4 @@ As result, in REPOSITORY column, next images should be present: - Run the integration tests in the [msa/integration-tests](../integration-tests) directory: - mvn clean install -Dintegrationtests.skip=false \ No newline at end of file + mvn clean install -DintegrationTests.skip=false \ No newline at end of file diff --git a/msa/integration-tests/pom.xml b/msa/integration-tests/pom.xml index a1c24f39c6..a9c67ccf93 100644 --- a/msa/integration-tests/pom.xml +++ b/msa/integration-tests/pom.xml @@ -34,9 +34,10 @@ UTF-8 ${basedir}/../.. - true + true 1.9.1 1.3.9 + 4.5.6 @@ -50,6 +51,11 @@ Java-WebSocket ${java-websocket.version} + + org.apache.httpcomponents + httpclient + ${httpclient.version} + io.takari.junit takari-cpsuite @@ -89,7 +95,7 @@ **/*TestSuite.java - ${integrationtests.skip} + ${integrationTests.skip} diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index ab7f101f98..5ebab780be 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -21,55 +21,68 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.conn.ssl.X509HostnameVerifier; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; import org.junit.*; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.thingsboard.client.tools.RestClient; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; +import javax.net.ssl.*; import java.net.URI; -import java.net.URISyntaxException; +import java.security.cert.X509Certificate; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; @Slf4j public abstract class AbstractContainerTest { - protected static String httpUrl; - protected static String wsUrl; + protected static final String HTTPS_URL = "https://localhost"; + protected static final String WSS_URL = "wss://localhost"; protected static RestClient restClient; protected ObjectMapper mapper = new ObjectMapper(); @BeforeClass - public static void before() { - httpUrl = "http://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT); - wsUrl = "ws://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT); - restClient = new RestClient(httpUrl); + public static void before() throws Exception { + restClient = new RestClient(HTTPS_URL); + restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); } protected Device createDevice(String name) { return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT"); } - protected WsClient subscribeToTelemetryWebSocket(DeviceId deviceId) throws URISyntaxException, InterruptedException { - WsClient mWs = new WsClient(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + restClient.getToken())); - mWs.connectBlocking(1, TimeUnit.SECONDS); - - JsonObject tsSubCmd = new JsonObject(); - tsSubCmd.addProperty("entityType", EntityType.DEVICE.name()); - tsSubCmd.addProperty("entityId", deviceId.toString()); - tsSubCmd.addProperty("scope", "LATEST_TELEMETRY"); - tsSubCmd.addProperty("cmdId", new Random().nextInt(100)); - tsSubCmd.addProperty("unsubscribe", false); - JsonArray wsTsSubCmds = new JsonArray(); - wsTsSubCmds.add(tsSubCmd); + protected WsClient subscribeToWebSocket(DeviceId deviceId, String scope, CmdsType property) throws Exception { + WsClient wsClient = new WsClient(new URI(WSS_URL + "/api/ws/plugins/telemetry?token=" + restClient.getToken())); + SSLContextBuilder builder = SSLContexts.custom(); + builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); + wsClient.setSocket(builder.build().getSocketFactory().createSocket()); + wsClient.connectBlocking(); + + JsonObject cmdsObject = new JsonObject(); + cmdsObject.addProperty("entityType", EntityType.DEVICE.name()); + cmdsObject.addProperty("entityId", deviceId.toString()); + cmdsObject.addProperty("scope", scope); + cmdsObject.addProperty("cmdId", new Random().nextInt(100)); + + JsonArray cmd = new JsonArray(); + cmd.add(cmdsObject); JsonObject wsRequest = new JsonObject(); - wsRequest.add("tsSubCmds", wsTsSubCmds); - wsRequest.add("historyCmds", new JsonArray()); - wsRequest.add("attrSubCmds", new JsonArray()); - mWs.send(wsRequest.toString()); - return mWs; + wsRequest.add(property.toString(), cmd); + wsClient.send(wsRequest.toString()); + return wsClient; } protected Map getExpectedLatestValues(long ts) { @@ -109,4 +122,54 @@ public abstract class AbstractContainerTest { return values; } + protected enum CmdsType { + TS_SUB_CMDS("tsSubCmds"), + HISTORY_CMDS("historyCmds"), + ATTR_SUB_CMDS("attrSubCmds"); + + private final String text; + + CmdsType(final String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } + } + + private static HttpComponentsClientHttpRequestFactory getRequestFactoryForSelfSignedCert() throws Exception { + SSLContextBuilder builder = SSLContexts.custom(); + builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); + SSLContext sslContext = builder.build(); + SSLConnectionSocketFactory sslSelfSigned = new SSLConnectionSocketFactory(sslContext, new X509HostnameVerifier() { + @Override + public void verify(String host, SSLSocket ssl) { + } + + @Override + public void verify(String host, X509Certificate cert) { + } + + @Override + public void verify(String host, String[] cns, String[] subjectAlts) { + } + + @Override + public boolean verify(String s, SSLSession sslSession) { + return true; + } + }); + + Registry socketFactoryRegistry = RegistryBuilder + .create() + .register("https", sslSelfSigned) + .build(); + + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(cm).build(); + return new HttpComponentsClientHttpRequestFactory(httpClient); + } + } diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index fd2de229d0..0629960b24 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -26,12 +26,11 @@ import java.io.File; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*"}) public class ContainerTestSuite { - static final int EXPOSED_PORT = 8080; @ClassRule public static DockerComposeContainer composeContainer = new DockerComposeContainer(new File("./../docker/docker-compose.yml")) .withPull(false) .withLocalCompose(true) .withTailChildContainers(true) - .withExposedService("tb-web-ui1", EXPOSED_PORT, Wait.forHttp("/login")); + .withExposedService("tb-web-ui1", 8080, Wait.forHttp("/login")); } diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 2f05eee4ce..5ef238f8aa 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -19,16 +19,12 @@ import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; public class WsClient extends WebSocketClient { - private final BlockingQueue events; private String message; public WsClient(URI serverUri) { super(serverUri); - events = new ArrayBlockingQueue<>(100); } @Override @@ -37,13 +33,11 @@ public class WsClient extends WebSocketClient { @Override public void onMessage(String message) { - events.add(message); this.message = message; } @Override public void onClose(int code, String reason, boolean remote) { - events.clear(); } @Override @@ -54,4 +48,4 @@ public class WsClient extends WebSocketClient { public String getLastMessage() { return this.message; } -} \ No newline at end of file +} diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index 7cc0a0f6e3..f8e1041252 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.msa.connectivity; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; import org.springframework.http.ResponseEntity; @@ -22,7 +23,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.msa.AbstractContainerTest; import org.thingsboard.server.msa.WsClient; -import org.thingsboard.server.msa.WsTelemetryResponse; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; import java.util.concurrent.TimeUnit; @@ -35,23 +36,24 @@ public class HttpClientTest extends AbstractContainerTest { Device device = createDevice("http_"); DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); - WsClient mWs = subscribeToTelemetryWebSocket(device.getId()); + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); ResponseEntity deviceTelemetryResponse = restClient.getRestTemplate() - .postForEntity(httpUrl + "/api/v1/{credentialsId}/telemetry", + .postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry", mapper.readTree(createPayload().toString()), ResponseEntity.class, deviceCredentials.getCredentialsId()); Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful()); TimeUnit.SECONDS.sleep(1); - WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class); + WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class); - Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet()); + Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"), + actualLatestTelemetry.getLatestValues().keySet()); Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString())); Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1")); Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0))); Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73))); - restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId()); + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); } } diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 4ad638ed15..eae98b9f1e 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -15,20 +15,39 @@ */ package org.thingsboard.server.msa.connectivity; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import lombok.Data; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.*; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttHandler; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.page.TextPageData; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.msa.AbstractContainerTest; import org.thingsboard.server.msa.WsClient; -import org.thingsboard.server.msa.WsTelemetryResponse; +import org.thingsboard.server.msa.mapper.AttributesResponse; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.*; import java.util.concurrent.*; public class MqttClientTest extends AbstractContainerTest { @@ -39,20 +58,22 @@ public class MqttClientTest extends AbstractContainerTest { Device device = createDevice("mqtt_"); DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); - WsClient mWs = subscribeToTelemetryWebSocket(device.getId()); - MqttClient mqttClient = getMqttClient(deviceCredentials); + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); + MqttClient mqttClient = getMqttClient(deviceCredentials, null); mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())); TimeUnit.SECONDS.sleep(1); - WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class); + WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class); - Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet()); + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); + Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"), + actualLatestTelemetry.getLatestValues().keySet()); Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString())); Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1")); Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0))); Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73))); - restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId()); + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); } @Test @@ -63,12 +84,13 @@ public class MqttClientTest extends AbstractContainerTest { Device device = createDevice("mqtt_"); DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); - WsClient mWs = subscribeToTelemetryWebSocket(device.getId()); - MqttClient mqttClient = getMqttClient(deviceCredentials); + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); + MqttClient mqttClient = getMqttClient(deviceCredentials, null); mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())); TimeUnit.SECONDS.sleep(1); - WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class); + WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class); + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues()); Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString())); @@ -76,15 +98,271 @@ public class MqttClientTest extends AbstractContainerTest { Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0))); Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73))); - restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId()); + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); } - private MqttClient getMqttClient(DeviceCredentials deviceCredentials) throws InterruptedException { - MqttMessageListener queue = new MqttMessageListener(); + @Test + public void publishAttributeUpdateToServer() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + Device device = createDevice("mqtt_"); + DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + + WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS); + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + JsonObject clientAttributes = new JsonObject(); + clientAttributes.addProperty("attr1", "value1"); + clientAttributes.addProperty("attr2", true); + clientAttributes.addProperty("attr3", 42.0); + clientAttributes.addProperty("attr4", 73); + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); + TimeUnit.SECONDS.sleep(1); + WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class); + + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); + Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"), + actualLatestTelemetry.getLatestValues().keySet()); + + Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1")); + Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString())); + Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0))); + Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73))); + + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); + } + + @Test + public void requestAttributeValuesFromServer() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + Device device = createDevice("mqtt_"); + DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + + // Add a new client attribute + JsonObject clientAttributes = new JsonObject(); + String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8); + clientAttributes.addProperty("clientAttr", clientAttributeValue); + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); + + // Add a new shared attribute + JsonObject sharedAttributes = new JsonObject(); + String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); + sharedAttributes.addProperty("sharedAttr", sharedAttributeValue); + ResponseEntity sharedAttributesResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", + mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, + device.getId()); + Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); + + // Subscribe to attributes response + mqttClient.on("v1/devices/me/attributes/response/+", listener); + // Request attributes + JsonObject request = new JsonObject(); + request.addProperty("clientKeys", "clientAttr"); + request.addProperty("sharedKeys", "sharedAttr"); + mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())); + MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); + + Assert.assertEquals(1, attributes.getClient().size()); + Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr")); + + Assert.assertEquals(1, attributes.getShared().size()); + Assert.assertEquals(sharedAttributeValue, attributes.getShared().get("sharedAttr")); + + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); + } + + @Test + public void subscribeToAttributeUpdatesFromServer() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + Device device = createDevice("mqtt_"); + DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + mqttClient.on("v1/devices/me/attributes", listener); + + String sharedAttributeName = "sharedAttr"; + + // Add a new shared attribute + JsonObject sharedAttributes = new JsonObject(); + String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); + sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue); + ResponseEntity sharedAttributesResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", + mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, + device.getId()); + Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); + + MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + Assert.assertEquals(sharedAttributeValue, + mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); + + // Update the shared attribute value + JsonObject updatedSharedAttributes = new JsonObject(); + String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); + updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue); + ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", + mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class, + device.getId()); + Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); + + event = listener.getEvents().poll(10, TimeUnit.SECONDS); + Assert.assertEquals(updatedSharedAttributeValue, + mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); + + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); + } + + @Test + public void serverSideRpc() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + Device device = createDevice("mqtt_"); + DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + mqttClient.on("v1/devices/me/rpc/request/+", listener); + + // Send an RPC from the server + JsonObject serverRpcPayload = new JsonObject(); + serverRpcPayload.addProperty("method", "getValue"); + serverRpcPayload.addProperty("params", true); + serverRpcPayload.addProperty("timeout", 1000); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = service.submit(() -> { + try { + return restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}", + mapper.readTree(serverRpcPayload.toString()), String.class, + device.getId()); + } catch (IOException e) { + return ResponseEntity.badRequest().build(); + } + }); + + // Wait for RPC call from the server and send the response + MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + + Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); + + Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length())); + JsonObject clientResponse = new JsonObject(); + clientResponse.addProperty("response", "someResponse"); + // Send a response to the server's RPC request + mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())); + + ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); + Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); + + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); + } + + @Test + public void clientSideRpc() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + Device device = createDevice("mqtt_"); + DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + mqttClient.on("v1/devices/me/rpc/request/+", listener); + + // Get the default rule chain id to make it root again after test finished + RuleChainId defaultRuleChainId = getDefaultRuleChainId(); + + // Create a new root rule chain + RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); + + // Send the request to the server + JsonObject clientRequest = new JsonObject(); + clientRequest.addProperty("method", "getResponse"); + clientRequest.addProperty("params", true); + Integer requestId = 42; + mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())); + + // Check the response from the server + TimeUnit.SECONDS.sleep(1); + MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS); + Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); + Assert.assertEquals(requestId, responseId); + Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); + + // Make the default rule chain a root again + ResponseEntity rootRuleChainResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root", + null, + RuleChain.class, + defaultRuleChainId); + Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful()); + + // Delete the created rule chain + restClient.getRestTemplate().delete(HTTPS_URL + "/api/ruleChain/{ruleChainId}", ruleChainId); + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); + } + + private RuleChainId createRootRuleChainForRpcResponse() throws Exception { + RuleChain newRuleChain = new RuleChain(); + newRuleChain.setName("testRuleChain"); + ResponseEntity ruleChainResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/ruleChain", + newRuleChain, + RuleChain.class); + Assert.assertTrue(ruleChainResponse.getStatusCode().is2xxSuccessful()); + RuleChain ruleChain = ruleChainResponse.getBody(); + + JsonNode configuration = mapper.readTree(this.getClass().getClassLoader().getResourceAsStream("RpcResponseRuleChainMetadata.json")); + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(ruleChain.getId()); + ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt()); + ruleChainMetaData.setNodes(Arrays.asList(mapper.treeToValue(configuration.get("nodes"), RuleNode[].class))); + ruleChainMetaData.setConnections(Arrays.asList(mapper.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class))); + + ResponseEntity ruleChainMetadataResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/ruleChain/metadata", + ruleChainMetaData, + RuleChainMetaData.class); + Assert.assertTrue(ruleChainMetadataResponse.getStatusCode().is2xxSuccessful()); + + // Set a new rule chain as root + ResponseEntity rootRuleChainResponse = restClient.getRestTemplate() + .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root", + null, + RuleChain.class, + ruleChain.getId()); + Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful()); + + return ruleChain.getId(); + } + + private RuleChainId getDefaultRuleChainId() { + ResponseEntity> ruleChains = restClient.getRestTemplate().exchange( + HTTPS_URL + "/api/ruleChains?limit=40&textSearch=", + HttpMethod.GET, + null, + new ParameterizedTypeReference>() { + }); + + Optional defaultRuleChain = ruleChains.getBody().getData() + .stream() + .filter(RuleChain::isRoot) + .findFirst(); + if (!defaultRuleChain.isPresent()) { + Assert.fail("Root rule chain wasn't found"); + } + return defaultRuleChain.get().getId(); + } + + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); - MqttClient mqttClient = MqttClient.create(clientConfig, queue); + MqttClient mqttClient = MqttClient.create(clientConfig, listener); mqttClient.connect("localhost", 1883).sync(); return mqttClient; } diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java new file mode 100644 index 0000000000..f9774eef20 --- /dev/null +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.msa.mapper; + +import lombok.Data; + +import java.util.Map; + +@Data +public class AttributesResponse { + private Map client; + private Map shared; +} diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsTelemetryResponse.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java similarity index 96% rename from msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsTelemetryResponse.java rename to msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java index 834c5d1449..b22244f3a7 100644 --- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsTelemetryResponse.java +++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.msa; +package org.thingsboard.server.msa.mapper; import lombok.Data; diff --git a/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json new file mode 100644 index 0000000000..09178ef781 --- /dev/null +++ b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json @@ -0,0 +1,59 @@ +{ + "firstNodeIndex": 0, + "nodes": [ + { + "additionalInfo": { + "layoutX": 325, + "layoutY": 150 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "name": "msgTypeSwitch", + "debugMode": true, + "configuration": { + "version": 0 + } + }, + { + "additionalInfo": { + "layoutX": 60, + "layoutY": 300 + }, + "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", + "name": "formResponse", + "debugMode": true, + "configuration": { + "jsScript": "if (msg.method == \"getResponse\") {\n return {msg: {\"response\": \"requestReceived\"}, metadata: metadata, msgType: msgType};\n}\n\nreturn {msg: msg, metadata: metadata, msgType: msgType};" + } + }, + { + "additionalInfo": { + "layoutX": 450, + "layoutY": 300 + }, + "type": "org.thingsboard.rule.engine.rpc.TbSendRPCReplyNode", + "name": "rpcReply", + "debugMode": true, + "configuration": { + "requestIdMetaDataAttribute": "requestId" + } + } + ], + "connections": [ + { + "fromIndex": 0, + "toIndex": 1, + "type": "RPC Request from Device" + }, + { + "fromIndex": 1, + "toIndex": 2, + "type": "Success" + }, + { + "fromIndex": 1, + "toIndex": 2, + "type": "Failure" + } + ], + "ruleChainConnections": null +} \ No newline at end of file