Browse Source

[3.3.2] MQTT short topics support (#4967)

* added mqtt short topics support

* remove volatile keyword from topic types parameters

* added new tests for mqtt short topics

* fix compilation error after merge

* improvements/typo-fixes after pull request review
pull/5345/head
ShvaykaD 5 years ago
committed by GitHub
parent
commit
845d8247dd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java
  2. 13
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java
  3. 29
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java
  4. 16
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java
  5. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java
  6. 16
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java
  7. 25
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java
  8. 17
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
  9. 25
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java
  10. 30
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java
  11. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java
  12. 17
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java
  13. 91
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java
  14. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java
  15. 26
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java
  16. 126
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java
  17. 55
      common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java
  18. 16
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java
  19. 252
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  20. 56
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
  21. 14
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
  22. 67
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java
  23. 14
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java

28
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java

@ -52,7 +52,17 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
@Test
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception {
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
}
@Test
@ -60,18 +70,18 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
processTestGatewayRequestAttributesValuesFromTheServer();
}
protected void processTestRequestAttributesValuesFromTheServer() throws Exception {
protected void processTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
postAttributesAndSubscribeToTopic(savedDevice, client);
postAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic);
Thread.sleep(5000);
TestMqttCallback callback = getTestMqttCallback();
client.setCallback(callback);
validateResponse(client, callback.getLatch(), callback);
validateResponse(client, callback.getLatch(), callback, attrReqTopicPrefix);
}
protected void processTestGatewayRequestAttributesValuesFromTheServer() throws Exception {
@ -103,10 +113,10 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
validateSharedResponseGateway(client, sharedAttributesCallback);
}
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception {
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.publish(attrPubTopic, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
}
protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
@ -114,12 +124,12 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(postClientAttributes.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
}
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopicPrefix) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
String payloadStr = "{\"clientKeys\":\"" + keys + "\", \"sharedKeys\":\"" + keys + "\"}";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payloadStr.getBytes());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.publish(attrReqTopicPrefix + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
latch.await(1, TimeUnit.MINUTES);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"client\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}},\"shared\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";

13
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
@Slf4j
public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends AbstractMqttAttributesRequestIntegrationTest {
@ -36,7 +37,17 @@ public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends A
@Test
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
super.testRequestAttributesValuesFromTheServer();
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
super.testRequestAttributesValuesFromTheServerOnShortTopic();
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception {
super.testRequestAttributesValuesFromTheServerOnShortJsonTopic();
}
@Test

29
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java

@ -84,7 +84,21 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
public void testRequestAttributesValuesFromTheServer() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer();
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
}
@Test
@ -93,7 +107,10 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
processTestGatewayRequestAttributesValuesFromTheServer();
}
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception {
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception { }
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", AbstractMqttAttributesIntegrationTest.POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
@ -131,8 +148,8 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
.setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject)
.build();
byte[] payload = postAttributesMsg.toByteArray();
client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(payload));
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.publish(attrPubTopic, new MqttMessage(payload));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
}
protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
@ -149,7 +166,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(bytes));
}
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopic) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder();
attributesRequestBuilder.setClientKeys(keys);
@ -157,7 +174,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(attributesRequest.toByteArray());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage);
client.publish(attrReqTopic + "1", mqttMessage);
latch.await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
TransportProtos.GetAttributeResponseMsg expectedAttributesResponse = getExpectedAttributeResponseMsg();

16
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java

@ -61,7 +61,17 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test
@ -69,14 +79,14 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
processGatewayTestSubscribeToAttributesUpdates();
}
protected void processTestSubscribeToAttributesUpdates() throws Exception {
protected void processTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
client.setCallback(onUpdateCallback);
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);

12
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java

@ -36,7 +36,17 @@ public abstract class AbstractMqttAttributesUpdatesJsonIntegrationTest extends A
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
super.testSubscribeToAttributesUpdatesFromTheServer();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic();
}
@Test

16
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -46,7 +47,20 @@ public abstract class AbstractMqttAttributesUpdatesProtoIntegrationTest extends
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
super.testSubscribeToAttributesUpdatesFromTheServer();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortProtoTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
}
@Test

25
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.service.security.AccessValidator;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -81,12 +82,32 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test

17
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java

