Browse Source

added new MqttTestClient & MqttTestCallback & refactored credentials and attribute updates/request tests

pull/6537/head
ShvaykaD 4 years ago
parent
commit
ee05935a49
  1. 6
      application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java
  2. 60
      application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java
  3. 118
      application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java
  4. 654
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java
  5. 2
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java
  6. 1
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java
  7. 1
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java
  8. 41
      application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java

6
application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java

@ -79,8 +79,12 @@ public class TbTestWebSocketClient extends WebSocketClient {
}
public void registerWaitForUpdate() {
registerWaitForUpdate(1);
}
public void registerWaitForUpdate(int count) {
lastMsg = null;
update = new CountDownLatch(1);
update = new CountDownLatch(count);
}
@Override

60
application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestCallback.java

@ -0,0 +1,60 @@
package org.thingsboard.server.transport.mqtt;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Data
public class MqttTestCallback implements MqttCallback {
private final CountDownLatch subscribeLatch;
private final CountDownLatch deliveryLatch;
private int qoS;
private byte[] payloadBytes;
private String awaitSubTopic;
public MqttTestCallback(String awaitSubTopic) {
this.subscribeLatch = new CountDownLatch(1);
this.deliveryLatch = new CountDownLatch(1);
this.awaitSubTopic = awaitSubTopic;
}
public MqttTestCallback() {
this.subscribeLatch = new CountDownLatch(1);
this.deliveryLatch = new CountDownLatch(1);
}
@Override
public void connectionLost(Throwable throwable) {
log.warn("connectionLost: ", throwable);
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
if (awaitSubTopic == null) {
log.warn("messageArrived on topic: {}", requestTopic);
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
} else {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse());
deliveryLatch.countDown();
}
}

118
application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestClient.java

@ -0,0 +1,118 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.thingsboard.server.common.data.StringUtils;
import java.util.concurrent.TimeUnit;
public class MqttTestClient {
private static final String MQTT_URL = "tcp://localhost:1883";
private static final int TIMEOUT = 30; // seconds
private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(TIMEOUT);
private final MqttAsyncClient client;
public void setCallback(MqttTestCallback callback) {
client.setCallback(callback);
}
public MqttTestClient() throws MqttException {
this.client = createClient();
}
public MqttTestClient(String clientId) throws MqttException {
this.client = createClient(clientId);
}
public void connectAndWait(String userName, String password) throws MqttException {
IMqttToken connect = connect(userName, password);
connect.waitForCompletion(TIMEOUT_MS);
}
public void connectAndWait(String userName) throws MqttException {
connectAndWait(userName, null);
}
public void connectAndWait() throws MqttException {
connectAndWait(null, null);
}
private IMqttToken connect(String userName, String password) throws MqttException {
if (client == null) {
throw new RuntimeException("Failed to connect! MqttAsyncClient is not initialized!");
}
MqttConnectOptions options = new MqttConnectOptions();
if (StringUtils.isNotEmpty(userName)) {
options.setUserName(userName);
}
if (StringUtils.isNotEmpty(password)) {
options.setPassword(password.toCharArray());
}
return client.connect(options);
}
public void disconnectAndWait() throws MqttException {
disconnect().waitForCompletion(TIMEOUT_MS);
}
public IMqttToken disconnect() throws MqttException {
return client.disconnect();
}
public void disconnectForcibly() throws MqttException {
client.disconnectForcibly(TIMEOUT_MS);
}
public void publishAndWait(String topic, byte[] payload) throws MqttException {
publish(topic, payload).waitForCompletion(TIMEOUT_MS);
}
public IMqttDeliveryToken publish(String topic, byte[] payload) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(payload);
return client.publish(topic, message);
}
public void subscribeAndWait(String topic, MqttQoS qoS) throws MqttException {
subscribe(topic, qoS).waitForCompletion(TIMEOUT_MS);
}
public IMqttToken subscribe(String topic, MqttQoS qoS) throws MqttException {
return client.subscribe(topic, qoS.value());
}
private MqttAsyncClient createClient(String clientId) throws MqttException {
if (StringUtils.isEmpty(clientId)) {
clientId = MqttAsyncClient.generateClientId();
}
return new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence());
}
private MqttAsyncClient createClient() throws MqttException {
return createClient(null);
}
}

654
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java

