diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 6da294443b..dfcee39dc3 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -137,6 +137,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; private final UUID sessionId; + private final TbMsgMetaData msgMetaData = new TbMsgMetaData(); + protected final MqttTransportContext context; private final TransportService transportService; private final SchedulerComponent scheduler; @@ -441,16 +443,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { Matcher fwMatcher; MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor(); - TbMsgMetaData md = null; - if (deviceSessionCtx.isMqttTransportType()) { - md = createMetadataWithTopic(topicName); - } if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postAttributeMsg)); } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) { TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); @@ -471,22 +471,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE); } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) { TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) { TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) { TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) { TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postAttributeMsg)); } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) { TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postAttributeMsg)); } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) { TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg)); + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName), + getPubAckCallback(ctx, msgId, postAttributeMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) { TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC); transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); @@ -530,10 +536,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private static TbMsgMetaData createMetadataWithTopic(String topicName) { - TbMsgMetaData md = new TbMsgMetaData(); - md.putValue(DataConstants.MQTT_TOPIC, topicName); - return md; + private TbMsgMetaData getMetadata(DeviceSessionCtx ctx, String topicName) { + if (ctx.isDeviceProfileMqttTransportType()) { + msgMetaData.putValue(DataConstants.MQTT_TOPIC, topicName); + } + return msgMetaData; } private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 243ab44d5a..9f60b0a9ba 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -83,7 +83,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; - private volatile DeviceTransportType transportType = DeviceTransportType.DEFAULT; private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor; private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor; private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor; @@ -93,10 +92,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile boolean useJsonPayloadFormatForDefaultDownlinkTopics; private volatile boolean sendAckOnValidationException; + @Getter + private volatile boolean deviceProfileMqttTransportType; + @Getter @Setter private TransportPayloadType provisionPayloadType = payloadType; + public DeviceSessionCtx(UUID sessionId, ConcurrentMap mqttQoSMap, MqttTransportContext context) { super(sessionId, mqttQoSMap); this.context = context; @@ -127,8 +130,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { return payloadType.equals(TransportPayloadType.JSON); } - public boolean isMqttTransportType() { return DeviceTransportType.MQTT.equals(transportType); } - public boolean isSendAckOnValidationException() { return sendAckOnValidationException; } @@ -168,7 +169,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration; TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration(); payloadType = transportPayloadTypeConfiguration.getTransportPayloadType(); - transportType = DeviceTransportType.MQTT; + deviceProfileMqttTransportType = true; telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic()); attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic()); attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());