@ -60,14 +60,14 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
asyncContextTimeoutToUseRpcPlugin = 10000L;
}
protected void processOneWayRpcTest() throws Exception {
protected void processOneWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
client.setCallback(callback);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.subscribe(rpcSubTopic, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);
@ -86,9 +86,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes);
}
protected void processTwoWayRpcTest() throws Exception {
protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
client.subscribe(rpcSubTopic, 1);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
@ -199,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
MqttMessage message = new MqttMessage();
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) {
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
message.setPayload(DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8));
} else {
JsonNode requestMsgNode = JacksonUtil.toJsonNode(new String(mqttMessage.getPayload(), StandardCharset.UTF_8));
@ -232,7 +232,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
String responseTopic = requestTopic.replace("request", "response");
String responseTopic;
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
responseTopic = requestTopic.replace("req", "res");
} else {
responseTopic = requestTopic.replace("request", "response");
}
qoS = mqttMessage.getQos();
client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
latch.countDown();

25
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
@Slf4j
public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
@ -37,12 +38,32 @@ public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends Abstr
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test

30
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java

@ -78,12 +78,32 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortProtoTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortProtoTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC);
}
@Test
@ -118,9 +138,9 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
return builder.build();
}
protected void processTwoWayRpcTest() throws Exception {
protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
client.subscribe(rpcSubTopic, 1);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
@ -139,7 +159,7 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
MqttMessage message = new MqttMessage();
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) {
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration();
ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA);
DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA);

12
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java

@ -60,6 +60,18 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

17
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import java.util.Arrays;
import java.util.List;
@ -45,12 +46,18 @@ public abstract class AbstractMqttAttributesJsonIntegrationTest extends Abstract
processJsonPayloadAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortTopic() throws Exception {
super.testPushAttributesOnShortTopic();
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
super.testPushAttributesOnShortJsonTopic();
}
@Test
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
String payload = getGatewayAttributesJsonPayload(deviceName1, deviceName2);
processGatewayAttributesTest(expectedKeys, payload.getBytes(), deviceName1, deviceName2);
super.testPushAttributesGateway();
}
}

91
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java

@ -25,6 +25,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.gen.transport.TransportApiProtos;
@ -49,14 +50,14 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
@Test
public void testPushAttributes() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicSchema attributesSchema = getDynamicSchema();
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -67,7 +68,6 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
@ -78,18 +78,50 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), false)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 0)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), true);
}
@Test
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
public void testPushAttributesOnShortTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesGateway() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null);
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys);
TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys);
gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto));
TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build();
processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2);
}
private DynamicSchema getDynamicSchema() {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -97,7 +129,11 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
}
private DynamicMessage getDefaultDynamicMessage() {
DynamicSchema attributesSchema = getDynamicSchema();
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -108,6 +144,7 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
@ -117,25 +154,13 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
DynamicMessage.Builder postAttributesBuilder = attributesSchema.newMessageBuilder("PostAttributes");
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
return postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key5"), postAttributesMsg.toByteArray(), true);
}
@Test
public void testPushAttributesGateway() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null);
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys);
TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys);
gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto));
TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build();
processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2);
}
private TransportApiProtos.AttributesMsg getDeviceAttributesMsgProto(String deviceName, List<String> expectedKeys) {

12
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java

@ -73,6 +73,18 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

26
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java

@ -57,25 +57,23 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
processJsonPayloadTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
super.testPushTelemetryOnShortTopic();
}
@Test
public void testPushTelemetryWithTsOnShortJsonTopic() throws Exception {
super.testPushTelemetryOnShortJsonTopic();
}
@Test
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
String payload = getGatewayTelemetryJsonPayload(deviceName1, deviceName2, "10000", "20000");
processGatewayTelemetryTest(MqttTopics.GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
super.testPushTelemetryGateway();
}
@Test
public void testGatewayConnect() throws Exception {
String payload = "{\"device\":\"Device A\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}";
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
String deviceName = "Device A";
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(device);
super.testGatewayConnect();
}
}

126
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java

