Browse Source

added parser tests

pull/3740/head
ShvaykaD 5 years ago
parent
commit
4eafa92cb7
  1. 40
      application/src/test/java/org/thingsboard/server/mqtt/AbstractMqttIntegrationTest.java
  2. 3
      application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java
  3. 1
      application/src/test/java/org/thingsboard/server/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java
  4. 3
      application/src/test/java/org/thingsboard/server/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java
  5. 18
      application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java
  6. 18
      application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java
  7. 2
      application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java
  8. 408
      application/src/test/java/org/thingsboard/server/mqtt/proto/DynamicProtoTest.java
  9. 4
      application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java
  10. 244
      common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttProtoDeviceProfileTransportConfiguration.java
  11. 2
      pom.xml

40
application/src/test/java/org/thingsboard/server/mqtt/AbstractMqttIntegrationTest.java

@ -62,6 +62,41 @@ public abstract class AbstractMqttIntegrationTest extends AbstractControllerTest
private static final AtomicInteger atomicInteger = new AtomicInteger(2);
protected static final String DEVICE_TELEMETRY_PROTO_SCHEMA = "syntax = \"proto2\";\n" +
"package test;\n" +
"\n" +
"option java_package = \"org.thingsboard.server.gen.test\";\n" +
"option java_outer_classname = \"TestProtos\";\n" +
"\n" +
"message nestedPostTelemetry {\n" +
" required bool bool_v = 1;\n" +
" required int64 long_v = 2;\n" +
" required double double_v = 3;\n" +
" required string string_v = 4;\n" +
" required string json_v = 5;\n" +
"}\n" +
"\n" +
"message PostTelemetry {\n" +
" repeated nestedPostTelemetry postTelemetry = 1;\n" +
"}";
protected static final String DEVICE_ATTRIBUTES_PROTO_SCHEMA = "syntax = \"proto2\";\n" +
"package test;\n" +
"\n" +
"option java_package = \"org.thingsboard.server.gen.test\";\n" +
"option java_outer_classname = \"TestProtos\";\n" +
"\n" +
"message nestedPostAttributes {\n" +
" required bool bool_v = 1;\n" +
" required int64 long_v = 2;\n" +
" required double double_v = 3;\n" +
" required string string_v = 4;\n" +
" required string json_v = 5;\n" +
"}\n" +
"\n" +
"message PostAttributes {\n" +
" repeated nestedPostAttributes postAttributes = 1;\n" +
"}";
protected Tenant savedTenant;
protected User tenantAdmin;
@ -219,7 +254,10 @@ public abstract class AbstractMqttIntegrationTest extends AbstractControllerTest
if (TransportPayloadType.JSON.equals(transportPayloadType)) {
transportConfiguration = new MqttJsonDeviceProfileTransportConfiguration();
} else {
transportConfiguration = new MqttProtoDeviceProfileTransportConfiguration();
MqttProtoDeviceProfileTransportConfiguration protoTransportConfiguration = new MqttProtoDeviceProfileTransportConfiguration();
protoTransportConfiguration.setDeviceAttributesProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
protoTransportConfiguration.setDeviceTelemetryProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
transportConfiguration = protoTransportConfiguration;
}
if (!StringUtils.isEmpty(telemetryTopic)) {
transportConfiguration.setDeviceTelemetryTopic(telemetryTopic);

3
application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java

@ -32,7 +32,8 @@ import java.util.Arrays;
"org.thingsboard.server.mqtt.attributes.updates.sql.*Test",
"org.thingsboard.server.mqtt.attributes.request.sql.*Test",
"org.thingsboard.server.mqtt.claim.sql.*Test",
"org.thingsboard.server.mqtt.provision.sql.*Test"
"org.thingsboard.server.mqtt.provision.sql.*Test",
"org.thingsboard.server.mqtt.proto.sql.*Test"
})
public class MqttSqlTestSuite {

1
application/src/test/java/org/thingsboard/server/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java

@ -25,6 +25,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.profile.MqttTopics;

3
application/src/test/java/org/thingsboard/server/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java

@ -55,15 +55,14 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
processAfterTest();
}
@Test
@Ignore
@Test
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
}
@Test
@Ignore
public void testRequestAttributesValuesFromTheServerGateway() throws Exception {
processTestGatewayRequestAttributesValuesFromTheServer();
}

18
application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java

