From ee05935a498c779643ff46e1aeb65294016933c8 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 10 May 2022 13:42:48 +0300 Subject: [PATCH] added new MqttTestClient & MqttTestCallback & refactored credentials and attribute updates/request tests --- .../controller/TbTestWebSocketClient.java | 6 +- .../transport/mqtt/MqttTestCallback.java | 60 ++ .../server/transport/mqtt/MqttTestClient.java | 118 ++++ ...AbstractMqttAttributesIntegrationTest.java | 654 ++++++++---------- ...tBackwardCompatibilityIntegrationTest.java | 2 - ...AttributesRequestProtoIntegrationTest.java | 1 - ...sBackwardCompatibilityIntegrationTest.java | 1 - .../credentials/BasicMqttCredentialsTest.java | 41 +- 8 files changed, 510 insertions(+), 373 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java create mode 100644 application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 6af6176b06..f7f8c73d34 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -79,8 +79,12 @@ public class TbTestWebSocketClient extends WebSocketClient { } public void registerWaitForUpdate() { + registerWaitForUpdate(1); + } + + public void registerWaitForUpdate(int count) { lastMsg = null; - update = new CountDownLatch(1); + update = new CountDownLatch(count); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java new file mode 100644 index 0000000000..a17d3bd2a4 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java @@ -0,0 +1,60 @@ +package org.thingsboard.server.transport.mqtt; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.util.concurrent.CountDownLatch; + +@Slf4j +@Data +public class MqttTestCallback implements MqttCallback { + + private final CountDownLatch subscribeLatch; + private final CountDownLatch deliveryLatch; + private int qoS; + private byte[] payloadBytes; + private String awaitSubTopic; + + public MqttTestCallback(String awaitSubTopic) { + this.subscribeLatch = new CountDownLatch(1); + this.deliveryLatch = new CountDownLatch(1); + this.awaitSubTopic = awaitSubTopic; + } + + public MqttTestCallback() { + this.subscribeLatch = new CountDownLatch(1); + this.deliveryLatch = new CountDownLatch(1); + } + + @Override + public void connectionLost(Throwable throwable) { + log.warn("connectionLost: ", throwable); + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + if (awaitSubTopic == null) { + log.warn("messageArrived on topic: {}", requestTopic); + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); + } else { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + if (awaitSubTopic.equals(requestTopic)) { + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); + } + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse()); + deliveryLatch.countDown(); + + } +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java new file mode 100644 index 0000000000..8afa41b160 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java @@ -0,0 +1,118 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt; + +import io.netty.handler.codec.mqtt.MqttQoS; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.thingsboard.server.common.data.StringUtils; + +import java.util.concurrent.TimeUnit; + +public class MqttTestClient { + + private static final String MQTT_URL = "tcp://localhost:1883"; + private static final int TIMEOUT = 30; // seconds + private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(TIMEOUT); + + private final MqttAsyncClient client; + + public void setCallback(MqttTestCallback callback) { + client.setCallback(callback); + } + + public MqttTestClient() throws MqttException { + this.client = createClient(); + } + + public MqttTestClient(String clientId) throws MqttException { + this.client = createClient(clientId); + } + + public void connectAndWait(String userName, String password) throws MqttException { + IMqttToken connect = connect(userName, password); + connect.waitForCompletion(TIMEOUT_MS); + } + + public void connectAndWait(String userName) throws MqttException { + connectAndWait(userName, null); + } + + public void connectAndWait() throws MqttException { + connectAndWait(null, null); + } + + private IMqttToken connect(String userName, String password) throws MqttException { + if (client == null) { + throw new RuntimeException("Failed to connect! MqttAsyncClient is not initialized!"); + } + MqttConnectOptions options = new MqttConnectOptions(); + if (StringUtils.isNotEmpty(userName)) { + options.setUserName(userName); + } + if (StringUtils.isNotEmpty(password)) { + options.setPassword(password.toCharArray()); + } + return client.connect(options); + } + + public void disconnectAndWait() throws MqttException { + disconnect().waitForCompletion(TIMEOUT_MS); + } + + public IMqttToken disconnect() throws MqttException { + return client.disconnect(); + } + + public void disconnectForcibly() throws MqttException { + client.disconnectForcibly(TIMEOUT_MS); + } + + public void publishAndWait(String topic, byte[] payload) throws MqttException { + publish(topic, payload).waitForCompletion(TIMEOUT_MS); + } + + public IMqttDeliveryToken publish(String topic, byte[] payload) throws MqttException { + MqttMessage message = new MqttMessage(); + message.setPayload(payload); + return client.publish(topic, message); + } + + public void subscribeAndWait(String topic, MqttQoS qoS) throws MqttException { + subscribe(topic, qoS).waitForCompletion(TIMEOUT_MS); + } + + public IMqttToken subscribe(String topic, MqttQoS qoS) throws MqttException { + return client.subscribe(topic, qoS.value()); + } + + private MqttAsyncClient createClient(String clientId) throws MqttException { + if (StringUtils.isEmpty(clientId)) { + clientId = MqttAsyncClient.generateClientId(); + } + return new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence()); + } + + private MqttAsyncClient createClient() throws MqttException { + return createClient(null); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java index 530e67a625..261f904e2d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java @@ -22,36 +22,47 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.squareup.wire.schema.internal.parser.ProtoFileElement; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.test.context.TestPropertySource; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.DeviceTypeFilter; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.MqttTestClient; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; +import static org.thingsboard.server.common.data.query.EntityKeyType.CLIENT_ATTRIBUTE; +import static org.thingsboard.server.common.data.query.EntityKeyType.SHARED_ATTRIBUTE; + +@TestPropertySource(properties = { + "js.evaluator=mock", +}) @Slf4j public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqttIntegrationTest { @@ -60,11 +71,11 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt "package test;\n" + "\n" + "message PostAttributes {\n" + - " string attribute1 = 1;\n" + - " bool attribute2 = 2;\n" + - " double attribute3 = 3;\n" + - " int32 attribute4 = 4;\n" + - " JsonObject attribute5 = 5;\n" + + " string clientStr = 1;\n" + + " bool clientBool = 2;\n" + + " double clientDbl = 3;\n" + + " int32 clientLong = 4;\n" + + " JsonObject clientJson = 5;\n" + "\n" + " message JsonObject {\n" + " int32 someNumber = 6;\n" + @@ -76,17 +87,20 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt " }\n" + "}"; - protected static final String POST_ATTRIBUTES_PAYLOAD = "{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73," + - "\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}"; + private static final String CLIENT_ATTRIBUTES_PAYLOAD = "{\"clientStr\":\"value1\",\"clientBool\":true,\"clientDbl\":42.0,\"clientLong\":73," + + "\"clientJson\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}"; + + private static final String SHARED_ATTRIBUTES_PAYLOAD = "{\"sharedStr\":\"value1\",\"sharedBool\":true,\"sharedDbl\":42.0,\"sharedLong\":73," + + "\"sharedJson\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}"; - private static final String RESPONSE_ATTRIBUTES_PAYLOAD_DELETED = "{\"deleted\":[\"attribute5\"]}"; + private static final String SHARED_ATTRIBUTES_DELETED_RESPONSE = "{\"deleted\":[\"sharedJson\"]}"; - protected List getTsKvProtoList() { - TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("attribute1", "value1", TransportProtos.KeyValueType.STRING_V); - TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto("attribute2", "true", TransportProtos.KeyValueType.BOOLEAN_V); - TransportProtos.TsKvProto tsKvProtoAttribute3 = getTsKvProto("attribute3", "42.0", TransportProtos.KeyValueType.DOUBLE_V); - TransportProtos.TsKvProto tsKvProtoAttribute4 = getTsKvProto("attribute4", "73", TransportProtos.KeyValueType.LONG_V); - TransportProtos.TsKvProto tsKvProtoAttribute5 = getTsKvProto("attribute5", "{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", TransportProtos.KeyValueType.JSON_V); + private List getTsKvProtoList(String attributePrefix) { + TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto(attributePrefix + "Str", "value1", TransportProtos.KeyValueType.STRING_V); + TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto(attributePrefix + "Bool", "true", TransportProtos.KeyValueType.BOOLEAN_V); + TransportProtos.TsKvProto tsKvProtoAttribute3 = getTsKvProto(attributePrefix + "Dbl", "42.0", TransportProtos.KeyValueType.DOUBLE_V); + TransportProtos.TsKvProto tsKvProtoAttribute4 = getTsKvProto(attributePrefix + "Long", "73", TransportProtos.KeyValueType.LONG_V); + TransportProtos.TsKvProto tsKvProtoAttribute5 = getTsKvProto(attributePrefix + "Json", "{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", TransportProtos.KeyValueType.JSON_V); List tsKvProtoList = new ArrayList<>(); tsKvProtoList.add(tsKvProtoAttribute1); tsKvProtoList.add(tsKvProtoAttribute2); @@ -103,118 +117,56 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt return tsKvProtoBuilder.build(); } - protected TestMqttCallback getTestMqttCallback() { - CountDownLatch latch = new CountDownLatch(1); - return new TestMqttCallback(latch); - } - - protected static class TestMqttCallback implements MqttCallback { - - private final CountDownLatch latch; - private Integer qoS; - private byte[] payloadBytes; - - TestMqttCallback(CountDownLatch latch) { - this.latch = latch; - } - - public int getQoS() { - return qoS; - } - - public byte[] getPayloadBytes() { - return payloadBytes; - } - - public CountDownLatch getLatch() { - return latch; - } - - @Override - public void connectionLost(Throwable throwable) { - } - - @Override - public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - qoS = mqttMessage.getQos(); - payloadBytes = mqttMessage.getPayload(); - latch.countDown(); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - } - // subscribe to attributes updates from server methods protected void processJsonTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(accessToken); - - TestMqttCallback onUpdateCallback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); client.setCallback(onUpdateCallback); + client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); - client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()); + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); - Thread.sleep(1000); + validateUpdateAttributesJsonResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD); - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS); - - validateUpdateAttributesJsonResponse(onUpdateCallback); - - TestMqttCallback onDeleteCallback = getTestMqttCallback(); + MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); - - doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class); - onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS); - - validateDeleteAttributesJsonResponse(onDeleteCallback); + doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); + onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + validateUpdateAttributesJsonResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE); + client.disconnect(); } protected void processProtoTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(accessToken); - - TestMqttCallback onUpdateCallback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); client.setCallback(onUpdateCallback); + client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); - client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); - - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS); - + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); validateUpdateAttributesProtoResponse(onUpdateCallback); - TestMqttCallback onDeleteCallback = getTestMqttCallback(); + MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); - - doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class); - onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS); - + doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); + onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); validateDeleteAttributesProtoResponse(onDeleteCallback); + client.disconnect(); } - protected void validateUpdateAttributesJsonResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { - assertNotNull(callback.getPayloadBytes()); - String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); - assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD), JacksonUtil.toJsonNode(response)); - } - - protected void validateDeleteAttributesJsonResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateUpdateAttributesJsonResponse(MqttTestCallback callback, String expectedResponse) { assertNotNull(callback.getPayloadBytes()); - String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); - assertEquals(JacksonUtil.toJsonNode(RESPONSE_ATTRIBUTES_PAYLOAD_DELETED), JacksonUtil.toJsonNode(response)); + assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - protected void validateUpdateAttributesProtoResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateUpdateAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); - List tsKvProtoList = getTsKvProtoList(); + List tsKvProtoList = getTsKvProtoList("shared"); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); @@ -227,134 +179,99 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertTrue(actualSharedUpdatedList.containsAll(expectedSharedUpdatedList)); } - protected void validateDeleteAttributesProtoResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateDeleteAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); - attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5"); + attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); TransportProtos.AttributeUpdateNotificationMsg actualAttributeUpdateNotificationMsg = TransportProtos.AttributeUpdateNotificationMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedAttributeUpdateNotificationMsg.getSharedDeletedList().size(), actualAttributeUpdateNotificationMsg.getSharedDeletedList().size()); - assertEquals("attribute5", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0)); + assertEquals("sharedJson", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0)); } protected void processJsonGatewayTestSubscribeToAttributesUpdates() throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); - - TestMqttCallback onUpdateCallback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); client.setCallback(onUpdateCallback); - Device device = new Device(); - device.setName("Gateway Device Subscribe to attribute updates"); - device.setType("default"); - - byte[] connectPayloadBytes = getJsonConnectPayloadBytes(); + String deviceName = "Gateway Device Subscribe to attribute updates"; + byte[] connectPayloadBytes = getJsonConnectPayloadBytes(deviceName, deviceProfile.getTransportType().name()); - publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); + client.publishAndWait(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); - Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class), + Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), 20, 100); assertNotNull(savedDevice); - client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value()); + client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); - Thread.sleep(1000); + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS); + validateJsonGatewayUpdateAttributesResponse(onUpdateCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD); - validateJsonGatewayUpdateAttributesResponse(onUpdateCallback); - - TestMqttCallback onDeleteCallback = getTestMqttCallback(); + MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); - doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class); - onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS); - - validateJsonGatewayDeleteAttributesResponse(onDeleteCallback); + doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); + onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); + validateJsonGatewayUpdateAttributesResponse(onDeleteCallback, deviceName, SHARED_ATTRIBUTES_DELETED_RESPONSE); + client.disconnect(); } protected void processProtoGatewayTestSubscribeToAttributesUpdates() throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); - - TestMqttCallback onUpdateCallback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); client.setCallback(onUpdateCallback); - - Device device = new Device(); - device.setName("Gateway Device Subscribe to attribute updates"); - device.setType("default"); - - byte[] connectPayloadBytes = getProtoConnectPayloadBytes(); - - publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC); - - Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class), + String deviceName = "Gateway Device Subscribe to attribute updates"; + byte[] connectPayloadBytes = getProtoConnectPayloadBytes(deviceName, TransportPayloadType.PROTOBUF.name()); + client.publishAndWait(GATEWAY_CONNECT_TOPIC, connectPayloadBytes); + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), 20, 100); - - assertNotNull(savedDevice); - - client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value()); - - Thread.sleep(1000); - - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS); - - validateProtoGatewayUpdateAttributesResponse(onUpdateCallback); - - TestMqttCallback onDeleteCallback = getTestMqttCallback(); + assertNotNull(device); + client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + validateProtoGatewayUpdateAttributesResponse(onUpdateCallback, deviceName); + MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); - - doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class); - onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS); - - validateProtoGatewayDeleteAttributesResponse(onDeleteCallback); - - } - - protected void validateJsonGatewayUpdateAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { - assertNotNull(callback.getPayloadBytes()); - String s = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); - assertEquals(getJsonResponseGatewayAttributesUpdatedPayload(), s); + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); + validateProtoGatewayDeleteAttributesResponse(onDeleteCallback, deviceName); + client.disconnect(); } - protected void validateJsonGatewayDeleteAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateJsonGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName, String expectResultData) { assertNotNull(callback.getPayloadBytes()); - String s = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); - assertEquals(s, getJsonResponseGatewayAttributesDeletedPayload()); + assertEquals(JacksonUtil.toJsonNode(getGatewayAttributesResponseJson(deviceName, expectResultData)), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - protected byte[] getJsonConnectPayloadBytes() { - String connectPayload = "{\"device\": \"Gateway Device Subscribe to attribute updates\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}"; + protected byte[] getJsonConnectPayloadBytes(String deviceName, String deviceType) { + String connectPayload = "{\"device\":\"" + deviceName + "\", \"type\": \"" + deviceType + "\"}"; return connectPayload.getBytes(); } - private static String getJsonResponseGatewayAttributesUpdatedPayload() { - return "{\"device\":\"" + "Gateway Device Subscribe to attribute updates" + "\"," + - "\"data\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; - } - - private static String getJsonResponseGatewayAttributesDeletedPayload() { - return "{\"device\":\"" + "Gateway Device Subscribe to attribute updates" + "\",\"data\":{\"deleted\":[\"attribute5\"]}}"; + private static String getGatewayAttributesResponseJson(String deviceName, String expectResultData) { + return "{\"device\":\"" + deviceName + "\"," + "\"data\":" + expectResultData + "}"; } - protected void validateProtoGatewayUpdateAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateProtoGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertNotNull(callback.getPayloadBytes()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); - List tsKvProtoList = getTsKvProtoList(); + List tsKvProtoList = getTsKvProtoList("shared"); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); TransportApiProtos.GatewayAttributeUpdateNotificationMsg.Builder gatewayAttributeUpdateNotificationMsgBuilder = TransportApiProtos.GatewayAttributeUpdateNotificationMsg.newBuilder(); - gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName("Gateway Device Subscribe to attribute updates"); + gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName(deviceName); gatewayAttributeUpdateNotificationMsgBuilder.setNotificationMsg(expectedAttributeUpdateNotificationMsg); TransportApiProtos.GatewayAttributeUpdateNotificationMsg expectedGatewayAttributeUpdateNotificationMsg = gatewayAttributeUpdateNotificationMsgBuilder.build(); @@ -367,17 +284,17 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertEquals(expectedSharedUpdatedList.size(), actualSharedUpdatedList.size()); assertTrue(actualSharedUpdatedList.containsAll(expectedSharedUpdatedList)); - } - protected void validateProtoGatewayDeleteAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException { + protected void validateProtoGatewayDeleteAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertNotNull(callback.getPayloadBytes()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); - attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5"); + attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); TransportApiProtos.GatewayAttributeUpdateNotificationMsg.Builder gatewayAttributeUpdateNotificationMsgBuilder = TransportApiProtos.GatewayAttributeUpdateNotificationMsg.newBuilder(); - gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName("Gateway Device Subscribe to attribute updates"); + gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName(deviceName); gatewayAttributeUpdateNotificationMsgBuilder.setNotificationMsg(attributeUpdateNotificationMsg); TransportApiProtos.GatewayAttributeUpdateNotificationMsg expectedGatewayAttributeUpdateNotificationMsg = gatewayAttributeUpdateNotificationMsgBuilder.build(); @@ -389,118 +306,187 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt TransportProtos.AttributeUpdateNotificationMsg actualAttributeUpdateNotificationMsg = actualGatewayAttributeUpdateNotificationMsg.getNotificationMsg(); assertEquals(expectedAttributeUpdateNotificationMsg.getSharedDeletedList().size(), actualAttributeUpdateNotificationMsg.getSharedDeletedList().size()); - assertEquals("attribute5", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0)); - - } - - protected byte[] getProtoConnectPayloadBytes() { - TransportApiProtos.ConnectMsg connectProto = getConnectProto(); - return connectProto.toByteArray(); + assertEquals("sharedJson", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0)); } - private TransportApiProtos.ConnectMsg getConnectProto() { - TransportApiProtos.ConnectMsg.Builder builder = TransportApiProtos.ConnectMsg.newBuilder(); - builder.setDeviceName("Gateway Device Subscribe to attribute updates"); - builder.setDeviceType(TransportPayloadType.PROTOBUF.name()); - return builder.build(); + private byte[] getProtoConnectPayloadBytes(String deviceName, String deviceType) { + TransportApiProtos.ConnectMsg connectMsg = TransportApiProtos.ConnectMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .build(); + return connectMsg.toByteArray(); } // request attributes from server methods protected void processJsonTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(accessToken); - - postJsonAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic); - - Thread.sleep(5000); - - TestMqttCallback callback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName()); + String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; + String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; + List clientKeysList = List.of(clientKeysStr.split(",")); + List sharedKeysList = List.of(sharedKeysStr.split(",")); + List csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); + List shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE); + List keys = new ArrayList<>(); + keys.addAll(csKeys); + keys.addAll(shKeys); + getWsClient().subscribeLatestUpdate(keys, dtf); + getWsClient().registerWaitForUpdate(2); + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", + SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + client.publishAndWait(attrPubTopic, CLIENT_ATTRIBUTES_PAYLOAD.getBytes()); + client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + String update = getWsClient().waitForUpdate(); + assertThat(update).as("ws update received").isNotBlank(); + MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); client.setCallback(callback); - - validateJsonResponse(client, callback.getLatch(), callback, attrReqTopicPrefix); + String payloadStr = "{\"clientKeys\":\"" + clientKeysStr + "\", \"sharedKeys\":\"" + sharedKeysStr + "\"}"; + client.publishAndWait(attrReqTopicPrefix + "1", payloadStr.getBytes()); + String expectedResponse = "{\"client\":" + CLIENT_ATTRIBUTES_PAYLOAD + ",\"shared\":" + SHARED_ATTRIBUTES_PAYLOAD + "}"; + validateJsonResponse(callback, expectedResponse); + client.disconnect(); } protected void processProtoTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception { - - MqttAsyncClient client = getMqttAsyncClient(accessToken); - - postProtoAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic); - - Thread.sleep(5000); - - TestMqttCallback callback = getTestMqttCallback(); + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName()); + String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; + String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; + List clientKeysList = List.of(clientKeysStr.split(",")); + List sharedKeysList = List.of(sharedKeysStr.split(",")); + List csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); + List shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE); + List keys = new ArrayList<>(); + keys.addAll(csKeys); + keys.addAll(shKeys); + getWsClient().subscribeLatestUpdate(keys, dtf); + getWsClient().registerWaitForUpdate(2); + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + client.publishAndWait(attrPubTopic, getAttributesProtoPayloadBytes()); + client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + String update = getWsClient().waitForUpdate(); + assertThat(update).as("ws update received").isNotBlank(); + MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); client.setCallback(callback); - - validateProtoResponse(client, callback.getLatch(), callback, attrReqTopicPrefix); + TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder(); + attributesRequestBuilder.setClientKeys(clientKeysStr); + attributesRequestBuilder.setSharedKeys(sharedKeysStr); + TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build(); + client.publishAndWait(attrReqTopicPrefix + "1", attributesRequest.toByteArray()); + validateProtoResponse(callback, getExpectedAttributeResponseMsg()); + client.disconnect(); } protected void processJsonTestGatewayRequestAttributesValuesFromTheServer() throws Exception { + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + String deviceName = "Gateway Device Request Attributes"; + String postClientAttributes = "{\"" + deviceName + "\":" + CLIENT_ATTRIBUTES_PAYLOAD + "}"; + client.publishAndWait(GATEWAY_ATTRIBUTES_TOPIC, postClientAttributes.getBytes()); - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); - - postJsonGatewayDeviceClientAttributes(client); - - Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class), + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), 20, 100); - - assertNotNull(savedDevice); - - Thread.sleep(2000); - - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - - Thread.sleep(5000); - - client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - - TestMqttCallback clientAttributesCallback = getTestMqttCallback(); + assertNotNull(device); + + DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); + String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; + String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; + List clientKeysList = List.of(clientKeysStr.split(",")); + List sharedKeysList = List.of(sharedKeysStr.split(",")); + List csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); + List shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE); + List keys = new ArrayList<>(); + keys.addAll(csKeys); + keys.addAll(shKeys); + EntityDataUpdate initUpdate = getWsClient().subscribeLatestUpdate(keys, dtf); + assertNotNull(initUpdate); + PageData data = initUpdate.getData(); + assertNotNull(data); + assertFalse(data.getData().isEmpty()); + getWsClient().registerWaitForUpdate(); + + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + String update = getWsClient().waitForUpdate(); + assertThat(update).as("ws update received").isNotBlank(); + + client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); + + MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); - validateJsonClientResponseGateway(client, clientAttributesCallback); + String csKeysStr = "[\"clientStr\", \"clientBool\", \"clientDbl\", \"clientLong\", \"clientJson\"]"; + String csRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": true, \"keys\": " + csKeysStr + "}"; + client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, csRequestPayloadStr.getBytes()); + validateJsonResponseGateway(clientAttributesCallback, deviceName, CLIENT_ATTRIBUTES_PAYLOAD); - TestMqttCallback sharedAttributesCallback = getTestMqttCallback(); + MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(sharedAttributesCallback); - validateJsonSharedResponseGateway(client, sharedAttributesCallback); + String shKeysStr = "[\"sharedStr\", \"sharedBool\", \"sharedDbl\", \"sharedLong\", \"sharedJson\"]"; + String shRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": false, \"keys\": " + shKeysStr + "}"; + client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, shRequestPayloadStr.getBytes()); + validateJsonResponseGateway(sharedAttributesCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD); + + client.disconnect(); } protected void processProtoTestGatewayRequestAttributesValuesFromTheServer() throws Exception { + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + String deviceName = "Gateway Device Request Attributes"; + String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; + List clientKeysList = List.of(clientKeysStr.split(",")); + client.publishAndWait(GATEWAY_ATTRIBUTES_TOPIC, getProtoGatewayDeviceClientAttributesPayload(deviceName, clientKeysList)); - postProtoGatewayDeviceClientAttributes(client); - - Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class), + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), 20, 100); - - assertNotNull(savedDevice); - - Thread.sleep(2000); - - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - - Thread.sleep(5000); - - client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - - TestMqttCallback clientAttributesCallback = getTestMqttCallback(); + assertNotNull(device); + + DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); + String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; + List sharedKeysList = List.of(sharedKeysStr.split(",")); + List csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); + List shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE); + List keys = new ArrayList<>(); + keys.addAll(csKeys); + keys.addAll(shKeys); + EntityDataUpdate initUpdate = getWsClient().subscribeLatestUpdate(keys, dtf); + assertNotNull(initUpdate); + PageData data = initUpdate.getData(); + assertNotNull(data); + assertFalse(data.getData().isEmpty()); + getWsClient().registerWaitForUpdate(); + + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + String update = getWsClient().waitForUpdate(); + assertThat(update).as("ws update received").isNotBlank(); + + client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); + + MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); - validateProtoClientResponseGateway(client, clientAttributesCallback); + TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, clientKeysList, true); + client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray()); + validateProtoClientResponseGateway(clientAttributesCallback, deviceName); - TestMqttCallback sharedAttributesCallback = getTestMqttCallback(); + MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(sharedAttributesCallback); - validateProtoSharedResponseGateway(client, sharedAttributesCallback); + gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, sharedKeysList, false); + client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray()); + validateProtoSharedResponseGateway(sharedAttributesCallback, deviceName); + + client.disconnect(); } - protected void postJsonAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception { - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - client.publish(attrPubTopic, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); + private List getEntityKeys(List keys, EntityKeyType scope) { + return keys.stream().map(key -> new EntityKey(scope, key)).collect(Collectors.toList()); } - protected void postProtoAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception { - doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", AbstractMqttAttributesIntegrationTest.POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + private byte[] getAttributesProtoPayloadBytes() { DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; @@ -530,64 +516,39 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType(); assertNotNull(postAttributesMsgDescriptor); DynamicMessage postAttributesMsg = postAttributesBuilder - .setField(postAttributesMsgDescriptor.findFieldByName("attribute1"), "value1") - .setField(postAttributesMsgDescriptor.findFieldByName("attribute2"), true) - .setField(postAttributesMsgDescriptor.findFieldByName("attribute3"), 42.0) - .setField(postAttributesMsgDescriptor.findFieldByName("attribute4"), 73) - .setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject) + .setField(postAttributesMsgDescriptor.findFieldByName("clientStr"), "value1") + .setField(postAttributesMsgDescriptor.findFieldByName("clientBool"), true) + .setField(postAttributesMsgDescriptor.findFieldByName("clientDbl"), 42.0) + .setField(postAttributesMsgDescriptor.findFieldByName("clientLong"), 73) + .setField(postAttributesMsgDescriptor.findFieldByName("clientJson"), jsonObject) .build(); - byte[] payload = postAttributesMsg.toByteArray(); - client.publish(attrPubTopic, new MqttMessage(payload)); - client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()); + return postAttributesMsg.toByteArray(); } - protected void postJsonGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception { - String postClientAttributes = "{\"" + "Gateway Device Request Attributes" + "\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(postClientAttributes.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - } - - protected void postProtoGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception { - String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; - List expectedKeys = Arrays.asList(keys.split(",")); - TransportProtos.PostAttributeMsg postAttributeMsg = getPostAttributeMsg(expectedKeys); + protected byte[] getProtoGatewayDeviceClientAttributesPayload(String deviceName, List clientKeysList) { + TransportProtos.PostAttributeMsg postAttributeMsg = getPostAttributeMsg(clientKeysList); TransportApiProtos.AttributesMsg.Builder attributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); - attributesMsgBuilder.setDeviceName("Gateway Device Request Attributes"); + attributesMsgBuilder.setDeviceName(deviceName); attributesMsgBuilder.setMsg(postAttributeMsg); TransportApiProtos.AttributesMsg attributesMsg = attributesMsgBuilder.build(); TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributeMsgBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder(); gatewayAttributeMsgBuilder.addMsg(attributesMsg); - byte[] bytes = gatewayAttributeMsgBuilder.build().toByteArray(); - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(bytes)); + return gatewayAttributeMsgBuilder.build().toByteArray(); } - protected void validateJsonResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopicPrefix) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; - String payloadStr = "{\"clientKeys\":\"" + keys + "\", \"sharedKeys\":\"" + keys + "\"}"; - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(payloadStr.getBytes()); - client.publish(attrReqTopicPrefix + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - latch.await(1, TimeUnit.MINUTES); + protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); - String expectedRequestPayload = "{\"client\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}},\"shared\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; - assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8))); + assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - protected void validateProtoResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopic) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; - TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder(); - attributesRequestBuilder.setClientKeys(keys); - attributesRequestBuilder.setSharedKeys(keys); - TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build(); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(attributesRequest.toByteArray()); - client.publish(attrReqTopic + "1", mqttMessage); - latch.await(3, TimeUnit.SECONDS); + protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); - TransportProtos.GetAttributeResponseMsg expectedAttributesResponse = getExpectedAttributeResponseMsg(); TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); - assertEquals(expectedAttributesResponse.getRequestId(), actualAttributesResponse.getRequestId()); - List expectedClientKeyValueProtos = expectedAttributesResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); - List expectedSharedKeyValueProtos = expectedAttributesResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); + assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId()); + List expectedClientKeyValueProtos = expectedResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); + List expectedSharedKeyValueProtos = expectedResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); List actualClientKeyValueProtos = actualAttributesResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); List actualSharedKeyValueProtos = actualAttributesResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); assertTrue(actualClientKeyValueProtos.containsAll(expectedClientKeyValueProtos)); @@ -596,42 +557,25 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt private TransportProtos.GetAttributeResponseMsg getExpectedAttributeResponseMsg() { TransportProtos.GetAttributeResponseMsg.Builder result = TransportProtos.GetAttributeResponseMsg.newBuilder(); - List tsKvProtoList = getTsKvProtoList(); - result.addAllClientAttributeList(tsKvProtoList); - result.addAllSharedAttributeList(tsKvProtoList); + List csTsKvProtoList = getTsKvProtoList("client"); + List shTsKvProtoList = getTsKvProtoList("shared"); + result.addAllClientAttributeList(csTsKvProtoList); + result.addAllSharedAttributeList(shTsKvProtoList); result.setRequestId(1); return result.build(); } - protected void validateJsonClientResponseGateway(MqttAsyncClient client, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String payloadStr = "{\"id\": 1, \"device\": \"" + "Gateway Device Request Attributes" + "\", \"client\": true, \"keys\": [\"attribute1\", \"attribute2\", \"attribute3\", \"attribute4\", \"attribute5\"]}"; - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(payloadStr.getBytes()); - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - callback.getLatch().await(1, TimeUnit.MINUTES); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); - String expectedRequestPayload = "{\"id\":1,\"device\":\"" + "Gateway Device Request Attributes" + "\",\"values\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; - assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8))); - } - - protected void validateJsonSharedResponseGateway(MqttAsyncClient client, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String payloadStr = "{\"id\": 1, \"device\": \"" + "Gateway Device Request Attributes" + "\", \"client\": false, \"keys\": [\"attribute1\", \"attribute2\", \"attribute3\", \"attribute4\", \"attribute5\"]}"; - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(payloadStr.getBytes()); - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - callback.getLatch().await(1, TimeUnit.MINUTES); + protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); - String expectedRequestPayload = "{\"id\":1,\"device\":\"" + "Gateway Device Request Attributes" + "\",\"values\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; - assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8))); + String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}"; + assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - protected void validateProtoClientResponseGateway(MqttAsyncClient client, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; - TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(keys, true); - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, new MqttMessage(gatewayAttributesRequestMsg.toByteArray())); - callback.getLatch().await(3, TimeUnit.SECONDS); + protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); - TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(true); + TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); @@ -644,13 +588,10 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertTrue(actualClientKeyValueProtos.containsAll(expectedClientKeyValueProtos)); } - protected void validateProtoSharedResponseGateway(MqttAsyncClient client, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { - String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; - TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(keys, false); - client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, new MqttMessage(gatewayAttributesRequestMsg.toByteArray())); - callback.getLatch().await(3, TimeUnit.SECONDS); + protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { + callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); - TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(false); + TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); @@ -664,27 +605,26 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertTrue(actualSharedKeyValueProtos.containsAll(expectedSharedKeyValueProtos)); } - private TransportApiProtos.GatewayAttributeResponseMsg getExpectedGatewayAttributeResponseMsg(boolean client) { + private TransportApiProtos.GatewayAttributeResponseMsg getExpectedGatewayAttributeResponseMsg(String deviceName, boolean client) { TransportApiProtos.GatewayAttributeResponseMsg.Builder gatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.newBuilder(); TransportProtos.GetAttributeResponseMsg.Builder getAttributeResponseMsgBuilder = TransportProtos.GetAttributeResponseMsg.newBuilder(); - List tsKvProtoList = getTsKvProtoList(); if (client) { - getAttributeResponseMsgBuilder.addAllClientAttributeList(tsKvProtoList); + getAttributeResponseMsgBuilder.addAllClientAttributeList(getTsKvProtoList("client")); } else { - getAttributeResponseMsgBuilder.addAllSharedAttributeList(tsKvProtoList); + getAttributeResponseMsgBuilder.addAllSharedAttributeList(getTsKvProtoList("shared")); } getAttributeResponseMsgBuilder.setRequestId(1); TransportProtos.GetAttributeResponseMsg getAttributeResponseMsg = getAttributeResponseMsgBuilder.build(); - gatewayAttributeResponseMsg.setDeviceName("Gateway Device Request Attributes"); + gatewayAttributeResponseMsg.setDeviceName(deviceName); gatewayAttributeResponseMsg.setResponseMsg(getAttributeResponseMsg); return gatewayAttributeResponseMsg.build(); } - private TransportApiProtos.GatewayAttributesRequestMsg getGatewayAttributesRequestMsg(String keys, boolean client) { + private TransportApiProtos.GatewayAttributesRequestMsg getGatewayAttributesRequestMsg(String deviceName, List keysList, boolean client) { return TransportApiProtos.GatewayAttributesRequestMsg.newBuilder() + .setDeviceName(deviceName) + .addAllKeys(keysList) .setClient(client) - .addAllKeys(Arrays.asList(keys.split(","))) - .setDeviceName("Gateway Device Request Attributes") .setId(1).build(); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java index f2bacc66ed..2231cb89eb 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java @@ -85,7 +85,6 @@ public class MqttAttributesRequestBackwardCompatibilityIntegrationTest extends A @Test public void testRequestAttributesValuesFromTheServerGatewayWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() - .deviceName("Test Request attribute values from the server proto") .gatewayName("Gateway Test Request attribute values from the server proto") .transportPayloadType(TransportPayloadType.PROTOBUF) .enableCompatibilityWithJsonPayloadFormat(true) @@ -99,7 +98,6 @@ public class MqttAttributesRequestBackwardCompatibilityIntegrationTest extends A public void testRequestAttributesValuesFromTheServerOnShortJsonTopicWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .deviceName("Test Request attribute values from the server proto") - .gatewayName("Gateway Test Request attribute values from the server proto") .transportPayloadType(TransportPayloadType.PROTOBUF) .enableCompatibilityWithJsonPayloadFormat(true) .useJsonPayloadFormatForDefaultDownlinkTopics(true) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java index 402d9a7263..d330cdbbaf 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java @@ -67,7 +67,6 @@ public class MqttAttributesRequestProtoIntegrationTest extends AbstractMqttAttri @Test public void testRequestAttributesValuesFromTheServerGateway() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() - .deviceName("Test Request attribute values from the server proto") .gatewayName("Gateway Test Request attribute values from the server proto") .transportPayloadType(TransportPayloadType.PROTOBUF) .build(); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java index c79f695fb6..2d72af2526 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java @@ -90,7 +90,6 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A @Test public void testProtoSubscribeToAttributesUpdatesFromTheServerGatewayWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() - .deviceName("Test Subscribe to attribute updates") .gatewayName("Gateway Test Subscribe to attribute updates") .transportPayloadType(TransportPayloadType.PROTOBUF) .enableCompatibilityWithJsonPayloadFormat(true) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java index 107d8dd0fa..795ca7f660 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestClient; import java.util.Arrays; import java.util.HashSet; @@ -93,33 +94,51 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest { @Test public void testCorrectCredentials() throws Exception { // Check that correct devices receive telemetry - testTelemetryIsDelivered(accessTokenDevice, getMqttAsyncClient(null, USER_NAME1, null)); - testTelemetryIsDelivered(clientIdDevice, getMqttAsyncClient(CLIENT_ID, null, null)); - testTelemetryIsDelivered(clientIdAndUserNameDevice1, getMqttAsyncClient(CLIENT_ID, USER_NAME1, null)); - testTelemetryIsDelivered(clientIdAndUserNameAndPasswordDevice2, getMqttAsyncClient(CLIENT_ID, USER_NAME2, PASSWORD)); + MqttTestClient mqttTestClient1 = new MqttTestClient(); + mqttTestClient1.connectAndWait(USER_NAME1); + + MqttTestClient mqttTestClient2 = new MqttTestClient(CLIENT_ID); + mqttTestClient2.connectAndWait(); + + MqttTestClient mqttTestClient3 = new MqttTestClient(CLIENT_ID); + mqttTestClient3.connectAndWait(USER_NAME1); + + MqttTestClient mqttTestClient4 = new MqttTestClient(CLIENT_ID); + mqttTestClient4.connectAndWait(USER_NAME2, PASSWORD); + + // Also correct. Random clientId and password, but matches access token + MqttTestClient mqttTestClient5 = new MqttTestClient(RandomStringUtils.randomAlphanumeric(10)); + mqttTestClient5.connectAndWait(USER_NAME2, RandomStringUtils.randomAlphanumeric(10)); + + testTelemetryIsDelivered(accessTokenDevice, mqttTestClient1); + testTelemetryIsDelivered(clientIdDevice, mqttTestClient2); + testTelemetryIsDelivered(clientIdAndUserNameDevice1, mqttTestClient3); + testTelemetryIsDelivered(clientIdAndUserNameAndPasswordDevice2, mqttTestClient4); // Also correct. Random clientId and password, but matches access token - testTelemetryIsDelivered(accessToken2Device, getMqttAsyncClient(RandomStringUtils.randomAlphanumeric(10), USER_NAME2, RandomStringUtils.randomAlphanumeric(10))); + testTelemetryIsDelivered(accessToken2Device, mqttTestClient5); } @Test(expected = MqttSecurityException.class) public void testCorrectClientIdAndUserNameButWrongPassword() throws Exception { // Not correct. Correct clientId and username, but wrong password - testTelemetryIsNotDelivered(clientIdAndUserNameAndPasswordDevice3, getMqttAsyncClient(CLIENT_ID, USER_NAME3, "WRONG PASSWORD")); + MqttTestClient mqttTestClient = new MqttTestClient(CLIENT_ID); + mqttTestClient.connectAndWait(USER_NAME3, "WRONG PASSWORD"); + testTelemetryIsNotDelivered(clientIdAndUserNameAndPasswordDevice3, mqttTestClient); } - private void testTelemetryIsDelivered(Device device, MqttAsyncClient client) throws Exception { + private void testTelemetryIsDelivered(Device device, MqttTestClient client) throws Exception { testTelemetryIsDelivered(device, client, true); } - private void testTelemetryIsNotDelivered(Device device, MqttAsyncClient client) throws Exception { + private void testTelemetryIsNotDelivered(Device device, MqttTestClient client) throws Exception { testTelemetryIsDelivered(device, client, false); } - private void testTelemetryIsDelivered(Device device, MqttAsyncClient client, boolean ok) throws Exception { + private void testTelemetryIsDelivered(Device device, MqttTestClient client, boolean ok) throws Exception { String randomKey = RandomStringUtils.randomAlphanumeric(10); List expectedKeys = Arrays.asList(randomKey); - publishMqttMsg(client, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes(), MqttTopics.DEVICE_TELEMETRY_TOPIC); + client.publishAndWait(MqttTopics.DEVICE_TELEMETRY_TOPIC, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes()); String deviceId = device.getId().getId().toString(); @@ -146,7 +165,7 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest { } else { assertNull(actualKeys); } - client.disconnect().waitForCompletion(); + client.disconnect(); } protected MqttAsyncClient getMqttAsyncClient(String clientId, String username, String password) throws MqttException {