@ -21,6 +21,7 @@ import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@ -55,41 +56,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
@Test
public void testPushTelemetry() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@ -121,14 +88,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(schemaStr);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -173,14 +133,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
@Test
public void testPushTelemetryWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -237,14 +190,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(schemaStr);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -281,6 +227,26 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), true, true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
public void testPushTelemetryOnShortJsonTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
public void testPushTelemetryGateway() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
@ -310,6 +276,48 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
assertNotNull(device);
}
private DynamicSchema getDynamicSchema(String deviceTelemetryProtoSchema) {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(deviceTelemetryProtoSchema);
return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
}
private DynamicMessage getDefaultDynamicMessage() {
DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
return postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
}
private TransportApiProtos.ConnectMsg getConnectProto(String deviceName) {
TransportApiProtos.ConnectMsg.Builder builder = TransportApiProtos.ConnectMsg.newBuilder();
builder.setDeviceName(deviceName);

55
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java

@ -34,20 +34,25 @@ public class MqttTopics {
private static final String SOFTWARE = "/sw";
private static final String CHUNK = "/chunk/";
private static final String ERROR = "/error";
private static final String TELEMETRY_SHORT = "/t";
private static final String ATTRIBUTES_SHORT = "/a";
private static final String RPC_SHORT = "/r";
private static final String REQUEST_SHORT = "/req";
private static final String RESPONSE_SHORT = "/res";
private static final String JSON_SHORT = "j";
private static final String PROTO_SHORT = "p";
private static final String ATTRIBUTES_RESPONSE = ATTRIBUTES + RESPONSE;
private static final String ATTRIBUTES_REQUEST = ATTRIBUTES + REQUEST;
private static final String ATTRIBUTES_RESPONSE_SHORT = ATTRIBUTES_SHORT + RESPONSE_SHORT + "/";
private static final String ATTRIBUTES_REQUEST_SHORT = ATTRIBUTES_SHORT + REQUEST_SHORT + "/";
private static final String DEVICE_RPC_RESPONSE = RPC + RESPONSE + "/";
private static final String DEVICE_RPC_REQUEST = RPC + REQUEST + "/";
private static final String DEVICE_RPC_RESPONSE_SHORT = RPC_SHORT + RESPONSE_SHORT + "/";
private static final String DEVICE_RPC_REQUEST_SHORT = RPC_SHORT + REQUEST_SHORT + "/";
private static final String DEVICE_ATTRIBUTES_RESPONSE = ATTRIBUTES_RESPONSE + "/";
private static final String DEVICE_ATTRIBUTES_REQUEST = ATTRIBUTES_REQUEST + "/";
// V1_JSON topics
// v1 topics
public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me";
public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_RESPONSE;
public static final String DEVICE_RPC_RESPONSE_SUB_TOPIC = DEVICE_RPC_RESPONSE_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_REQUEST;
@ -60,9 +65,7 @@ public class MqttTopics {
public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + ATTRIBUTES;
public static final String DEVICE_PROVISION_REQUEST_TOPIC = PROVISION + REQUEST;
public static final String DEVICE_PROVISION_RESPONSE_TOPIC = PROVISION + RESPONSE;
// V1_JSON gateway topics
// v1 gateway topics
public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + CONNECT;
public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + DISCONNECT;
@ -72,22 +75,44 @@ public class MqttTopics {
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
// v2 topics
public static final String BASE_DEVICE_API_TOPIC_V2 = "v2";
public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)";
public static final String CHUNK_PATTERN = "(?<chunk>\\d+)";
public static final String DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
public static final String DEVICE_FIRMWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + ERROR;
public static final String DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT = BASE_DEVICE_API_TOPIC_V2 + "/%s" + RESPONSE + "/%s" + CHUNK + "%d";
public static final String DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
public static final String DEVICE_SOFTWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
public static final String DEVICE_SOFTWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + ERROR;
public static final String DEVICE_ATTRIBUTES_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT;
public static final String DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + JSON_SHORT;
public static final String DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + PROTO_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + JSON_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + PROTO_SHORT;
public static final String DEVICE_RPC_RESPONSE_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT;
public static final String DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + JSON_SHORT + "/";
public static final String DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + PROTO_SHORT + "/";
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT;
public static final String DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + JSON_SHORT + "/";
public static final String DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + PROTO_SHORT + "/";
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_RESPONSE_SHORT;
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + JSON_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_REQUEST_SHORT;
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + JSON_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/";
private MqttTopics() {
}

16
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java

@ -20,7 +20,6 @@ import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
@ -32,6 +31,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.CoapTransportResource;
@ -44,8 +44,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(UUID sessionId, Request inbound, Descriptors.Descriptor telemetryMsgDescriptor) throws AdaptorException {
ProtoConverter.validateDescriptor(telemetryMsgDescriptor);
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor)));
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor)));
} catch (Exception e) {
throw new AdaptorException(e);
}
@ -53,8 +54,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostAttributeMsg convertToPostAttributes(UUID sessionId, Request inbound, Descriptors.Descriptor attributesMsgDescriptor) throws AdaptorException {
ProtoConverter.validateDescriptor(attributesMsgDescriptor);
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor)));
return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor)));
} catch (Exception e) {
throw new AdaptorException(e);
}
@ -71,8 +73,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
if (requestId.isEmpty()) {
throw new AdaptorException("Request id is missing!");
} else {
ProtoConverter.validateDescriptor(rpcResponseMsgDescriptor);
try {
JsonElement response = new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor));
JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor));
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!")))
.setPayload(response.toString()).build();
} catch (Exception e) {
@ -158,11 +161,6 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
return response;
}
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
@Override
public int getContentFormat() {
return MediaTypeRegistry.APPLICATION_OCTET_STREAM;

252
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -101,8 +101,6 @@ import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN;
/**
* @author Andrew Shvayka
@ -110,8 +108,8 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC
@Slf4j
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
@ -133,6 +131,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final ConcurrentHashMap<String, Integer> chunkSizes;
private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
private TopicType attrSubTopicType;
private TopicType rpcSubTopicType;
private TopicType attrReqTopicType;
private TopicType toServerRpcSubTopicType;
MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) {
this.sessionId = UUID.randomUUID();
this.context = context;
@ -355,14 +358,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V1;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V1;
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
@ -370,6 +375,57 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);
} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2_JSON;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2_PROTO;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2_JSON;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2_PROTO;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2;
} else {
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
ack(ctx, msgId);
@ -541,19 +597,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
@ -580,6 +670,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
private void processRpcSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
rpcSubTopicType = topicType;
registerSubQoS(topic, grantedQoSList, reqQoS);
}
private void processAttributesSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
attrSubTopicType = topicType;
registerSubQoS(topic, grantedQoSList, reqQoS);
}
private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
@ -595,18 +697,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
mqttQoSMap.remove(new MqttTopicMatcher(topicName));
try {
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: {
activityReported = true;
break;
}
}
} catch (Exception e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@ -837,8 +964,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
log.trace("[{}] Received get attributes response", sessionId);
String topicBase;
MqttTransportAdaptor adaptor;
switch (attrReqTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, response, topicBase).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
}
@ -847,8 +995,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to device", sessionId);
log.info("[{}] : attrSubTopicType: {}", notification.toString(), attrSubTopicType);
String topic;
MqttTransportAdaptor adaptor;
switch (attrSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}
@ -862,9 +1031,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
log.info("[{}] Received RPC command to device", sessionId);
String baseTopic;
MqttTransportAdaptor adaptor;
switch (rpcSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> {
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
if (isAckExpected(payload)) {
rpcAwaitingAck.put(msgId, rpcRequest);
@ -898,9 +1087,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
log.trace("[{}] Received RPC command to server", sessionId);
log.trace("[{}] Received RPC response from server", sessionId);
String baseTopic;
MqttTransportAdaptor adaptor;
switch (toServerRpcSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, rpcResponse, baseTopic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
}
@ -923,4 +1132,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
}
private enum TopicType {
V1, V2, V2_JSON, V2_PROTO
}
}

56
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java

@ -64,6 +64,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode post telemetry request", ex);
throw new AdaptorException(ex);
}
}
@ -74,6 +75,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode post attributes request", ex);
throw new AdaptorException(ex);
}
}
@ -84,6 +86,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload);
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode claim device request", ex);
throw new AdaptorException(ex);
}
}
@ -99,33 +102,33 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processGetAttributeRequestMsg(inbound, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processGetAttributeRequestMsg(inbound, topicBase);
}
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processToDeviceRpcResponseMsg(inbound, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processToDeviceRpcResponseMsg(inbound, topicBase);
}
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processToServerRpcRequestMsg(ctx, inbound, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processToServerRpcRequestMsg(ctx, inbound, topicBase);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
return processConvertFromAttributeResponseMsg(ctx, responseMsg, MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX);
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
return processConvertFromAttributeResponseMsg(ctx, responseMsg, topicBase);
}
@Override
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
return Optional.of(createMqttPublishMsg(ctx, topic, JsonConverter.toJson(notificationMsg)));
}
@Override
@ -135,8 +138,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
}
@Override
@ -145,8 +148,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
}
@Override
@ -169,11 +172,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
protected TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
result.setRequestId(getRequestId(topicName, topic));
result.setRequestId(getRequestId(topicName, topicBase));
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
@ -191,49 +194,50 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
protected TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, topic);
int requestId = getRequestId(topicName, topicBase);
String payload = inbound.payload().toString(UTF8);
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
} catch (RuntimeException e) {
log.warn("Failed to decode Rpc response", e);
log.warn("Failed to decode rpc response", e);
throw new AdaptorException(e);
}
}
protected TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
try {
int requestId = getRequestId(topicName, topic);
int requestId = getRequestId(topicName, topicBase);
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode to server rpc request", ex);
throw new AdaptorException(ex);
}
}
protected Optional<MqttMessage> processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException {
private Optional<MqttMessage> processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
int requestId = responseMsg.getRequestId();
if (requestId >= 0) {
return Optional.of(createMqttPublishMsg(ctx,
topic + requestId,
topicBase + requestId,
JsonConverter.toJson(responseMsg)));
}
return Optional.empty();
}
}
protected Optional<MqttMessage> processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException {
private Optional<MqttMessage> processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, responseMsg);
return Optional.of(createMqttPublishMsg(ctx, topic, result));
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, result));
}
}

14
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java

@ -52,27 +52,27 @@ public interface MqttTransportAdaptor {
PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;

67
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java

@ -20,7 +20,6 @@ import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@ -49,10 +48,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = getDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
} catch (Exception e) {
log.warn("Failed to decode post telemetry request", e);
throw new AdaptorException(e);
}
}
@ -61,10 +61,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor attributesDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
Descriptors.Descriptor attributesDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
} catch (Exception e) {
log.warn("Failed to decode post attributes request", e);
throw new AdaptorException(e);
}
}
@ -75,16 +76,17 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
try {
return ProtoConverter.convertToClaimDeviceProto(ctx.getDeviceId(), bytes);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode claim device request", e);
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
byte[] bytes = toBytes(inbound.payload());
String topicName = inbound.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
int requestId = getRequestId(topicName, topicBase);
return ProtoConverter.convertToGetAttributeRequestMessage(bytes, requestId);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode get attributes request", e);
@ -93,29 +95,30 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException {
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
String topicName = mqttMsg.variableHeader().topicName();
byte[] bytes = toBytes(mqttMsg.payload());
Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor());
Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor());
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
JsonElement response = new JsonParser().parse(dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor));
int requestId = getRequestId(topicName, topicBase);
JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor));
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(response.toString()).build();
} catch (Exception e) {
log.warn("Failed to decode Rpc response", e);
log.warn("Failed to decode rpc response", e);
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException {
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException {
byte[] bytes = toBytes(mqttMsg.payload());
String topicName = mqttMsg.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
int requestId = getRequestId(topicName, topicBase);
return ProtoConverter.convertToServerRpcRequest(bytes, requestId);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode to server rpc request", e);
throw new AdaptorException(e);
}
}
@ -126,40 +129,43 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
try {
return ProtoConverter.convertToProvisionRequestMsg(bytes);
} catch (InvalidProtocolBufferException ex) {
log.warn("Failed to decode provision request", ex);
throw new AdaptorException(ex);
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
int requestId = responseMsg.getRequestId();
if (requestId >= 0) {
return Optional.of(createMqttPublishMsg(ctx,
MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
responseMsg.toByteArray()));
return Optional.of(createMqttPublishMsg(ctx, topicBase + requestId, responseMsg.toByteArray()));
}
return Optional.empty();
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
DynamicMessage.Builder rpcRequestDynamicMessageBuilder = deviceSessionCtx.getRpcRequestDynamicMessageBuilder();
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder)));
if (rpcRequestDynamicMessageBuilder == null) {
throw new AdaptorException("Failed to get rpcRequestDynamicMessageBuilder!");
} else {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder)));
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), rpcResponse.toByteArray()));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), rpcResponse.toByteArray()));
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notificationMsg.toByteArray()));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
return Optional.of(createMqttPublishMsg(ctx, topic, notificationMsg.toByteArray()));
}
@Override
@ -213,17 +219,4 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
private int getRequestId(String topicName, String topic) {
return Integer.parseInt(topicName.substring(topic.length()));
}
private Descriptors.Descriptor getDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException {
if (descriptor == null) {
throw new AdaptorException("Failed to get dynamic message descriptor!");
}
return descriptor;
}
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
}

14
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java

@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
@ -190,4 +191,17 @@ public class ProtoConverter {
throw new AdaptorException("Failed to convert ToDeviceRpcRequestMsg to Dynamic Rpc request message due to: ", e);
}
}
public static Descriptors.Descriptor validateDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException {
if (descriptor == null) {
throw new AdaptorException("Failed to get dynamic message descriptor!");
}
return descriptor;
}
public static String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
}

Loading…
Cancel
Save