@ -65,15 +65,15 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
processAfterTest();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
processGatewayTestSubscribeToAttributesUpdates();
}
// @Test
// public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
// processTestSubscribeToAttributesUpdates();
// }
//
// @Test
// public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
// processGatewayTestSubscribeToAttributesUpdates();
// }
protected void processTestSubscribeToAttributesUpdates() throws Exception {

18
application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java

@ -39,13 +39,13 @@ public abstract class AbstractMqttAttributesUpdatesJsonIntegrationTest extends A
processAfterTest();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
processGatewayTestSubscribeToAttributesUpdates();
}
// @Test
// public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
// processTestSubscribeToAttributesUpdates();
// }
//
// @Test
// public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
// processGatewayTestSubscribeToAttributesUpdates();
// }
}

2
application/src/test/java/org/thingsboard/server/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java

@ -48,13 +48,11 @@ public abstract class AbstractMqttAttributesUpdatesProtoIntegrationTest extends
}
@Test
@Ignore
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
}
@Test
@Ignore
public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
processGatewayTestSubscribeToAttributesUpdates();
}

408
application/src/test/java/org/thingsboard/server/mqtt/proto/DynamicProtoTest.java

@ -0,0 +1,408 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.mqtt.proto;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
import lombok.extern.slf4j.Slf4j;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.thingsboard.server.common.data.device.profile.MqttProtoDeviceProfileTransportConfiguration;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.thingsboard.server.common.data.device.profile.MqttProtoDeviceProfileTransportConfiguration.LOCATION;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class DynamicProtoTest {
private static final String PROTO_SCHEMA_WITH_NESTED_MSG_TYPES = "syntax = \"proto3\";\n" +
"\n" +
"package testnested;\n" +
"\n" +
"message Outer {\n" +
" message MiddleAA {\n" +
" message Inner {\n" +
" int64 ival = 1;\n" +
" bool booly = 2;\n" +
" }\n" +
" Inner inner = 1;\n" +
" }\n" +
" message MiddleBB {\n" +
" message Inner {\n" +
" int32 ival = 1;\n" +
" bool booly = 2;\n" +
" }\n" +
" Inner inner = 1;\n" +
" }\n" +
" MiddleAA middleAA = 1;\n" +
" MiddleBB middleBB = 2;\n" +
"}\n";
private static final String PROTO_SCHEMA_WITH_ONE_OFS = "syntax = \"proto3\";\n" +
"\n" +
"package testoneofs;\n" +
"\n" +
"message SubMessage {\n" +
" repeated string name = 1;\n" +
"}\n" +
"\n" +
"message SampleMessage {\n" +
" oneof testOneOf {\n" +
" string name = 4;\n" +
" SubMessage subMessage = 9;\n" +
" }\n" +
"}";
private static final String IVALID_PROTO_SCHEMA_REQUIRED_FIELD_EXISTS = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SchemaValidationTest {\n" +
" required int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_NOT_VALID_SYNTAX = "syntax = \"proto2\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SchemaValidationTest {\n" +
" required int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_OPTIONS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"option java_package = \"com.test.schemavalidation\";\n" +
"option java_multiple_files = true;\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SchemaValidationTest {\n" +
" int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_PUBLIC_IMPORTS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"import public \"oldschema.proto\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SchemaValidationTest {\n" +
" int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_IMPORTS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"import \"oldschema.proto\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SchemaValidationTest {\n" +
" int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_EXTEND_DECLARATION_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"extend google.protobuf.MethodOptions {\n" +
" MyMessage my_method_option = 50007;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_ENUM_OPTIONS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"enum testEnum {\n" +
" option allow_alias = true;\n" +
" DEFAULT = 0;\n" +
" STARTED = 1;\n" +
" RUNNING = 2;\n" +
"}\n" +
"\n" +
"message testMessage {\n" +
" int32 parameter = 1;\n" +
"}\n";
private static final String INVALID_PROTO_SCHEMA_NO_MESSAGE_TYPES_EXISTS = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"enum testEnum {\n" +
" DEFAULT = 0;\n" +
" STARTED = 1;\n" +
" RUNNING = 2;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_MESSAGE_OPTIONS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message testMessage {\n" +
" option allow_alias = true;\n" +
" int32 parameter = 1;\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_MESSAGE_EXTENSIONS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message TestMessage {\n" +
" extensions 100 to 199;\n" +
"}\n";
private static final String INVALID_PROTO_SCHEMA_MESSAGE_GROUPS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message TestMessage {\n" +
" repeated group Result = 1 {\n" +
" string url = 2;\n" +
" string title = 3;\n" +
" repeated string snippets = 4;\n" +
" }\n" +
"}\n";
private static final String INVALID_PROTO_SCHEMA_MESSAGE_RESERVED_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message Foo {\n" +
" reserved 2, 15, 9 to 11;\n" +
" reserved \"foo\", \"bar\";\n" +
"}";
private static final String INVALID_PROTO_SCHEMA_ONE_OFS_GROUPS_NOT_SUPPORTED = "syntax = \"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message SampleMessage {\n" +
" oneof test_oneof {\n" +
" string name = 1;\n" +
" group Result = 2 {\n" +
" \tstring url = 3;\n" +
" \tstring title = 4;\n" +
" \trepeated string snippets = 5;\n" +
" }\n" +
" }\n" +
"}";
private static final MqttProtoDeviceProfileTransportConfiguration mqttProtoDeviceProfileTransportConfiguration = new MqttProtoDeviceProfileTransportConfiguration();
private static void validateTransportProtoSchema(String schema, String schemaName) {
mqttProtoDeviceProfileTransportConfiguration.validateTransportProtoSchema(schema, schemaName);
}
private static DynamicSchema getDynamicSchema(String schema, String schemaName) {
ProtoFileElement protoFileElement = getTransportProtoSchema(schema);
return mqttProtoDeviceProfileTransportConfiguration.getDynamicSchema(protoFileElement, schemaName);
}
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
@Test
public void testDynamicSchemaProtoFileValidation() {
processValidation("Failed to parse: testParseToProtoFile schema due to: Syntax error in :6:4: 'required' label forbidden in proto3 field declarations", IVALID_PROTO_SCHEMA_REQUIRED_FIELD_EXISTS, "testParseToProtoFile");
}
@Test
public void testDynamicSchemaSyntaxValidation() {
processValidation("Invalid schema syntax: proto2 for: testSyntaxValidation provided! Only proto3 allowed!", INVALID_PROTO_SCHEMA_NOT_VALID_SYNTAX, "testSyntaxValidation");
}
@Test
public void testDynamicSchemaOptionsValidation() {
processValidation("Invalid testOptionsValidation schema provided! Schema options don't support!", INVALID_PROTO_SCHEMA_OPTIONS_NOT_SUPPORTED, "testOptionsValidation");
}
@Test
public void testDynamicSchemaPublicImportsValidation() {
processValidation("Invalid testPublicImportsValidation schema provided! Schema public imports don't support!", INVALID_PROTO_SCHEMA_PUBLIC_IMPORTS_NOT_SUPPORTED, "testPublicImportsValidation");
}
@Test
public void testDynamicSchemaImportsValidation() {
processValidation("Invalid testImportsValidation schema provided! Schema imports don't support!", INVALID_PROTO_SCHEMA_IMPORTS_NOT_SUPPORTED, "testImportsValidation");
}
@Test
public void testDynamicSchemaExtendDeclarationsValidation() {
processValidation("Invalid testExtendDeclarationsValidation schema provided! Schema extend declarations don't support!", INVALID_PROTO_SCHEMA_EXTEND_DECLARATION_NOT_SUPPORTED, "testExtendDeclarationsValidation");
}
@Test
public void testDynamicSchemaEnumOptionsValidation() {
processValidation("Invalid testEnumOptionsValidation schema provided! Enum definitions options are not supported!", INVALID_PROTO_SCHEMA_ENUM_OPTIONS_NOT_SUPPORTED, "testEnumOptionsValidation");
}
@Test
public void testDynamicSchemaNoOneMessageTypeExistsValidation() {
processValidation("Invalid noOneMessageTypeExists schema provided! At least one Message definition should exists!", INVALID_PROTO_SCHEMA_NO_MESSAGE_TYPES_EXISTS, "noOneMessageTypeExists");
}
@Test
public void testDynamicSchemaMessageTypeOptionsValidation() {
processValidation("Invalid messageTypeOptions schema provided! Message definition options don't support!", INVALID_PROTO_SCHEMA_MESSAGE_OPTIONS_NOT_SUPPORTED, "messageTypeOptions");
}
@Test
public void testDynamicSchemaMessageTypeExtensionsValidation() {
processValidation("Invalid messageTypeExtensions schema provided! Message definition extensions don't support!", INVALID_PROTO_SCHEMA_MESSAGE_EXTENSIONS_NOT_SUPPORTED, "messageTypeExtensions");
}
@Test
public void testDynamicSchemaMessageTypeReservedElementsValidation() {
processValidation("Invalid messageTypeReservedElements schema provided! Message definition reserved elements don't support!", INVALID_PROTO_SCHEMA_MESSAGE_RESERVED_NOT_SUPPORTED, "messageTypeReservedElements");
}
@Test
public void testDynamicSchemaMessageTypeGroupsElementsValidation() {
processValidation("Invalid messageTypeGroupsElements schema provided! Message definition groups don't support!", INVALID_PROTO_SCHEMA_MESSAGE_GROUPS_NOT_SUPPORTED, "messageTypeGroupsElements");
}
@Test
public void testDynamicSchemaOneOfsTypeGroupsElementsValidation() {
processValidation("Invalid oneOfsTypeGroupsElements schema provided! OneOf definition groups don't support!", INVALID_PROTO_SCHEMA_ONE_OFS_GROUPS_NOT_SUPPORTED, "oneOfsTypeGroupsElements");
}
private void processValidation(String expectedMessage, String schema, String schemaName) {
exceptionRule.expect(IllegalArgumentException.class);
exceptionRule.expectMessage(expectedMessage);
validateTransportProtoSchema(schema, schemaName);
}
@Test
public void testDynamicSchemaCreationWithMessageNestedTypes() throws Exception {
String testNestedTypesProtoSchema = "testNestedTypesProtoSchema";
validateTransportProtoSchema(PROTO_SCHEMA_WITH_NESTED_MSG_TYPES, testNestedTypesProtoSchema);
DynamicSchema dynamicSchema = getDynamicSchema(PROTO_SCHEMA_WITH_NESTED_MSG_TYPES, testNestedTypesProtoSchema);
assertNotNull(dynamicSchema);
Set<String> messageTypes = dynamicSchema.getMessageTypes();
assertEquals(5, messageTypes.size());
assertTrue(messageTypes.contains("testnested.Outer"));
assertTrue(messageTypes.contains("testnested.Outer.MiddleAA"));
assertTrue(messageTypes.contains("testnested.Outer.MiddleAA.Inner"));
assertTrue(messageTypes.contains("testnested.Outer.MiddleBB"));
assertTrue(messageTypes.contains("testnested.Outer.MiddleBB.Inner"));
DynamicMessage.Builder middleAAInnerMsgBuilder = dynamicSchema.newMessageBuilder("testnested.Outer.MiddleAA.Inner");
Descriptors.Descriptor middleAAInnerMsgDescriptor = middleAAInnerMsgBuilder.getDescriptorForType();
DynamicMessage middleAAInnerMsg = middleAAInnerMsgBuilder
.setField(middleAAInnerMsgDescriptor.findFieldByName("ival"), 1L)
.setField(middleAAInnerMsgDescriptor.findFieldByName("booly"), true)
.build();
DynamicMessage.Builder middleAAMsgBuilder = dynamicSchema.newMessageBuilder("testnested.Outer.MiddleAA");
Descriptors.Descriptor middleAAMsgDescriptor = middleAAMsgBuilder.getDescriptorForType();
DynamicMessage middleAAMsg = middleAAMsgBuilder
.setField(middleAAMsgDescriptor.findFieldByName("inner"), middleAAInnerMsg)
.build();
DynamicMessage.Builder middleBBInnerMsgBuilder = dynamicSchema.newMessageBuilder("testnested.Outer.MiddleAA.Inner");
Descriptors.Descriptor middleBBInnerMsgDescriptor = middleBBInnerMsgBuilder.getDescriptorForType();
DynamicMessage middleBBInnerMsg = middleBBInnerMsgBuilder
.setField(middleBBInnerMsgDescriptor.findFieldByName("ival"), 0L)
.setField(middleBBInnerMsgDescriptor.findFieldByName("booly"), false)
.build();
DynamicMessage.Builder middleBBMsgBuilder = dynamicSchema.newMessageBuilder("testnested.Outer.MiddleBB");
Descriptors.Descriptor middleBBMsgDescriptor = middleBBMsgBuilder.getDescriptorForType();
DynamicMessage middleBBMsg = middleBBMsgBuilder
.setField(middleBBMsgDescriptor.findFieldByName("inner"), middleBBInnerMsg)
.build();
DynamicMessage.Builder outerMsgBuilder = dynamicSchema.newMessageBuilder("testnested.Outer");
Descriptors.Descriptor outerMsgBuilderDescriptor = outerMsgBuilder.getDescriptorForType();
DynamicMessage outerMsg = outerMsgBuilder
.setField(outerMsgBuilderDescriptor.findFieldByName("middleAA"), middleAAMsg)
.setField(outerMsgBuilderDescriptor.findFieldByName("middleBB"), middleBBMsg)
.build();
assertEquals("{\n" +
" \"middleAA\": {\n" +
" \"inner\": {\n" +
" \"ival\": \"1\",\n" +
" \"booly\": true\n" +
" }\n" +
" },\n" +
" \"middleBB\": {\n" +
" \"inner\": {\n" +
" \"ival\": 0,\n" +
" \"booly\": false\n" +
" }\n" +
" }\n" +
"}", dynamicMsgToJson(outerMsgBuilderDescriptor, outerMsg.toByteArray()));
}
@Test
public void testDynamicSchemaCreationWithMessageOneOfs() throws Exception {
String testOneOfsProtoSchema = "testOneOfsProtoSchema";
validateTransportProtoSchema(PROTO_SCHEMA_WITH_ONE_OFS, testOneOfsProtoSchema);
DynamicSchema dynamicSchema = getDynamicSchema(PROTO_SCHEMA_WITH_ONE_OFS, testOneOfsProtoSchema);
assertNotNull(dynamicSchema);
Set<String> messageTypes = dynamicSchema.getMessageTypes();
assertEquals(2, messageTypes.size());
assertTrue(messageTypes.contains("testoneofs.SubMessage"));
assertTrue(messageTypes.contains("testoneofs.SampleMessage"));
DynamicMessage.Builder sampleMsgBuilder = dynamicSchema.newMessageBuilder("testoneofs.SampleMessage");
Descriptors.Descriptor sampleMsgDescriptor = sampleMsgBuilder.getDescriptorForType();
assertNotNull(sampleMsgDescriptor);
List<Descriptors.FieldDescriptor> fields = sampleMsgDescriptor.getFields();
assertEquals(2, fields.size());
DynamicMessage sampleMsg = sampleMsgBuilder
.setField(sampleMsgDescriptor.findFieldByName("name"), "Bob")
.build();
assertEquals("{\n" + " \"name\": \"Bob\"\n" + "}", dynamicMsgToJson(sampleMsgDescriptor, sampleMsg.toByteArray()));
DynamicMessage.Builder subMsgBuilder = dynamicSchema.newMessageBuilder("testoneofs.SubMessage");
Descriptors.Descriptor subMsgDescriptor = subMsgBuilder.getDescriptorForType();
DynamicMessage subMsg = subMsgBuilder
.addRepeatedField(subMsgDescriptor.findFieldByName("name"), "Alice")
.addRepeatedField(subMsgDescriptor.findFieldByName("name"), "John")
.build();
DynamicMessage sampleMsgWithOneOfSubMessage = sampleMsgBuilder.setField(sampleMsgDescriptor.findFieldByName("subMessage"), subMsg).build();
assertEquals("{\n" + " \"subMessage\": {\n" + " \"name\": [\"Alice\", \"John\"]\n" + " }\n" + "}",
dynamicMsgToJson(sampleMsgDescriptor, sampleMsgWithOneOfSubMessage.toByteArray()));
}
private String dynamicMsgToJson(Descriptors.Descriptor descriptor, byte[] payload) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, payload);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
private static ProtoFileElement getTransportProtoSchema(String protoSchema) {
return new ProtoParser(LOCATION, protoSchema.toCharArray()).readProtoFile();
}
}

4
application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java

@ -46,25 +46,21 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
}
@Test
@Ignore
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
}
@Test
@Ignore
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
}
@Test
@Ignore
public void testGatewayServerMqttOneWayRpc() throws Exception {
processOneWayRpcTestGateway("Gateway Device OneWay RPC Proto");
}
@Test
@Ignore
public void testGatewayServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTestGateway("Gateway Device TwoWay RPC Proto");
}

