From 845d8247dd185a4dbae3cc43a6fa10ae3b033ea7 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 8 Oct 2021 15:20:38 +0300 Subject: [PATCH] [3.3.2] MQTT short topics support (#4967) * added mqtt short topics support * remove volatile keyword from topic types parameters * added new tests for mqtt short topics * fix compilation error after merge * improvements/typo-fixes after pull request review --- ...tMqttAttributesRequestIntegrationTest.java | 28 +- ...tAttributesRequestJsonIntegrationTest.java | 13 +- ...AttributesRequestProtoIntegrationTest.java | 29 +- ...tMqttAttributesUpdatesIntegrationTest.java | 16 +- ...tAttributesUpdatesJsonIntegrationTest.java | 12 +- ...AttributesUpdatesProtoIntegrationTest.java | 16 +- ...ttServerSideRpcDefaultIntegrationTest.java | 25 +- ...tractMqttServerSideRpcIntegrationTest.java | 17 +- ...tMqttServerSideRpcJsonIntegrationTest.java | 25 +- ...MqttServerSideRpcProtoIntegrationTest.java | 30 ++- ...AbstractMqttAttributesIntegrationTest.java | 12 + ...ractMqttAttributesJsonIntegrationTest.java | 17 +- ...actMqttAttributesProtoIntegrationTest.java | 91 ++++--- ...AbstractMqttTimeseriesIntegrationTest.java | 12 + ...ractMqttTimeseriesJsonIntegrationTest.java | 26 +- ...actMqttTimeseriesProtoIntegrationTest.java | 126 +++++---- .../data/device/profile/MqttTopics.java | 55 ++-- .../coap/adaptors/ProtoCoapAdaptor.java | 16 +- .../transport/mqtt/MqttTransportHandler.java | 252 ++++++++++++++++-- .../mqtt/adaptors/JsonMqttAdaptor.java | 56 ++-- .../mqtt/adaptors/MqttTransportAdaptor.java | 14 +- .../mqtt/adaptors/ProtoMqttAdaptor.java | 67 +++-- .../transport/adaptor/ProtoConverter.java | 14 + 23 files changed, 709 insertions(+), 260 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java index bfe9176b4d..c54e76e4a9 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java @@ -52,7 +52,17 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr @Test public void testRequestAttributesValuesFromTheServer() throws Exception { - processTestRequestAttributesValuesFromTheServer(); + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception { + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception { + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX); } @Test @@ -60,18 +70,18 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr processTestGatewayRequestAttributesValuesFromTheServer(); } - protected void processTestRequestAttributesValuesFromTheServer() throws Exception { + protected void processTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception { MqttAsyncClient client = getMqttAsyncClient(accessToken); - postAttributesAndSubscribeToTopic(savedDevice, client); + postAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic); Thread.sleep(5000); TestMqttCallback callback = getTestMqttCallback(); client.setCallback(callback); - validateResponse(client, callback.getLatch(), callback); + validateResponse(client, callback.getLatch(), callback, attrReqTopicPrefix); } protected void processTestGatewayRequestAttributesValuesFromTheServer() throws Exception { @@ -103,10 +113,10 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr validateSharedResponseGateway(client, sharedAttributesCallback); } - protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception { + protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception { doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); - client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); + client.publish(attrPubTopic, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); + client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); } protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception { @@ -114,12 +124,12 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(postClientAttributes.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); } - protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { + protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopicPrefix) throws MqttException, InterruptedException, InvalidProtocolBufferException { String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; String payloadStr = "{\"clientKeys\":\"" + keys + "\", \"sharedKeys\":\"" + keys + "\"}"; MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(payloadStr.getBytes()); - client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); + client.publish(attrReqTopicPrefix + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1)); latch.await(1, TimeUnit.MINUTES); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); String expectedRequestPayload = "{\"client\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}},\"shared\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java index c805572e3d..2bd1877ee1 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java @@ -20,6 +20,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.device.profile.MqttTopics; @Slf4j public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends AbstractMqttAttributesRequestIntegrationTest { @@ -36,7 +37,17 @@ public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends A @Test public void testRequestAttributesValuesFromTheServer() throws Exception { - processTestRequestAttributesValuesFromTheServer(); + super.testRequestAttributesValuesFromTheServer(); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception { + super.testRequestAttributesValuesFromTheServerOnShortTopic(); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception { + super.testRequestAttributesValuesFromTheServerOnShortJsonTopic(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java index 6949a544de..3015d91ffb 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java @@ -84,7 +84,21 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends public void testRequestAttributesValuesFromTheServer() throws Exception { super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto", TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED); - processTestRequestAttributesValuesFromTheServer(); + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception { + super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto", + TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED); + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX); + } + + @Test + public void testRequestAttributesValuesFromTheServerOnShortProtoTopic() throws Exception { + super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto", + TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED); + processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX); } @Test @@ -93,7 +107,10 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends processTestGatewayRequestAttributesValuesFromTheServer(); } - protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception { + @Test + public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception { } + + protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception { doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", AbstractMqttAttributesIntegrationTest.POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); @@ -131,8 +148,8 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends .setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject) .build(); byte[] payload = postAttributesMsg.toByteArray(); - client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(payload)); - client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value()); + client.publish(attrPubTopic, new MqttMessage(payload)); + client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()); } protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception { @@ -149,7 +166,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(bytes)); } - protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException { + protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopic) throws MqttException, InterruptedException, InvalidProtocolBufferException { String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder(); attributesRequestBuilder.setClientKeys(keys); @@ -157,7 +174,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build(); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(attributesRequest.toByteArray()); - client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage); + client.publish(attrReqTopic + "1", mqttMessage); latch.await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); TransportProtos.GetAttributeResponseMsg expectedAttributesResponse = getExpectedAttributeResponseMsg(); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java index 61c5a25303..f179741abb 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java @@ -61,7 +61,17 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr @Test public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { - processTestSubscribeToAttributesUpdates(); + processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception { + processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception { + processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC); } @Test @@ -69,14 +79,14 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr processGatewayTestSubscribeToAttributesUpdates(); } - protected void processTestSubscribeToAttributesUpdates() throws Exception { + protected void processTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception { MqttAsyncClient client = getMqttAsyncClient(accessToken); TestMqttCallback onUpdateCallback = getTestMqttCallback(); client.setCallback(onUpdateCallback); - client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value()); + client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()); Thread.sleep(1000); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java index 54b2abccc3..4efadaccaa 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java @@ -36,7 +36,17 @@ public abstract class AbstractMqttAttributesUpdatesJsonIntegrationTest extends A @Test public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { - processTestSubscribeToAttributesUpdates(); + super.testSubscribeToAttributesUpdatesFromTheServer(); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception { + super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic(); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception { + super.testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic(); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java index 6c2565a016..606e05930d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java @@ -21,6 +21,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; @@ -46,7 +47,20 @@ public abstract class AbstractMqttAttributesUpdatesProtoIntegrationTest extends @Test public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { - processTestSubscribeToAttributesUpdates(); + super.testSubscribeToAttributesUpdatesFromTheServer(); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception { + super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic(); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {} + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerOnShortProtoTopic() throws Exception { + processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java index ea19ac6835..9be3d1d2ca 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java @@ -21,6 +21,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.service.security.AccessValidator; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -81,12 +82,32 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(); + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processTwoWayRpcTest(); + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index e0ad7f79fc..1b468194b6 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -60,14 +60,14 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM asyncContextTimeoutToUseRpcPlugin = 10000L; } - protected void processOneWayRpcTest() throws Exception { + protected void processOneWayRpcTest(String rpcSubTopic) throws Exception { MqttAsyncClient client = getMqttAsyncClient(accessToken); CountDownLatch latch = new CountDownLatch(1); TestMqttCallback callback = new TestMqttCallback(client, latch); client.setCallback(callback); - client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE.value()); + client.subscribe(rpcSubTopic, MqttQoS.AT_MOST_ONCE.value()); Thread.sleep(1000); @@ -86,9 +86,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes); } - protected void processTwoWayRpcTest() throws Exception { + protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception { MqttAsyncClient client = getMqttAsyncClient(accessToken); - client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1); + client.subscribe(rpcSubTopic, 1); CountDownLatch latch = new CountDownLatch(1); TestMqttCallback callback = new TestMqttCallback(client, latch); @@ -199,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { MqttMessage message = new MqttMessage(); - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) { + if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { message.setPayload(DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8)); } else { JsonNode requestMsgNode = JacksonUtil.toJsonNode(new String(mqttMessage.getPayload(), StandardCharset.UTF_8)); @@ -232,7 +232,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); - String responseTopic = requestTopic.replace("request", "response"); + String responseTopic; + if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { + responseTopic = requestTopic.replace("req", "res"); + } else { + responseTopic = requestTopic.replace("request", "response"); + } qoS = mqttMessage.getQos(); client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage)); latch.countDown(); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java index 4b2f38cbd9..f8c0cd8bed 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java @@ -21,6 +21,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.device.profile.MqttTopics; @Slf4j public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { @@ -37,12 +38,32 @@ public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends Abstr @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(); + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processTwoWayRpcTest(); + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } @Test diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java index 3792d66ac4..2c9f1f7889 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java @@ -78,12 +78,32 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst @Test public void testServerMqttOneWayRpc() throws Exception { - processOneWayRpcTest(); + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttOneWayRpcOnShortProtoTopic() throws Exception { + processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test public void testServerMqttTwoWayRpc() throws Exception { - processTwoWayRpcTest(); + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC); + } + + @Test + public void testServerMqttTwoWayRpcOnShortProtoTopic() throws Exception { + processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC); } @Test @@ -118,9 +138,9 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst return builder.build(); } - protected void processTwoWayRpcTest() throws Exception { + protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception { MqttAsyncClient client = getMqttAsyncClient(accessToken); - client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1); + client.subscribe(rpcSubTopic, 1); CountDownLatch latch = new CountDownLatch(1); TestMqttCallback callback = new TestMqttCallback(client, latch); @@ -139,7 +159,7 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { MqttMessage message = new MqttMessage(); - if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) { + if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) { ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration(); ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA); DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java index 6a45ab5989..e6cf60fb2e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java @@ -60,6 +60,18 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes()); } + @Test + public void testPushAttributesOnShortTopic() throws Exception { + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes()); + } + + @Test + public void testPushAttributesOnShortJsonTopic() throws Exception { + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes()); + } + @Test public void testPushAttributesGateway() throws Exception { List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java index 345913a64f..9530bb2449 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java @@ -20,6 +20,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.device.profile.MqttTopics; import java.util.Arrays; import java.util.List; @@ -45,12 +46,18 @@ public abstract class AbstractMqttAttributesJsonIntegrationTest extends Abstract processJsonPayloadAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes()); } + @Test + public void testPushAttributesOnShortTopic() throws Exception { + super.testPushAttributesOnShortTopic(); + } + + @Test + public void testPushAttributesOnShortJsonTopic() throws Exception { + super.testPushAttributesOnShortJsonTopic(); + } + @Test public void testPushAttributesGateway() throws Exception { - List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); - String deviceName1 = "Device A"; - String deviceName2 = "Device B"; - String payload = getGatewayAttributesJsonPayload(deviceName1, deviceName2); - processGatewayAttributesTest(expectedKeys, payload.getBytes(), deviceName1, deviceName2); + super.testPushAttributesGateway(); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java index c77691981e..afe624c55b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.gen.transport.TransportApiProtos; @@ -49,14 +50,14 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac @Test public void testPushAttributes() throws Exception { super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); - MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); - assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; - ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA); - DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA); + DynamicMessage postAttributesMsg = getDefaultDynamicMessage(); + processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false); + } + + @Test + public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception { + super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC); + DynamicSchema attributesSchema = getDynamicSchema(); DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject"); Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); @@ -67,7 +68,6 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType(); assertNotNull(jsonObjectBuilderDescriptor); DynamicMessage jsonObject = jsonObjectBuilder - .setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3) @@ -78,18 +78,50 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType(); assertNotNull(postAttributesMsgDescriptor); DynamicMessage postAttributesMsg = postAttributesBuilder - .setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1") - .setField(postAttributesMsgDescriptor.findFieldByName("key2"), true) - .setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0) - .setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4) + .setField(postAttributesMsgDescriptor.findFieldByName("key1"), "") + .setField(postAttributesMsgDescriptor.findFieldByName("key2"), false) + .setField(postAttributesMsgDescriptor.findFieldByName("key3"), 0.0) + .setField(postAttributesMsgDescriptor.findFieldByName("key4"), 0) .setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject) .build(); - processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false); + processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), true); } @Test - public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception { + public void testPushAttributesOnShortTopic() throws Exception { super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC); + DynamicMessage postAttributesMsg = getDefaultDynamicMessage(); + processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false); + } + + @Test + public void testPushAttributesOnShortJsonTopic() throws Exception { + super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC); + processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes()); + } + + @Test + public void testPushAttributesOnShortProtoTopic() throws Exception { + super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC); + DynamicMessage postAttributesMsg = getDefaultDynamicMessage(); + processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false); + } + + @Test + public void testPushAttributesGateway() throws Exception { + super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null); + TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder(); + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + String deviceName1 = "Device A"; + String deviceName2 = "Device B"; + TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys); + TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys); + gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto)); + TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build(); + processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2); + } + + private DynamicSchema getDynamicSchema() { DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; @@ -97,7 +129,11 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA); - DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA); + return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA); + } + + private DynamicMessage getDefaultDynamicMessage() { + DynamicSchema attributesSchema = getDynamicSchema(); DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject"); Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); @@ -108,6 +144,7 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType(); assertNotNull(jsonObjectBuilderDescriptor); DynamicMessage jsonObject = jsonObjectBuilder + .setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2) .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3) @@ -117,25 +154,13 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac DynamicMessage.Builder postAttributesBuilder = attributesSchema.newMessageBuilder("PostAttributes"); Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType(); assertNotNull(postAttributesMsgDescriptor); - DynamicMessage postAttributesMsg = postAttributesBuilder - .setField(postAttributesMsgDescriptor.findFieldByName("key1"), "") + return postAttributesBuilder + .setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1") + .setField(postAttributesMsgDescriptor.findFieldByName("key2"), true) + .setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0) + .setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4) .setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject) .build(); - processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key5"), postAttributesMsg.toByteArray(), true); - } - - @Test - public void testPushAttributesGateway() throws Exception { - super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null); - TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder(); - List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); - String deviceName1 = "Device A"; - String deviceName2 = "Device B"; - TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys); - TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys); - gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto)); - TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build(); - processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2); } private TransportApiProtos.AttributesMsg getDeviceAttributesMsgProto(String deviceName, List expectedKeys) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 4bd8977abc..d7b4cc553e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -73,6 +73,18 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true); } + @Test + public void testPushTelemetryOnShortTopic() throws Exception { + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false); + } + + @Test + public void testPushTelemetryOnShortJsonTopic() throws Exception { + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false); + } + @Test public void testPushTelemetryGateway() throws Exception { List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java index 1b6107ff71..6887941e4d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java @@ -57,25 +57,23 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract processJsonPayloadTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true); } + @Test + public void testPushTelemetryOnShortTopic() throws Exception { + super.testPushTelemetryOnShortTopic(); + } + + @Test + public void testPushTelemetryWithTsOnShortJsonTopic() throws Exception { + super.testPushTelemetryOnShortJsonTopic(); + } + @Test public void testPushTelemetryGateway() throws Exception { - List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); - String deviceName1 = "Device A"; - String deviceName2 = "Device B"; - String payload = getGatewayTelemetryJsonPayload(deviceName1, deviceName2, "10000", "20000"); - processGatewayTelemetryTest(MqttTopics.GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2); + super.testPushTelemetryGateway(); } @Test public void testGatewayConnect() throws Exception { - String payload = "{\"device\":\"Device A\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}"; - MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); - publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC); - - String deviceName = "Device A"; - Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), - 20, - 100); - assertNotNull(device); + super.testGatewayConnect(); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java index 9b0fa5dd33..a10e19a35c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java @@ -21,6 +21,7 @@ import com.google.protobuf.DynamicMessage; import com.squareup.wire.schema.internal.parser.ProtoFileElement; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Ignore; import org.junit.Test; @@ -55,41 +56,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac @Test public void testPushTelemetry() throws Exception { super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); - MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); - assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; - ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA); - DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema"); - - DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject"); - Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); - assertNotNull(nestedJsonObjectBuilderDescriptor); - DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build(); - - DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject"); - Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType(); - assertNotNull(jsonObjectBuilderDescriptor); - DynamicMessage jsonObject = jsonObjectBuilder - .setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42) - .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1) - .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2) - .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3) - .setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject) - .build(); - - DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry"); - Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType(); - assertNotNull(postTelemetryMsgDescriptor); - DynamicMessage postTelemetryMsg = postTelemetryBuilder - .setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1") - .setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true) - .setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0) - .setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4) - .setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject) - .build(); + DynamicMessage postTelemetryMsg = getDefaultDynamicMessage(); processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false); } @@ -121,14 +88,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac " }\n" + "}"; super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); - MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); - assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; - ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr); - DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema"); + DynamicSchema telemetrySchema = getDynamicSchema(schemaStr); DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject"); Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); @@ -173,14 +133,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac @Test public void testPushTelemetryWithExplicitPresenceProtoKeys() throws Exception { super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); - MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); - assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; - ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA); - DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema"); + DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA); DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject"); Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); @@ -237,14 +190,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac " }\n" + "}"; super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); - MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); - assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; - ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr); - DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema"); + DynamicSchema telemetrySchema = getDynamicSchema(schemaStr); DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject"); Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); @@ -281,6 +227,26 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), true, true); } + @Test + public void testPushTelemetryOnShortTopic() throws Exception { + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null); + DynamicMessage postTelemetryMsg = getDefaultDynamicMessage(); + processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false); + } + + @Test + public void testPushTelemetryOnShortJsonTopic() throws Exception { + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null); + processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes(), false); + } + + @Test + public void testPushTelemetryOnShortProtoTopic() throws Exception { + super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null); + DynamicMessage postTelemetryMsg = getDefaultDynamicMessage(); + processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false); + } + @Test public void testPushTelemetryGateway() throws Exception { super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED); @@ -310,6 +276,48 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac assertNotNull(device); } + private DynamicSchema getDynamicSchema(String deviceTelemetryProtoSchema) { + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); + assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); + MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration; + TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration(); + assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration); + ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; + ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(deviceTelemetryProtoSchema); + return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema"); + } + + private DynamicMessage getDefaultDynamicMessage() { + DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA); + + DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject"); + Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType(); + assertNotNull(nestedJsonObjectBuilderDescriptor); + DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build(); + + DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject"); + Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType(); + assertNotNull(jsonObjectBuilderDescriptor); + DynamicMessage jsonObject = jsonObjectBuilder + .setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42) + .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1) + .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2) + .addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3) + .setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject) + .build(); + + DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry"); + Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType(); + assertNotNull(postTelemetryMsgDescriptor); + return postTelemetryBuilder + .setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1") + .setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true) + .setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0) + .setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4) + .setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject) + .build(); + } + private TransportApiProtos.ConnectMsg getConnectProto(String deviceName) { TransportApiProtos.ConnectMsg.Builder builder = TransportApiProtos.ConnectMsg.newBuilder(); builder.setDeviceName(deviceName); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java index 2b1aba3c0f..93d2f28ab2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java @@ -34,20 +34,25 @@ public class MqttTopics { private static final String SOFTWARE = "/sw"; private static final String CHUNK = "/chunk/"; private static final String ERROR = "/error"; - + private static final String TELEMETRY_SHORT = "/t"; + private static final String ATTRIBUTES_SHORT = "/a"; + private static final String RPC_SHORT = "/r"; + private static final String REQUEST_SHORT = "/req"; + private static final String RESPONSE_SHORT = "/res"; + private static final String JSON_SHORT = "j"; + private static final String PROTO_SHORT = "p"; private static final String ATTRIBUTES_RESPONSE = ATTRIBUTES + RESPONSE; private static final String ATTRIBUTES_REQUEST = ATTRIBUTES + REQUEST; - + private static final String ATTRIBUTES_RESPONSE_SHORT = ATTRIBUTES_SHORT + RESPONSE_SHORT + "/"; + private static final String ATTRIBUTES_REQUEST_SHORT = ATTRIBUTES_SHORT + REQUEST_SHORT + "/"; private static final String DEVICE_RPC_RESPONSE = RPC + RESPONSE + "/"; private static final String DEVICE_RPC_REQUEST = RPC + REQUEST + "/"; - + private static final String DEVICE_RPC_RESPONSE_SHORT = RPC_SHORT + RESPONSE_SHORT + "/"; + private static final String DEVICE_RPC_REQUEST_SHORT = RPC_SHORT + REQUEST_SHORT + "/"; private static final String DEVICE_ATTRIBUTES_RESPONSE = ATTRIBUTES_RESPONSE + "/"; private static final String DEVICE_ATTRIBUTES_REQUEST = ATTRIBUTES_REQUEST + "/"; - - // V1_JSON topics - + // v1 topics public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me"; - public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_RESPONSE; public static final String DEVICE_RPC_RESPONSE_SUB_TOPIC = DEVICE_RPC_RESPONSE_TOPIC + SUB_TOPIC; public static final String DEVICE_RPC_REQUESTS_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_REQUEST; @@ -60,9 +65,7 @@ public class MqttTopics { public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + ATTRIBUTES; public static final String DEVICE_PROVISION_REQUEST_TOPIC = PROVISION + REQUEST; public static final String DEVICE_PROVISION_RESPONSE_TOPIC = PROVISION + RESPONSE; - - // V1_JSON gateway topics - + // v1 gateway topics public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + CONNECT; public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + DISCONNECT; @@ -72,22 +75,44 @@ public class MqttTopics { public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE; - // v2 topics public static final String BASE_DEVICE_API_TOPIC_V2 = "v2"; - public static final String REQUEST_ID_PATTERN = "(?\\d+)"; public static final String CHUNK_PATTERN = "(?\\d+)"; - public static final String DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN; public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC; public static final String DEVICE_FIRMWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + ERROR; - public static final String DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT = BASE_DEVICE_API_TOPIC_V2 + "/%s" + RESPONSE + "/%s" + CHUNK + "%d"; - public static final String DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN; public static final String DEVICE_SOFTWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC; public static final String DEVICE_SOFTWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + ERROR; + public static final String DEVICE_ATTRIBUTES_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT; + public static final String DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + JSON_SHORT; + public static final String DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + PROTO_SHORT; + public static final String DEVICE_TELEMETRY_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT; + public static final String DEVICE_TELEMETRY_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + JSON_SHORT; + public static final String DEVICE_TELEMETRY_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + PROTO_SHORT; + public static final String DEVICE_RPC_RESPONSE_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT; + public static final String DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + JSON_SHORT + "/"; + public static final String DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + PROTO_SHORT + "/"; + public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + SUB_TOPIC; + public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC; + public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC; + public static final String DEVICE_RPC_REQUESTS_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT; + public static final String DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + JSON_SHORT + "/"; + public static final String DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + PROTO_SHORT + "/"; + public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + SUB_TOPIC; + public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC; + public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC; + public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_RESPONSE_SHORT; + public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + SUB_TOPIC; + public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + JSON_SHORT + "/"; + public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX + SUB_TOPIC; + public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/"; + public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX + SUB_TOPIC; + public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_REQUEST_SHORT; + public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + JSON_SHORT + "/"; + public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/"; private MqttTopics() { } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java index 1c8f84fbd0..1ad29524d2 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java @@ -20,7 +20,6 @@ import com.google.gson.JsonParser; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; @@ -32,6 +31,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; @@ -44,8 +44,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.PostTelemetryMsg convertToPostTelemetry(UUID sessionId, Request inbound, Descriptors.Descriptor telemetryMsgDescriptor) throws AdaptorException { + ProtoConverter.validateDescriptor(telemetryMsgDescriptor); try { - return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor))); + return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor))); } catch (Exception e) { throw new AdaptorException(e); } @@ -53,8 +54,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.PostAttributeMsg convertToPostAttributes(UUID sessionId, Request inbound, Descriptors.Descriptor attributesMsgDescriptor) throws AdaptorException { + ProtoConverter.validateDescriptor(attributesMsgDescriptor); try { - return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor))); + return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor))); } catch (Exception e) { throw new AdaptorException(e); } @@ -71,8 +73,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { if (requestId.isEmpty()) { throw new AdaptorException("Request id is missing!"); } else { + ProtoConverter.validateDescriptor(rpcResponseMsgDescriptor); try { - JsonElement response = new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor)); + JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor)); return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!"))) .setPayload(response.toString()).build(); } catch (Exception e) { @@ -158,11 +161,6 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { return response; } - private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException { - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes); - return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage); - } - @Override public int getContentFormat() { return MediaTypeRegistry.APPLICATION_OCTET_STREAM; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 0a115e12fa..35e6e8c508 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -101,8 +101,6 @@ import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; -import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN; -import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN; /** * @author Andrew Shvayka @@ -110,8 +108,8 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC @Slf4j public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener>, SessionMsgListener { - private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); - private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); + private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); + private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -133,6 +131,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final ConcurrentHashMap chunkSizes; private final ConcurrentMap rpcAwaitingAck; + private TopicType attrSubTopicType; + private TopicType rpcSubTopicType; + private TopicType attrReqTopicType; + private TopicType toServerRpcSubTopicType; + MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) { this.sessionId = UUID.randomUUID(); this.context = context; @@ -355,14 +358,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { - TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg); + TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); + attrReqTopicType = TopicType.V1; } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) { - TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg); + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC); transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { - TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg); + TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V1; } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); @@ -370,6 +375,57 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE); } else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) { getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE); + } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) { + TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) { + TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) { + TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) { + TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) { + TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) { + TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) { + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) { + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) { + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) { + TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V2_JSON; + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) { + TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V2_PROTO; + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) { + TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V2; + } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) { + TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX); + transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); + attrReqTopicType = TopicType.V2_JSON; + } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) { + TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX); + transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); + attrReqTopicType = TopicType.V2_PROTO; + } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) { + TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX); + transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); + attrReqTopicType = TopicType.V2; } else { transportService.reportActivity(deviceSessionCtx.getSessionInfo()); ack(ctx, msgId); @@ -541,19 +597,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { switch (topic) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); - registerSubQoS(topic, grantedQoSList, reqQoS); + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); activityReported = true; break; } case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); - registerSubQoS(topic, grantedQoSList, reqQoS); + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); activityReported = true; break; } case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: @@ -580,6 +670,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); } + private void processRpcSubscribe(List grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) { + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); + rpcSubTopicType = topicType; + registerSubQoS(topic, grantedQoSList, reqQoS); + } + + private void processAttributesSubscribe(List grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) { + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); + attrSubTopicType = topicType; + registerSubQoS(topic, grantedQoSList, reqQoS); + } + private void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { grantedQoSList.add(getMinSupportedQos(reqQoS)); mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS)); @@ -595,18 +697,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement mqttQoSMap.remove(new MqttTopicMatcher(topicName)); try { switch (topicName) { - case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null); activityReported = true; break; } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null); activityReported = true; break; } + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: + case MqttTopics.GATEWAY_RPC_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: + case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: { + activityReported = true; + break; + } } } catch (Exception e) { log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); @@ -837,8 +964,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { + log.trace("[{}] Received get attributes response", sessionId); + String topicBase; + MqttTransportAdaptor adaptor; + switch (attrReqTopicType) { + case V2: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX; + break; + case V2_JSON: + adaptor = context.getJsonMqttAdaptor(); + topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX; + break; + case V2_PROTO: + adaptor = context.getProtoMqttAdaptor(); + topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX; + break; + default: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX; + break; + } try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + adaptor.convertToPublish(deviceSessionCtx, response, topicBase).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } catch (Exception e) { log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); } @@ -847,8 +995,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to device", sessionId); + log.info("[{}] : attrSubTopicType: {}", notification.toString(), attrSubTopicType); + String topic; + MqttTransportAdaptor adaptor; + switch (attrSubTopicType) { + case V2: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC; + break; + case V2_JSON: + adaptor = context.getJsonMqttAdaptor(); + topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC; + break; + case V2_PROTO: + adaptor = context.getProtoMqttAdaptor(); + topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC; + break; + default: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + topic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC; + break; + } try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } catch (Exception e) { log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e); } @@ -862,9 +1031,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { - log.trace("[{}] Received RPC command to device", sessionId); + log.info("[{}] Received RPC command to device", sessionId); + String baseTopic; + MqttTransportAdaptor adaptor; + switch (rpcSubTopicType) { + case V2: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC; + break; + case V2_JSON: + adaptor = context.getJsonMqttAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC; + break; + case V2_PROTO: + adaptor = context.getProtoMqttAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC; + break; + default: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; + break; + } try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { + adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> { int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); if (isAckExpected(payload)) { rpcAwaitingAck.put(msgId, rpcRequest); @@ -898,9 +1087,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) { - log.trace("[{}] Received RPC command to server", sessionId); + log.trace("[{}] Received RPC response from server", sessionId); + String baseTopic; + MqttTransportAdaptor adaptor; + switch (toServerRpcSubTopicType) { + case V2: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC; + break; + case V2_JSON: + adaptor = context.getJsonMqttAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC; + break; + case V2_PROTO: + adaptor = context.getProtoMqttAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC; + break; + default: + adaptor = deviceSessionCtx.getPayloadAdaptor(); + baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; + break; + } try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + adaptor.convertToPublish(deviceSessionCtx, rpcResponse, baseTopic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } catch (Exception e) { log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); } @@ -923,4 +1132,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); } + + private enum TopicType { + V1, V2, V2_JSON, V2_PROTO + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 19d2e15fc5..dc2b74a8ea 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -64,6 +64,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { try { return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { + log.warn("Failed to decode post telemetry request", ex); throw new AdaptorException(ex); } } @@ -74,6 +75,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { try { return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { + log.warn("Failed to decode post attributes request", ex); throw new AdaptorException(ex); } } @@ -84,6 +86,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { try { return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload); } catch (IllegalStateException | JsonSyntaxException ex) { + log.warn("Failed to decode claim device request", ex); throw new AdaptorException(ex); } } @@ -99,33 +102,33 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } @Override - public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - return processGetAttributeRequestMsg(inbound, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { + return processGetAttributeRequestMsg(inbound, topicBase); } @Override - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - return processToDeviceRpcResponseMsg(inbound, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC); + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { + return processToDeviceRpcResponseMsg(inbound, topicBase); } @Override - public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - return processToServerRpcRequestMsg(ctx, inbound, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { + return processToServerRpcRequestMsg(ctx, inbound, topicBase); } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { - return processConvertFromAttributeResponseMsg(ctx, responseMsg, MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException { + return processConvertFromAttributeResponseMsg(ctx, responseMsg, topicBase); } @Override public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { - return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); + return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg); } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) { - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg))); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) { + return Optional.of(createMqttPublishMsg(ctx, topic, JsonConverter.toJson(notificationMsg))); } @Override @@ -135,8 +138,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false))); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) { + return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false))); } @Override @@ -145,8 +148,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) { - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) { + return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); } @Override @@ -169,11 +172,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } - protected TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topic) throws AdaptorException { + private TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); try { TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder(); - result.setRequestId(getRequestId(topicName, topic)); + result.setRequestId(getRequestId(topicName, topicBase)); String payload = inbound.payload().toString(UTF8); JsonElement requestBody = new JsonParser().parse(payload); Set clientKeys = toStringSet(requestBody, "clientKeys"); @@ -191,49 +194,50 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } - protected TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topic) throws AdaptorException { + private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); try { - int requestId = getRequestId(topicName, topic); + int requestId = getRequestId(topicName, topicBase); String payload = inbound.payload().toString(UTF8); return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build(); } catch (RuntimeException e) { - log.warn("Failed to decode Rpc response", e); + log.warn("Failed to decode rpc response", e); throw new AdaptorException(e); } } - protected TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topic) throws AdaptorException { + private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false); try { - int requestId = getRequestId(topicName, topic); + int requestId = getRequestId(topicName, topicBase); return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId); } catch (IllegalStateException | JsonSyntaxException ex) { + log.warn("Failed to decode to server rpc request", ex); throw new AdaptorException(ex); } } - protected Optional processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException { + private Optional processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException { if (!StringUtils.isEmpty(responseMsg.getError())) { throw new AdaptorException(responseMsg.getError()); } else { int requestId = responseMsg.getRequestId(); if (requestId >= 0) { return Optional.of(createMqttPublishMsg(ctx, - topic + requestId, + topicBase + requestId, JsonConverter.toJson(responseMsg))); } return Optional.empty(); } } - protected Optional processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException { + private Optional processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { if (!StringUtils.isEmpty(responseMsg.getError())) { throw new AdaptorException(responseMsg.getError()); } else { JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, responseMsg); - return Optional.of(createMqttPublishMsg(ctx, topic, result)); + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, result)); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 62ae6ae027..53ef439eda 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -52,27 +52,27 @@ public interface MqttTransportAdaptor { PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; - GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; + GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException; - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException; + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException; - ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException; + ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException; ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; - Optional convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException; Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException; - Optional convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException; Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; - Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException; Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; - Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException; ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java index a007c73a5a..4fa47b367d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java @@ -20,7 +20,6 @@ import com.google.gson.JsonParser; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -49,10 +48,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; byte[] bytes = toBytes(inbound.payload()); - Descriptors.Descriptor telemetryDynamicMsgDescriptor = getDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor()); + Descriptors.Descriptor telemetryDynamicMsgDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor()); try { - return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor))); + return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor))); } catch (Exception e) { + log.warn("Failed to decode post telemetry request", e); throw new AdaptorException(e); } } @@ -61,10 +61,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; byte[] bytes = toBytes(inbound.payload()); - Descriptors.Descriptor attributesDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor()); + Descriptors.Descriptor attributesDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor()); try { - return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor))); + return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor))); } catch (Exception e) { + log.warn("Failed to decode post attributes request", e); throw new AdaptorException(e); } } @@ -75,16 +76,17 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { try { return ProtoConverter.convertToClaimDeviceProto(ctx.getDeviceId(), bytes); } catch (InvalidProtocolBufferException e) { + log.warn("Failed to decode claim device request", e); throw new AdaptorException(e); } } @Override - public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { byte[] bytes = toBytes(inbound.payload()); String topicName = inbound.variableHeader().topicName(); try { - int requestId = getRequestId(topicName, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); + int requestId = getRequestId(topicName, topicBase); return ProtoConverter.convertToGetAttributeRequestMessage(bytes, requestId); } catch (InvalidProtocolBufferException e) { log.warn("Failed to decode get attributes request", e); @@ -93,29 +95,30 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { } @Override - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException { + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; String topicName = mqttMsg.variableHeader().topicName(); byte[] bytes = toBytes(mqttMsg.payload()); - Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor()); + Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor()); try { - int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC); - JsonElement response = new JsonParser().parse(dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor)); + int requestId = getRequestId(topicName, topicBase); + JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor)); return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(response.toString()).build(); } catch (Exception e) { - log.warn("Failed to decode Rpc response", e); + log.warn("Failed to decode rpc response", e); throw new AdaptorException(e); } } @Override - public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException { + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException { byte[] bytes = toBytes(mqttMsg.payload()); String topicName = mqttMsg.variableHeader().topicName(); try { - int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); + int requestId = getRequestId(topicName, topicBase); return ProtoConverter.convertToServerRpcRequest(bytes, requestId); } catch (InvalidProtocolBufferException e) { + log.warn("Failed to decode to server rpc request", e); throw new AdaptorException(e); } } @@ -126,40 +129,43 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { try { return ProtoConverter.convertToProvisionRequestMsg(bytes); } catch (InvalidProtocolBufferException ex) { + log.warn("Failed to decode provision request", ex); throw new AdaptorException(ex); } } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException { if (!StringUtils.isEmpty(responseMsg.getError())) { throw new AdaptorException(responseMsg.getError()); } else { int requestId = responseMsg.getRequestId(); if (requestId >= 0) { - return Optional.of(createMqttPublishMsg(ctx, - MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId, - responseMsg.toByteArray())); + return Optional.of(createMqttPublishMsg(ctx, topicBase + requestId, responseMsg.toByteArray())); } return Optional.empty(); } } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException { + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; DynamicMessage.Builder rpcRequestDynamicMessageBuilder = deviceSessionCtx.getRpcRequestDynamicMessageBuilder(); - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder))); + if (rpcRequestDynamicMessageBuilder == null) { + throw new AdaptorException("Failed to get rpcRequestDynamicMessageBuilder!"); + } else { + return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder))); + } } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) { - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), rpcResponse.toByteArray())); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) { + return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), rpcResponse.toByteArray())); } @Override - public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) { - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notificationMsg.toByteArray())); + public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) { + return Optional.of(createMqttPublishMsg(ctx, topic, notificationMsg.toByteArray())); } @Override @@ -213,17 +219,4 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { private int getRequestId(String topicName, String topic) { return Integer.parseInt(topicName.substring(topic.length())); } - - private Descriptors.Descriptor getDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException { - if (descriptor == null) { - throw new AdaptorException("Failed to get dynamic message descriptor!"); - } - return descriptor; - } - - private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException { - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes); - return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage); - } - } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java index 39d4db069e..0d1348b462 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -190,4 +191,17 @@ public class ProtoConverter { throw new AdaptorException("Failed to convert ToDeviceRpcRequestMsg to Dynamic Rpc request message due to: ", e); } } + + public static Descriptors.Descriptor validateDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException { + if (descriptor == null) { + throw new AdaptorException("Failed to get dynamic message descriptor!"); + } + return descriptor; + } + + public static String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException { + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes); + return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage); + } + }