@ -22,36 +22,47 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.common.data.query.EntityKeyType.CLIENT_ATTRIBUTE;
import static org.thingsboard.server.common.data.query.EntityKeyType.SHARED_ATTRIBUTE;
@TestPropertySource(properties = {
"js.evaluator=mock",
})
@Slf4j
public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqttIntegrationTest {
@ -60,11 +71,11 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
"package test;\n" +
"\n" +
"message PostAttributes {\n" +
" string attribute1 = 1;\n" +
" bool attribute2 = 2;\n" +
" double attribute3 = 3;\n" +
" int32 attribute4 = 4;\n" +
" JsonObject attribute5 = 5;\n" +
" string clientStr = 1;\n" +
" bool clientBool = 2;\n" +
" double clientDbl = 3;\n" +
" int32 clientLong = 4;\n" +
" JsonObject clientJson = 5;\n" +
"\n" +
" message JsonObject {\n" +
" int32 someNumber = 6;\n" +
@ -76,17 +87,20 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
" }\n" +
"}";
protected static final String POST_ATTRIBUTES_PAYLOAD = "{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73," +
"\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}";
private static final String CLIENT_ATTRIBUTES_PAYLOAD = "{\"clientStr\":\"value1\",\"clientBool\":true,\"clientDbl\":42.0,\"clientLong\":73," +
"\"clientJson\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}";
private static final String SHARED_ATTRIBUTES_PAYLOAD = "{\"sharedStr\":\"value1\",\"sharedBool\":true,\"sharedDbl\":42.0,\"sharedLong\":73," +
"\"sharedJson\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}";
private static final String RESPONSE_ATTRIBUTES_PAYLOAD_DELETED = "{\"deleted\":[\"attribute5\"]}";
private static final String SHARED_ATTRIBUTES_DELETED_RESPONSE = "{\"deleted\":[\"sharedJson\"]}";
protected List<TransportProtos.TsKvProto> getTsKvProtoList() {
TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("attribute1", "value1", TransportProtos.KeyValueType.STRING_V);
TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto("attribute2", "true", TransportProtos.KeyValueType.BOOLEAN_V);
TransportProtos.TsKvProto tsKvProtoAttribute3 = getTsKvProto("attribute3", "42.0", TransportProtos.KeyValueType.DOUBLE_V);
TransportProtos.TsKvProto tsKvProtoAttribute4 = getTsKvProto("attribute4", "73", TransportProtos.KeyValueType.LONG_V);
TransportProtos.TsKvProto tsKvProtoAttribute5 = getTsKvProto("attribute5", "{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", TransportProtos.KeyValueType.JSON_V);
private List<TransportProtos.TsKvProto> getTsKvProtoList(String attributePrefix) {
TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto(attributePrefix + "Str", "value1", TransportProtos.KeyValueType.STRING_V);
TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto(attributePrefix + "Bool", "true", TransportProtos.KeyValueType.BOOLEAN_V);
TransportProtos.TsKvProto tsKvProtoAttribute3 = getTsKvProto(attributePrefix + "Dbl", "42.0", TransportProtos.KeyValueType.DOUBLE_V);
TransportProtos.TsKvProto tsKvProtoAttribute4 = getTsKvProto(attributePrefix + "Long", "73", TransportProtos.KeyValueType.LONG_V);
TransportProtos.TsKvProto tsKvProtoAttribute5 = getTsKvProto(attributePrefix + "Json", "{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", TransportProtos.KeyValueType.JSON_V);
List<TransportProtos.TsKvProto> tsKvProtoList = new ArrayList<>();
tsKvProtoList.add(tsKvProtoAttribute1);
tsKvProtoList.add(tsKvProtoAttribute2);
@ -103,118 +117,56 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
return tsKvProtoBuilder.build();
}
protected TestMqttCallback getTestMqttCallback() {
CountDownLatch latch = new CountDownLatch(1);
return new TestMqttCallback(latch);
}
protected static class TestMqttCallback implements MqttCallback {
private final CountDownLatch latch;
private Integer qoS;
private byte[] payloadBytes;
TestMqttCallback(CountDownLatch latch) {
this.latch = latch;
}
public int getQoS() {
return qoS;
}
public byte[] getPayloadBytes() {
return payloadBytes;
}
public CountDownLatch getLatch() {
return latch;
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
latch.countDown();
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
// subscribe to attributes updates from server methods
protected void processJsonTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback onUpdateCallback = new MqttTestCallback();
client.setCallback(onUpdateCallback);
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
Thread.sleep(1000);
validateUpdateAttributesJsonResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
validateUpdateAttributesJsonResponse(onUpdateCallback);
TestMqttCallback onDeleteCallback = getTestMqttCallback();
MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class);
onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS);
validateDeleteAttributesJsonResponse(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateUpdateAttributesJsonResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE);
client.disconnect();
}
protected void processProtoTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback onUpdateCallback = new MqttTestCallback();
client.setCallback(onUpdateCallback);
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateUpdateAttributesProtoResponse(onUpdateCallback);
TestMqttCallback onDeleteCallback = getTestMqttCallback();
MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class);
onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateDeleteAttributesProtoResponse(onDeleteCallback);
client.disconnect();
}
protected void validateUpdateAttributesJsonResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes());
String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD), JacksonUtil.toJsonNode(response));
}
protected void validateDeleteAttributesJsonResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateUpdateAttributesJsonResponse(MqttTestCallback callback, String expectedResponse) {
assertNotNull(callback.getPayloadBytes());
String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
assertEquals(JacksonUtil.toJsonNode(RESPONSE_ATTRIBUTES_PAYLOAD_DELETED), JacksonUtil.toJsonNode(response));
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
protected void validateUpdateAttributesProtoResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateUpdateAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes());
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared");
attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList);
TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build();
@ -227,134 +179,99 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
assertTrue(actualSharedUpdatedList.containsAll(expectedSharedUpdatedList));
}
protected void validateDeleteAttributesProtoResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateDeleteAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes());
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5");
attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson");
TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build();
TransportProtos.AttributeUpdateNotificationMsg actualAttributeUpdateNotificationMsg = TransportProtos.AttributeUpdateNotificationMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedAttributeUpdateNotificationMsg.getSharedDeletedList().size(), actualAttributeUpdateNotificationMsg.getSharedDeletedList().size());
assertEquals("attribute5", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0));
assertEquals("sharedJson", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0));
}
protected void processJsonGatewayTestSubscribeToAttributesUpdates() throws Exception {
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback onUpdateCallback = new MqttTestCallback();
client.setCallback(onUpdateCallback);
Device device = new Device();
device.setName("Gateway Device Subscribe to attribute updates");
device.setType("default");
byte[] connectPayloadBytes = getJsonConnectPayloadBytes();
String deviceName = "Gateway Device Subscribe to attribute updates";
byte[] connectPayloadBytes = getJsonConnectPayloadBytes(deviceName, deviceProfile.getTransportType().name());
publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
client.publishAndWait(GATEWAY_CONNECT_TOPIC, connectPayloadBytes);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class),
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(savedDevice);
client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
Thread.sleep(1000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
validateJsonGatewayUpdateAttributesResponse(onUpdateCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD);
validateJsonGatewayUpdateAttributesResponse(onUpdateCallback);
TestMqttCallback onDeleteCallback = getTestMqttCallback();
MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class);
onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS);
validateJsonGatewayDeleteAttributesResponse(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateJsonGatewayUpdateAttributesResponse(onDeleteCallback, deviceName, SHARED_ATTRIBUTES_DELETED_RESPONSE);
client.disconnect();
}
protected void processProtoGatewayTestSubscribeToAttributesUpdates() throws Exception {
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback onUpdateCallback = new MqttTestCallback();
client.setCallback(onUpdateCallback);
Device device = new Device();
device.setName("Gateway Device Subscribe to attribute updates");
device.setType("default");
byte[] connectPayloadBytes = getProtoConnectPayloadBytes();
publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class),
String deviceName = "Gateway Device Subscribe to attribute updates";
byte[] connectPayloadBytes = getProtoConnectPayloadBytes(deviceName, TransportPayloadType.PROTOBUF.name());
client.publishAndWait(GATEWAY_CONNECT_TOPIC, connectPayloadBytes);
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(savedDevice);
client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
validateProtoGatewayUpdateAttributesResponse(onUpdateCallback);
TestMqttCallback onDeleteCallback = getTestMqttCallback();
assertNotNull(device);
client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
validateProtoGatewayUpdateAttributesResponse(onUpdateCallback, deviceName);
MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class);
onDeleteCallback.getLatch().await(3, TimeUnit.SECONDS);
validateProtoGatewayDeleteAttributesResponse(onDeleteCallback);
}
protected void validateJsonGatewayUpdateAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes());
String s = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
assertEquals(getJsonResponseGatewayAttributesUpdatedPayload(), s);
doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
validateProtoGatewayDeleteAttributesResponse(onDeleteCallback, deviceName);
client.disconnect();
}
protected void validateJsonGatewayDeleteAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateJsonGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName, String expectResultData) {
assertNotNull(callback.getPayloadBytes());
String s = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
assertEquals(s, getJsonResponseGatewayAttributesDeletedPayload());
assertEquals(JacksonUtil.toJsonNode(getGatewayAttributesResponseJson(deviceName, expectResultData)), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
protected byte[] getJsonConnectPayloadBytes() {
String connectPayload = "{\"device\": \"Gateway Device Subscribe to attribute updates\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}";
protected byte[] getJsonConnectPayloadBytes(String deviceName, String deviceType) {
String connectPayload = "{\"device\":\"" + deviceName + "\", \"type\": \"" + deviceType + "\"}";
return connectPayload.getBytes();
}
private static String getJsonResponseGatewayAttributesUpdatedPayload() {
return "{\"device\":\"" + "Gateway Device Subscribe to attribute updates" + "\"," +
"\"data\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";
}
private static String getJsonResponseGatewayAttributesDeletedPayload() {
return "{\"device\":\"" + "Gateway Device Subscribe to attribute updates" + "\",\"data\":{\"deleted\":[\"attribute5\"]}}";
private static String getGatewayAttributesResponseJson(String deviceName, String expectResultData) {
return "{\"device\":\"" + deviceName + "\"," + "\"data\":" + expectResultData + "}";
}
protected void validateProtoGatewayUpdateAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateProtoGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertNotNull(callback.getPayloadBytes());
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared");
attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList);
TransportProtos.AttributeUpdateNotificationMsg expectedAttributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build();
TransportApiProtos.GatewayAttributeUpdateNotificationMsg.Builder gatewayAttributeUpdateNotificationMsgBuilder = TransportApiProtos.GatewayAttributeUpdateNotificationMsg.newBuilder();
gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName("Gateway Device Subscribe to attribute updates");
gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName(deviceName);
gatewayAttributeUpdateNotificationMsgBuilder.setNotificationMsg(expectedAttributeUpdateNotificationMsg);
TransportApiProtos.GatewayAttributeUpdateNotificationMsg expectedGatewayAttributeUpdateNotificationMsg = gatewayAttributeUpdateNotificationMsgBuilder.build();
@ -367,17 +284,17 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
assertEquals(expectedSharedUpdatedList.size(), actualSharedUpdatedList.size());
assertTrue(actualSharedUpdatedList.containsAll(expectedSharedUpdatedList));
}
protected void validateProtoGatewayDeleteAttributesResponse(TestMqttCallback callback) throws InvalidProtocolBufferException {
protected void validateProtoGatewayDeleteAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertNotNull(callback.getPayloadBytes());
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5");
attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson");
TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build();
TransportApiProtos.GatewayAttributeUpdateNotificationMsg.Builder gatewayAttributeUpdateNotificationMsgBuilder = TransportApiProtos.GatewayAttributeUpdateNotificationMsg.newBuilder();
gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName("Gateway Device Subscribe to attribute updates");
gatewayAttributeUpdateNotificationMsgBuilder.setDeviceName(deviceName);
gatewayAttributeUpdateNotificationMsgBuilder.setNotificationMsg(attributeUpdateNotificationMsg);
TransportApiProtos.GatewayAttributeUpdateNotificationMsg expectedGatewayAttributeUpdateNotificationMsg = gatewayAttributeUpdateNotificationMsgBuilder.build();
@ -389,118 +306,187 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
TransportProtos.AttributeUpdateNotificationMsg actualAttributeUpdateNotificationMsg = actualGatewayAttributeUpdateNotificationMsg.getNotificationMsg();
assertEquals(expectedAttributeUpdateNotificationMsg.getSharedDeletedList().size(), actualAttributeUpdateNotificationMsg.getSharedDeletedList().size());
assertEquals("attribute5", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0));
}
protected byte[] getProtoConnectPayloadBytes() {
TransportApiProtos.ConnectMsg connectProto = getConnectProto();
return connectProto.toByteArray();
assertEquals("sharedJson", actualAttributeUpdateNotificationMsg.getSharedDeletedList().get(0));
}
private TransportApiProtos.ConnectMsg getConnectProto() {
TransportApiProtos.ConnectMsg.Builder builder = TransportApiProtos.ConnectMsg.newBuilder();
builder.setDeviceName("Gateway Device Subscribe to attribute updates");
builder.setDeviceType(TransportPayloadType.PROTOBUF.name());
return builder.build();
private byte[] getProtoConnectPayloadBytes(String deviceName, String deviceType) {
TransportApiProtos.ConnectMsg connectMsg = TransportApiProtos.ConnectMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.build();
return connectMsg.toByteArray();
}
// request attributes from server methods
protected void processJsonTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
postJsonAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic);
Thread.sleep(5000);
TestMqttCallback callback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName());
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> clientKeysList = List.of(clientKeysStr.split(","));
List<String> sharedKeysList = List.of(sharedKeysStr.split(","));
List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE);
List<EntityKey> shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE);
List<EntityKey> keys = new ArrayList<>();
keys.addAll(csKeys);
keys.addAll(shKeys);
getWsClient().subscribeLatestUpdate(keys, dtf);
getWsClient().registerWaitForUpdate(2);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE",
SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
client.publishAndWait(attrPubTopic, CLIENT_ATTRIBUTES_PAYLOAD.getBytes());
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
String update = getWsClient().waitForUpdate();
assertThat(update).as("ws update received").isNotBlank();
MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
client.setCallback(callback);
validateJsonResponse(client, callback.getLatch(), callback, attrReqTopicPrefix);
String payloadStr = "{\"clientKeys\":\"" + clientKeysStr + "\", \"sharedKeys\":\"" + sharedKeysStr + "\"}";
client.publishAndWait(attrReqTopicPrefix + "1", payloadStr.getBytes());
String expectedResponse = "{\"client\":" + CLIENT_ATTRIBUTES_PAYLOAD + ",\"shared\":" + SHARED_ATTRIBUTES_PAYLOAD + "}";
validateJsonResponse(callback, expectedResponse);
client.disconnect();
}
protected void processProtoTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
postProtoAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic);
Thread.sleep(5000);
TestMqttCallback callback = getTestMqttCallback();
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName());
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> clientKeysList = List.of(clientKeysStr.split(","));
List<String> sharedKeysList = List.of(sharedKeysStr.split(","));
List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE);
List<EntityKey> shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE);
List<EntityKey> keys = new ArrayList<>();
keys.addAll(csKeys);
keys.addAll(shKeys);
getWsClient().subscribeLatestUpdate(keys, dtf);
getWsClient().registerWaitForUpdate(2);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
client.publishAndWait(attrPubTopic, getAttributesProtoPayloadBytes());
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
String update = getWsClient().waitForUpdate();
assertThat(update).as("ws update received").isNotBlank();
MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
client.setCallback(callback);
validateProtoResponse(client, callback.getLatch(), callback, attrReqTopicPrefix);
TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder();
attributesRequestBuilder.setClientKeys(clientKeysStr);
attributesRequestBuilder.setSharedKeys(sharedKeysStr);
TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build();
client.publishAndWait(attrReqTopicPrefix + "1", attributesRequest.toByteArray());
validateProtoResponse(callback, getExpectedAttributeResponseMsg());
client.disconnect();
}
protected void processJsonTestGatewayRequestAttributesValuesFromTheServer() throws Exception {
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
String deviceName = "Gateway Device Request Attributes";
String postClientAttributes = "{\"" + deviceName + "\":" + CLIENT_ATTRIBUTES_PAYLOAD + "}";
client.publishAndWait(GATEWAY_ATTRIBUTES_TOPIC, postClientAttributes.getBytes());
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
postJsonGatewayDeviceClientAttributes(client);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class),
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(savedDevice);
Thread.sleep(2000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
Thread.sleep(5000);
client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
TestMqttCallback clientAttributesCallback = getTestMqttCallback();
assertNotNull(device);
DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName());
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> clientKeysList = List.of(clientKeysStr.split(","));
List<String> sharedKeysList = List.of(sharedKeysStr.split(","));
List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE);
List<EntityKey> shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE);
List<EntityKey> keys = new ArrayList<>();
keys.addAll(csKeys);
keys.addAll(shKeys);
EntityDataUpdate initUpdate = getWsClient().subscribeLatestUpdate(keys, dtf);
assertNotNull(initUpdate);
PageData<EntityData> data = initUpdate.getData();
assertNotNull(data);
assertFalse(data.getData().isEmpty());
getWsClient().registerWaitForUpdate();
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
String update = getWsClient().waitForUpdate();
assertThat(update).as("ws update received").isNotBlank();
client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
client.setCallback(clientAttributesCallback);
validateJsonClientResponseGateway(client, clientAttributesCallback);
String csKeysStr = "[\"clientStr\", \"clientBool\", \"clientDbl\", \"clientLong\", \"clientJson\"]";
String csRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": true, \"keys\": " + csKeysStr + "}";
client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, csRequestPayloadStr.getBytes());
validateJsonResponseGateway(clientAttributesCallback, deviceName, CLIENT_ATTRIBUTES_PAYLOAD);
TestMqttCallback sharedAttributesCallback = getTestMqttCallback();
MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
client.setCallback(sharedAttributesCallback);
validateJsonSharedResponseGateway(client, sharedAttributesCallback);
String shKeysStr = "[\"sharedStr\", \"sharedBool\", \"sharedDbl\", \"sharedLong\", \"sharedJson\"]";
String shRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": false, \"keys\": " + shKeysStr + "}";
client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, shRequestPayloadStr.getBytes());
validateJsonResponseGateway(sharedAttributesCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD);
client.disconnect();
}
protected void processProtoTestGatewayRequestAttributesValuesFromTheServer() throws Exception {
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
String deviceName = "Gateway Device Request Attributes";
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
List<String> clientKeysList = List.of(clientKeysStr.split(","));
client.publishAndWait(GATEWAY_ATTRIBUTES_TOPIC, getProtoGatewayDeviceClientAttributesPayload(deviceName, clientKeysList));
postProtoGatewayDeviceClientAttributes(client);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class),
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(savedDevice);
Thread.sleep(2000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
Thread.sleep(5000);
client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
TestMqttCallback clientAttributesCallback = getTestMqttCallback();
assertNotNull(device);
DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName());
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> sharedKeysList = List.of(sharedKeysStr.split(","));
List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE);
List<EntityKey> shKeys = getEntityKeys(sharedKeysList, SHARED_ATTRIBUTE);
List<EntityKey> keys = new ArrayList<>();
keys.addAll(csKeys);
keys.addAll(shKeys);
EntityDataUpdate initUpdate = getWsClient().subscribeLatestUpdate(keys, dtf);
assertNotNull(initUpdate);
PageData<EntityData> data = initUpdate.getData();
assertNotNull(data);
assertFalse(data.getData().isEmpty());
getWsClient().registerWaitForUpdate();
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
String update = getWsClient().waitForUpdate();
assertThat(update).as("ws update received").isNotBlank();
client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
client.setCallback(clientAttributesCallback);
validateProtoClientResponseGateway(client, clientAttributesCallback);
TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, clientKeysList, true);
client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray());
validateProtoClientResponseGateway(clientAttributesCallback, deviceName);
TestMqttCallback sharedAttributesCallback = getTestMqttCallback();
MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
client.setCallback(sharedAttributesCallback);
validateProtoSharedResponseGateway(client, sharedAttributesCallback);
gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, sharedKeysList, false);
client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray());
validateProtoSharedResponseGateway(sharedAttributesCallback, deviceName);
client.disconnect();
}
protected void postJsonAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
client.publish(attrPubTopic, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
private List<EntityKey> getEntityKeys(List<String> keys, EntityKeyType scope) {
return keys.stream().map(key -> new EntityKey(scope, key)).collect(Collectors.toList());
}
protected void postProtoAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", AbstractMqttAttributesIntegrationTest.POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
private byte[] getAttributesProtoPayloadBytes() {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -530,64 +516,39 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("attribute1"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("attribute2"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("attribute3"), 42.0)
.setField(postAttributesMsgDescriptor.findFieldByName("attribute4"), 73)
.setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject)
.setField(postAttributesMsgDescriptor.findFieldByName("clientStr"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("clientBool"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("clientDbl"), 42.0)
.setField(postAttributesMsgDescriptor.findFieldByName("clientLong"), 73)
.setField(postAttributesMsgDescriptor.findFieldByName("clientJson"), jsonObject)
.build();
byte[] payload = postAttributesMsg.toByteArray();
client.publish(attrPubTopic, new MqttMessage(payload));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
return postAttributesMsg.toByteArray();
}
protected void postJsonGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
String postClientAttributes = "{\"" + "Gateway Device Request Attributes" + "\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(postClientAttributes.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
}
protected void postProtoGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
List<String> expectedKeys = Arrays.asList(keys.split(","));
TransportProtos.PostAttributeMsg postAttributeMsg = getPostAttributeMsg(expectedKeys);
protected byte[] getProtoGatewayDeviceClientAttributesPayload(String deviceName, List<String> clientKeysList) {
TransportProtos.PostAttributeMsg postAttributeMsg = getPostAttributeMsg(clientKeysList);
TransportApiProtos.AttributesMsg.Builder attributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
attributesMsgBuilder.setDeviceName("Gateway Device Request Attributes");
attributesMsgBuilder.setDeviceName(deviceName);
attributesMsgBuilder.setMsg(postAttributeMsg);
TransportApiProtos.AttributesMsg attributesMsg = attributesMsgBuilder.build();
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributeMsgBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
gatewayAttributeMsgBuilder.addMsg(attributesMsg);
byte[] bytes = gatewayAttributeMsgBuilder.build().toByteArray();
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(bytes));
return gatewayAttributeMsgBuilder.build().toByteArray();
}
protected void validateJsonResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopicPrefix) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
String payloadStr = "{\"clientKeys\":\"" + keys + "\", \"sharedKeys\":\"" + keys + "\"}";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payloadStr.getBytes());
client.publish(attrReqTopicPrefix + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
latch.await(1, TimeUnit.MINUTES);
protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"client\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}},\"shared\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8)));
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
protected void validateProtoResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopic) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder();
attributesRequestBuilder.setClientKeys(keys);
attributesRequestBuilder.setSharedKeys(keys);
TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(attributesRequest.toByteArray());
client.publish(attrReqTopic + "1", mqttMessage);
latch.await(3, TimeUnit.SECONDS);
protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
TransportProtos.GetAttributeResponseMsg expectedAttributesResponse = getExpectedAttributeResponseMsg();
TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedAttributesResponse.getRequestId(), actualAttributesResponse.getRequestId());
List<TransportProtos.KeyValueProto> expectedClientKeyValueProtos = expectedAttributesResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
List<TransportProtos.KeyValueProto> expectedSharedKeyValueProtos = expectedAttributesResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId());
List<TransportProtos.KeyValueProto> expectedClientKeyValueProtos = expectedResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
List<TransportProtos.KeyValueProto> expectedSharedKeyValueProtos = expectedResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
List<TransportProtos.KeyValueProto> actualClientKeyValueProtos = actualAttributesResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
List<TransportProtos.KeyValueProto> actualSharedKeyValueProtos = actualAttributesResponse.getSharedAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
assertTrue(actualClientKeyValueProtos.containsAll(expectedClientKeyValueProtos));
@ -596,42 +557,25 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
private TransportProtos.GetAttributeResponseMsg getExpectedAttributeResponseMsg() {
TransportProtos.GetAttributeResponseMsg.Builder result = TransportProtos.GetAttributeResponseMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList();
result.addAllClientAttributeList(tsKvProtoList);
result.addAllSharedAttributeList(tsKvProtoList);
List<TransportProtos.TsKvProto> csTsKvProtoList = getTsKvProtoList("client");
List<TransportProtos.TsKvProto> shTsKvProtoList = getTsKvProtoList("shared");
result.addAllClientAttributeList(csTsKvProtoList);
result.addAllSharedAttributeList(shTsKvProtoList);
result.setRequestId(1);
return result.build();
}
protected void validateJsonClientResponseGateway(MqttAsyncClient client, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String payloadStr = "{\"id\": 1, \"device\": \"" + "Gateway Device Request Attributes" + "\", \"client\": true, \"keys\": [\"attribute1\", \"attribute2\", \"attribute3\", \"attribute4\", \"attribute5\"]}";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payloadStr.getBytes());
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
callback.getLatch().await(1, TimeUnit.MINUTES);
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"id\":1,\"device\":\"" + "Gateway Device Request Attributes" + "\",\"values\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8)));
}
protected void validateJsonSharedResponseGateway(MqttAsyncClient client, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String payloadStr = "{\"id\": 1, \"device\": \"" + "Gateway Device Request Attributes" + "\", \"client\": false, \"keys\": [\"attribute1\", \"attribute2\", \"attribute3\", \"attribute4\", \"attribute5\"]}";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payloadStr.getBytes());
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
callback.getLatch().await(1, TimeUnit.MINUTES);
protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"id\":1,\"device\":\"" + "Gateway Device Request Attributes" + "\",\"values\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.toJsonNode(new String(callback.getPayloadBytes(), StandardCharsets.UTF_8)));
String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
protected void validateProtoClientResponseGateway(MqttAsyncClient client, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(keys, true);
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, new MqttMessage(gatewayAttributesRequestMsg.toByteArray()));
callback.getLatch().await(3, TimeUnit.SECONDS);
protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(true);
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName());
@ -644,13 +588,10 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
assertTrue(actualClientKeyValueProtos.containsAll(expectedClientKeyValueProtos));
}
protected void validateProtoSharedResponseGateway(MqttAsyncClient client, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(keys, false);
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC, new MqttMessage(gatewayAttributesRequestMsg.toByteArray()));
callback.getLatch().await(3, TimeUnit.SECONDS);
protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(false);
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName());
@ -664,27 +605,26 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
assertTrue(actualSharedKeyValueProtos.containsAll(expectedSharedKeyValueProtos));
}
private TransportApiProtos.GatewayAttributeResponseMsg getExpectedGatewayAttributeResponseMsg(boolean client) {
private TransportApiProtos.GatewayAttributeResponseMsg getExpectedGatewayAttributeResponseMsg(String deviceName, boolean client) {
TransportApiProtos.GatewayAttributeResponseMsg.Builder gatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.newBuilder();
TransportProtos.GetAttributeResponseMsg.Builder getAttributeResponseMsgBuilder = TransportProtos.GetAttributeResponseMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList();
if (client) {
getAttributeResponseMsgBuilder.addAllClientAttributeList(tsKvProtoList);
getAttributeResponseMsgBuilder.addAllClientAttributeList(getTsKvProtoList("client"));
} else {
getAttributeResponseMsgBuilder.addAllSharedAttributeList(tsKvProtoList);
getAttributeResponseMsgBuilder.addAllSharedAttributeList(getTsKvProtoList("shared"));
}
getAttributeResponseMsgBuilder.setRequestId(1);
TransportProtos.GetAttributeResponseMsg getAttributeResponseMsg = getAttributeResponseMsgBuilder.build();
gatewayAttributeResponseMsg.setDeviceName("Gateway Device Request Attributes");
gatewayAttributeResponseMsg.setDeviceName(deviceName);
gatewayAttributeResponseMsg.setResponseMsg(getAttributeResponseMsg);
return gatewayAttributeResponseMsg.build();
}
private TransportApiProtos.GatewayAttributesRequestMsg getGatewayAttributesRequestMsg(String keys, boolean client) {
private TransportApiProtos.GatewayAttributesRequestMsg getGatewayAttributesRequestMsg(String deviceName, List<String> keysList, boolean client) {
return TransportApiProtos.GatewayAttributesRequestMsg.newBuilder()
.setDeviceName(deviceName)
.addAllKeys(keysList)
.setClient(client)
.addAllKeys(Arrays.asList(keys.split(",")))
.setDeviceName("Gateway Device Request Attributes")
.setId(1).build();
}
}

2
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestBackwardCompatibilityIntegrationTest.java

@ -85,7 +85,6 @@ public class MqttAttributesRequestBackwardCompatibilityIntegrationTest extends A
@Test
public void testRequestAttributesValuesFromTheServerGatewayWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Request attribute values from the server proto")
.gatewayName("Gateway Test Request attribute values from the server proto")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.enableCompatibilityWithJsonPayloadFormat(true)
@ -99,7 +98,6 @@ public class MqttAttributesRequestBackwardCompatibilityIntegrationTest extends A
public void testRequestAttributesValuesFromTheServerOnShortJsonTopicWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Request attribute values from the server proto")
.gatewayName("Gateway Test Request attribute values from the server proto")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.enableCompatibilityWithJsonPayloadFormat(true)
.useJsonPayloadFormatForDefaultDownlinkTopics(true)

1
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/MqttAttributesRequestProtoIntegrationTest.java

@ -67,7 +67,6 @@ public class MqttAttributesRequestProtoIntegrationTest extends AbstractMqttAttri
@Test
public void testRequestAttributesValuesFromTheServerGateway() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Request attribute values from the server proto")
.gatewayName("Gateway Test Request attribute values from the server proto")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.build();

1
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/MqttAttributesUpdatesBackwardCompatibilityIntegrationTest.java

@ -90,7 +90,6 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
@Test
public void testProtoSubscribeToAttributesUpdatesFromTheServerGatewayWithEnabledJsonCompatibilityAndJsonDownlinks() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Subscribe to attribute updates")
.gatewayName("Gateway Test Subscribe to attribute updates")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.enableCompatibilityWithJsonPayloadFormat(true)

41
application/src/test/java/org/thingsboard/server/transport/mqtt/credentials/BasicMqttCredentialsTest.java

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import java.util.Arrays;
import java.util.HashSet;
@ -93,33 +94,51 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
@Test
public void testCorrectCredentials() throws Exception {
// Check that correct devices receive telemetry
testTelemetryIsDelivered(accessTokenDevice, getMqttAsyncClient(null, USER_NAME1, null));
testTelemetryIsDelivered(clientIdDevice, getMqttAsyncClient(CLIENT_ID, null, null));
testTelemetryIsDelivered(clientIdAndUserNameDevice1, getMqttAsyncClient(CLIENT_ID, USER_NAME1, null));
testTelemetryIsDelivered(clientIdAndUserNameAndPasswordDevice2, getMqttAsyncClient(CLIENT_ID, USER_NAME2, PASSWORD));
MqttTestClient mqttTestClient1 = new MqttTestClient();
mqttTestClient1.connectAndWait(USER_NAME1);
MqttTestClient mqttTestClient2 = new MqttTestClient(CLIENT_ID);
mqttTestClient2.connectAndWait();
MqttTestClient mqttTestClient3 = new MqttTestClient(CLIENT_ID);
mqttTestClient3.connectAndWait(USER_NAME1);
MqttTestClient mqttTestClient4 = new MqttTestClient(CLIENT_ID);
mqttTestClient4.connectAndWait(USER_NAME2, PASSWORD);
// Also correct. Random clientId and password, but matches access token
MqttTestClient mqttTestClient5 = new MqttTestClient(RandomStringUtils.randomAlphanumeric(10));
mqttTestClient5.connectAndWait(USER_NAME2, RandomStringUtils.randomAlphanumeric(10));
testTelemetryIsDelivered(accessTokenDevice, mqttTestClient1);
testTelemetryIsDelivered(clientIdDevice, mqttTestClient2);
testTelemetryIsDelivered(clientIdAndUserNameDevice1, mqttTestClient3);
testTelemetryIsDelivered(clientIdAndUserNameAndPasswordDevice2, mqttTestClient4);
// Also correct. Random clientId and password, but matches access token
testTelemetryIsDelivered(accessToken2Device, getMqttAsyncClient(RandomStringUtils.randomAlphanumeric(10), USER_NAME2, RandomStringUtils.randomAlphanumeric(10)));
testTelemetryIsDelivered(accessToken2Device, mqttTestClient5);
}
@Test(expected = MqttSecurityException.class)
public void testCorrectClientIdAndUserNameButWrongPassword() throws Exception {
// Not correct. Correct clientId and username, but wrong password
testTelemetryIsNotDelivered(clientIdAndUserNameAndPasswordDevice3, getMqttAsyncClient(CLIENT_ID, USER_NAME3, "WRONG PASSWORD"));
MqttTestClient mqttTestClient = new MqttTestClient(CLIENT_ID);
mqttTestClient.connectAndWait(USER_NAME3, "WRONG PASSWORD");
testTelemetryIsNotDelivered(clientIdAndUserNameAndPasswordDevice3, mqttTestClient);
}
private void testTelemetryIsDelivered(Device device, MqttAsyncClient client) throws Exception {
private void testTelemetryIsDelivered(Device device, MqttTestClient client) throws Exception {
testTelemetryIsDelivered(device, client, true);
}
private void testTelemetryIsNotDelivered(Device device, MqttAsyncClient client) throws Exception {
private void testTelemetryIsNotDelivered(Device device, MqttTestClient client) throws Exception {
testTelemetryIsDelivered(device, client, false);
}
private void testTelemetryIsDelivered(Device device, MqttAsyncClient client, boolean ok) throws Exception {
private void testTelemetryIsDelivered(Device device, MqttTestClient client, boolean ok) throws Exception {
String randomKey = RandomStringUtils.randomAlphanumeric(10);
List<String> expectedKeys = Arrays.asList(randomKey);
publishMqttMsg(client, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes(), MqttTopics.DEVICE_TELEMETRY_TOPIC);
client.publishAndWait(MqttTopics.DEVICE_TELEMETRY_TOPIC, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes());
String deviceId = device.getId().getId().toString();
@ -146,7 +165,7 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
} else {
assertNull(actualKeys);
}
client.disconnect().waitForCompletion();
client.disconnect();
}
protected MqttAsyncClient getMqttAsyncClient(String clientId, String username, String password) throws MqttException {

Loading…
Cancel
Save