diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 16193bcf8f..31c768bd76 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -124,7 +124,7 @@ public class DataConstants { public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; - public static final String TOPIC = "topic"; + public static final String MQTT_TOPIC = "mqttTopic"; public static final String MAIN_QUEUE_NAME = "Main"; public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main"; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 56f1547eda..6da294443b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -30,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; @@ -442,7 +441,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { Matcher fwMatcher; MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor(); - TbMsgMetaData md = createMetadataWithTopic(topicName); + TbMsgMetaData md = null; + if (deviceSessionCtx.isMqttTransportType()) { + md = createMetadataWithTopic(topicName); + } if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg)); @@ -530,7 +532,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static TbMsgMetaData createMetadataWithTopic(String topicName) { TbMsgMetaData md = new TbMsgMetaData(); - md.putValue(DataConstants.TOPIC, topicName); + md.putValue(DataConstants.MQTT_TOPIC, topicName); return md; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 3816556732..243ab44d5a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -83,6 +83,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; + private volatile DeviceTransportType transportType = DeviceTransportType.DEFAULT; private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor; private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor; private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor; @@ -126,6 +127,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { return payloadType.equals(TransportPayloadType.JSON); } + public boolean isMqttTransportType() { return DeviceTransportType.MQTT.equals(transportType); } + public boolean isSendAckOnValidationException() { return sendAckOnValidationException; } @@ -165,6 +168,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration; TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration(); payloadType = transportPayloadTypeConfiguration.getTransportPayloadType(); + transportType = DeviceTransportType.MQTT; telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic()); attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic()); attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());