244
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttProtoDeviceProfileTransportConfiguration.java

@ -22,12 +22,13 @@ import com.github.os72.protobuf.dynamic.EnumDefinition;
import com.github.os72.protobuf.dynamic.MessageDefinition;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.Field;
import com.squareup.wire.Syntax;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.internal.parser.EnumConstantElement;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.OneOfElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
import com.squareup.wire.schema.internal.parser.TypeElement;
@ -37,6 +38,8 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -50,6 +53,10 @@ public class MqttProtoDeviceProfileTransportConfiguration extends MqttDeviceProf
public static final String ATTRIBUTES_PROTO_SCHEMA = "attributes proto schema";
public static final String TELEMETRY_PROTO_SCHEMA = "telemetry proto schema";
public static String invalidSchemaProvidedMessage(String schemaName) {
return "Invalid " + schemaName + " schema provided!";
}
private String deviceTelemetryProtoSchema;
private String deviceAttributesProtoSchema;
@ -69,85 +76,212 @@ public class MqttProtoDeviceProfileTransportConfiguration extends MqttDeviceProf
try {
protoFileElement = schemaParser.readProtoFile();
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse: " + schemaName + " due to: " + e.getMessage());
}
List<TypeElement> types = protoFileElement.getTypes();
if (!types.isEmpty()) {
if (types.stream().noneMatch(typeElement -> typeElement instanceof MessageElement)) {
throw new IllegalArgumentException("Invalid " + schemaName + " provided! At least one Message definition should exists!");
}
} else {
throw new IllegalArgumentException("Invalid " + schemaName + " provided!");
throw new IllegalArgumentException("Failed to parse: " + schemaName + " schema due to: " + e.getMessage());
}
checkProtoFileSyntax(schemaName, protoFileElement);
checkProtoFileCommonSettings(schemaName, protoFileElement.getOptions().isEmpty(), " Schema options don't support!");
checkProtoFileCommonSettings(schemaName, protoFileElement.getPublicImports().isEmpty(), " Schema public imports don't support!");
checkProtoFileCommonSettings(schemaName, protoFileElement.getImports().isEmpty(), " Schema imports don't support!");
checkProtoFileCommonSettings(schemaName, protoFileElement.getExtendDeclarations().isEmpty(), " Schema extend declarations don't support!");
checkTypeElements(schemaName, protoFileElement);
}
public Descriptors.Descriptor getDynamicMessageDescriptor(String protoSchema, String schemaName) {
ProtoFileElement protoFileElement = getTransportProtoSchema(protoSchema);
try {
ProtoFileElement protoFileElement = getTransportProtoSchema(protoSchema);
DynamicSchema dynamicSchema = getDynamicSchema(protoFileElement, schemaName);
String lastMsgName = getMessageTypes(protoFileElement.getTypes()).stream()
.map(MessageElement::getName).reduce((previous, last) -> last).get();
DynamicMessage.Builder builder = dynamicSchema.newMessageBuilder(lastMsgName);
return builder.getDescriptorForType();
} catch (Exception e) {
log.warn("Failed to get Message Descriptor due to {}", e.getMessage());
return null;
}
}
public DynamicSchema getDynamicSchema(ProtoFileElement protoFileElement, String schemaName) {
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
schemaBuilder.setName(schemaName);
schemaBuilder.setPackage(!isEmptyStr(protoFileElement.getPackageName()) ?
protoFileElement.getPackageName() : schemaName.toLowerCase());
List<TypeElement> types = protoFileElement.getTypes();
List<MessageElement> messageTypes = getMessageTypes(types);
if (!messageTypes.isEmpty()) {
List<EnumElement> enumTypes = getEnumElements(types);
if (!enumTypes.isEmpty()) {
enumTypes.forEach(enumElement -> {
EnumDefinition enumDefinition = getEnumDefinition(enumElement);
schemaBuilder.addEnumDefinition(enumDefinition);
});
}
List<MessageDefinition> messageDefinitions = getMessageDefinitions(messageTypes);
messageDefinitions.forEach(schemaBuilder::addMessageDefinition);
try {
return schemaBuilder.build();
} catch (Descriptors.DescriptorValidationException e) {
throw new RuntimeException("Failed to create dynamic schema due to: " + e.getMessage());
}
} else {
throw new RuntimeException("Failed to get Dynamic Schema! Message types is empty for schema:" + schemaName);
}
}
private void checkProtoFileSyntax(String schemaName, ProtoFileElement protoFileElement) {
if (protoFileElement.getSyntax() == null || !protoFileElement.getSyntax().equals(Syntax.PROTO_3)) {
throw new IllegalArgumentException("Invalid schema syntax: " + protoFileElement.getSyntax() +
" for: " + schemaName + " provided! Only " + Syntax.PROTO_3 + " allowed!");
}
}
private void checkProtoFileCommonSettings(String schemaName, boolean isEmptySettings, String invalidSettingsMessage) {
if (!isEmptySettings) {
throw new IllegalArgumentException(invalidSchemaProvidedMessage(schemaName) + invalidSettingsMessage);
}
}
private void checkTypeElements(String schemaName, ProtoFileElement protoFileElement) {
List<TypeElement> types = protoFileElement.getTypes();
if (!types.isEmpty()) {
if (types.stream().noneMatch(typeElement -> typeElement instanceof MessageElement)) {
throw new IllegalArgumentException(invalidSchemaProvidedMessage(schemaName) + " At least one Message definition should exists!");
} else {
checkEnumElements(schemaName, getEnumElements(types));
checkMessageElements(schemaName, getMessageTypes(types));
}
} else {
throw new IllegalArgumentException(invalidSchemaProvidedMessage(schemaName) + " Type elements is empty!");
}
}
List<EnumElement> enumTypes = types.stream()
.filter(typeElement -> typeElement instanceof EnumElement)
.map(typeElement -> (EnumElement) typeElement)
.collect(Collectors.toList());
private void checkEnumElements(String schemaName, List<EnumElement> enumTypes) {
if (enumTypes.stream().anyMatch(enumElement -> !enumElement.getNestedTypes().isEmpty())) {
throw new IllegalArgumentException(invalidSchemaProvidedMessage(schemaName) + " Nested types in Enum definitions are not supported!");
}
if (enumTypes.stream().anyMatch(enumElement -> !enumElement.getOptions().isEmpty())) {
throw new IllegalArgumentException(invalidSchemaProvidedMessage(schemaName) + " Enum definitions options are not supported!");
}
}
private void checkMessageElements(String schemaName, List<MessageElement> messageElementsList) {
if (!messageElementsList.isEmpty()) {
messageElementsList.forEach(messageElement -> {
checkProtoFileCommonSettings(schemaName, messageElement.getGroups().isEmpty(),
" Message definition groups don't support!");
checkProtoFileCommonSettings(schemaName, messageElement.getOptions().isEmpty(),
" Message definition options don't support!");
checkProtoFileCommonSettings(schemaName, messageElement.getExtensions().isEmpty(),
" Message definition extensions don't support!");
checkProtoFileCommonSettings(schemaName, messageElement.getReserveds().isEmpty(),
" Message definition reserved elements don't support!");
List<OneOfElement> oneOfs = messageElement.getOneOfs();
if (!oneOfs.isEmpty()) {
oneOfs.forEach(oneOfElement ->
checkProtoFileCommonSettings(schemaName, oneOfElement.getGroups().isEmpty(),
" OneOf definition groups don't support!"));
}
List<TypeElement> nestedTypes = messageElement.getNestedTypes();
if (!nestedTypes.isEmpty()) {
List<EnumElement> nestedEnumTypes = getEnumElements(nestedTypes);
if (!nestedEnumTypes.isEmpty()) {
checkEnumElements(schemaName, nestedEnumTypes);
}
List<MessageElement> nestedMessageTypes = getMessageTypes(nestedTypes);
checkMessageElements(schemaName, nestedMessageTypes);
}
});
}
}
private ProtoFileElement getTransportProtoSchema(String protoSchema) {
return new ProtoParser(LOCATION, protoSchema.toCharArray()).readProtoFile();
}
List<MessageElement> messageTypes = types.stream()
private List<MessageElement> getMessageTypes(List<TypeElement> types) {
return types.stream()
.filter(typeElement -> typeElement instanceof MessageElement)
.map(typeElement -> (MessageElement) typeElement)
.collect(Collectors.toList());
}
if (!enumTypes.isEmpty()) {
enumTypes.forEach(enumElement -> {
List<EnumConstantElement> enumElementTypeConstants = enumElement.getConstants();
EnumDefinition.Builder enumDefinitionBuilder = EnumDefinition.newBuilder(enumElement.getName());
if (!enumElementTypeConstants.isEmpty()) {
enumElementTypeConstants.forEach(constantElement -> enumDefinitionBuilder.addValue(constantElement.getName(), constantElement.getTag()));
}
EnumDefinition enumDefinition = enumDefinitionBuilder.build();
schemaBuilder.addEnumDefinition(enumDefinition);
});
}
private List<EnumElement> getEnumElements(List<TypeElement> types) {
return types.stream()
.filter(typeElement -> typeElement instanceof EnumElement)
.map(typeElement -> (EnumElement) typeElement)
.collect(Collectors.toList());
}
if (!messageTypes.isEmpty()) {
messageTypes.forEach(messageElement -> {
List<FieldElement> messageElementFields = messageElement.getFields();
private List<MessageDefinition> getMessageDefinitions(List<MessageElement> messageElementsList) {
if (!messageElementsList.isEmpty()) {
List<MessageDefinition> messageDefinitions = new ArrayList<>();
messageElementsList.forEach(messageElement -> {
MessageDefinition.Builder messageDefinitionBuilder = MessageDefinition.newBuilder(messageElement.getName());
List<FieldElement> messageElementFields = messageElement.getFields();
List<OneOfElement> oneOfs = messageElement.getOneOfs();
List<TypeElement> nestedTypes = messageElement.getNestedTypes();
if (!messageElementFields.isEmpty()) {
messageElementFields.forEach(fieldElement -> {
Field.Label label = fieldElement.getLabel();
String labelStr = label != null ? label.name() : null;
messageDefinitionBuilder.addField(
labelStr,
fieldElement.getType(),
fieldElement.getName(),
fieldElement.getTag(),
fieldElement.getDefaultValue());
});
addMessageFieldsToTheMessageDefinition(messageElementFields, messageDefinitionBuilder);
}
if (!oneOfs.isEmpty()) {
for (OneOfElement oneOfelement: oneOfs) {
MessageDefinition.OneofBuilder oneofBuilder = messageDefinitionBuilder.addOneof(oneOfelement.getName());
addMessageFieldsToTheOneOfDefinition(oneOfelement.getFields(), oneofBuilder);
}
}
MessageDefinition messageDefinition = messageDefinitionBuilder.build();
schemaBuilder.addMessageDefinition(messageDefinition);
if (!nestedTypes.isEmpty()) {
List<EnumElement> nestedEnumTypes = getEnumElements(nestedTypes);
if (!nestedEnumTypes.isEmpty()) {
nestedEnumTypes.forEach(enumElement -> {
EnumDefinition nestedEnumDefinition = getEnumDefinition(enumElement);
messageDefinitionBuilder.addEnumDefinition(nestedEnumDefinition);
});
}
List<MessageElement> nestedMessageTypes = getMessageTypes(nestedTypes);
List<MessageDefinition> nestedMessageDefinitions = getMessageDefinitions(nestedMessageTypes);
nestedMessageDefinitions.forEach(messageDefinitionBuilder::addMessageDefinition);
}
messageDefinitions.add(messageDefinitionBuilder.build());
});
MessageElement lastMsg = messageTypes.stream().reduce((previous, last) -> last).get();
try {
DynamicSchema dynamicSchema = schemaBuilder.build();
DynamicMessage.Builder builder = dynamicSchema.newMessageBuilder(lastMsg.getName());
return builder.getDescriptorForType();
} catch (Descriptors.DescriptorValidationException e) {
log.error("Failed to create dynamic schema due to: ", e);
return null;
}
return messageDefinitions;
} else {
log.error("Failed to get Message Descriptor! Message types is empty for {} schema!", schemaName);
return null;
return Collections.emptyList();
}
}
private ProtoFileElement getTransportProtoSchema(String protoSchema) {
return new ProtoParser(LOCATION, protoSchema.toCharArray()).readProtoFile();
private EnumDefinition getEnumDefinition(EnumElement enumElement) {
List<EnumConstantElement> enumElementTypeConstants = enumElement.getConstants();
EnumDefinition.Builder enumDefinitionBuilder = EnumDefinition.newBuilder(enumElement.getName());
if (!enumElementTypeConstants.isEmpty()) {
enumElementTypeConstants.forEach(constantElement -> enumDefinitionBuilder.addValue(constantElement.getName(), constantElement.getTag()));
}
return enumDefinitionBuilder.build();
}
private void addMessageFieldsToTheMessageDefinition(List<FieldElement> messageElementFields, MessageDefinition.Builder messageDefinitionBuilder) {
messageElementFields.forEach(fieldElement -> {
String labelStr = null;
if (fieldElement.getLabel() != null) {
labelStr = fieldElement.getLabel().name().toLowerCase();
}
messageDefinitionBuilder.addField(
labelStr,
fieldElement.getType(),
fieldElement.getName(),
fieldElement.getTag(),
fieldElement.getDefaultValue());
});
}
private void addMessageFieldsToTheOneOfDefinition(List<FieldElement> oneOfsElementFields, MessageDefinition.OneofBuilder oneofBuilder) {
oneOfsElementFields.forEach(fieldElement -> oneofBuilder.addField(
fieldElement.getType(),
fieldElement.getName(),
fieldElement.getTag(),
fieldElement.getDefaultValue()));
oneofBuilder.msgDefBuilder();
}
private boolean isEmptyStr(String str) {

2
pom.xml

@ -106,7 +106,7 @@
<commons-collections.version>3.2.2</commons-collections.version>
<java-websocket.version>1.5.0</java-websocket.version>
<micrometer.version>1.5.2</micrometer.version>
<protobuf-dynamic.version>1.0.1</protobuf-dynamic.version>
<protobuf-dynamic.version>1.0.2</protobuf-dynamic.version>
<wire-schema.version>3.4.0</wire-schema.version>
</properties>

Loading…
Cancel
Save