Browse Source

Moved sending topic to MQTT transport only, parameter renamed to mqttTopic in metadata

pull/9082/head
imbeacon 3 years ago
parent
commit
fac2e013e2
  1. 2
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  2. 8
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  3. 4
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java

2
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -124,7 +124,7 @@ public class DataConstants {
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String TOPIC = "topic";
public static final String MQTT_TOPIC = "mqttTopic";
public static final String MAIN_QUEUE_NAME = "Main";
public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main";

8
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -30,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
@ -442,7 +441,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
Matcher fwMatcher;
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
TbMsgMetaData md = createMetadataWithTopic(topicName);
TbMsgMetaData md = null;
if (deviceSessionCtx.isMqttTransportType()) {
md = createMetadataWithTopic(topicName);
}
if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
@ -530,7 +532,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static TbMsgMetaData createMetadataWithTopic(String topicName) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue(DataConstants.TOPIC, topicName);
md.putValue(DataConstants.MQTT_TOPIC, topicName);
return md;
}

4
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java

@ -83,6 +83,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
private volatile DeviceTransportType transportType = DeviceTransportType.DEFAULT;
private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor;
@ -126,6 +127,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
return payloadType.equals(TransportPayloadType.JSON);
}
public boolean isMqttTransportType() { return DeviceTransportType.MQTT.equals(transportType); }
public boolean isSendAckOnValidationException() {
return sendAckOnValidationException;
}
@ -165,6 +168,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration();
payloadType = transportPayloadTypeConfiguration.getTransportPayloadType();
transportType = DeviceTransportType.MQTT;
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());

Loading…
Cancel
Save