Browse Source

Changed type for isDeviceProfileMqttTransportType to avoid extra check for every message and added constant metadata object to avoid new object creation for every message

pull/9082/head
imbeacon 3 years ago
parent
commit
ababd11a5d
  1. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  2. 9
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -137,6 +137,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
private final UUID sessionId;
private final TbMsgMetaData msgMetaData = new TbMsgMetaData();
protected final MqttTransportContext context;
private final TransportService transportService;
private final SchedulerComponent scheduler;
@ -441,16 +443,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
Matcher fwMatcher;
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
TbMsgMetaData md = null;
if (deviceSessionCtx.isMqttTransportType()) {
md = createMetadataWithTopic(topicName);
}
if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
@ -471,22 +471,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
@ -530,10 +536,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private static TbMsgMetaData createMetadataWithTopic(String topicName) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue(DataConstants.MQTT_TOPIC, topicName);
return md;
private TbMsgMetaData getMetadata(DeviceSessionCtx ctx, String topicName) {
if (ctx.isDeviceProfileMqttTransportType()) {
msgMetaData.putValue(DataConstants.MQTT_TOPIC, topicName);
}
return msgMetaData;
}
private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) {

9
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java

@ -83,7 +83,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
private volatile DeviceTransportType transportType = DeviceTransportType.DEFAULT;
private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor;
@ -93,10 +92,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile boolean useJsonPayloadFormatForDefaultDownlinkTopics;
private volatile boolean sendAckOnValidationException;
@Getter
private volatile boolean deviceProfileMqttTransportType;
@Getter
@Setter
private TransportPayloadType provisionPayloadType = payloadType;
public DeviceSessionCtx(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, MqttTransportContext context) {
super(sessionId, mqttQoSMap);
this.context = context;
@ -127,8 +130,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
return payloadType.equals(TransportPayloadType.JSON);
}
public boolean isMqttTransportType() { return DeviceTransportType.MQTT.equals(transportType); }
public boolean isSendAckOnValidationException() {
return sendAckOnValidationException;
}
@ -168,7 +169,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration();
payloadType = transportPayloadTypeConfiguration.getTransportPayloadType();
transportType = DeviceTransportType.MQTT;
deviceProfileMqttTransportType = true;
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());

Loading…
Cancel
Save