diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fc41989c82..4d98662be2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -71,7 +71,7 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; -import org.thingsboard.server.transport.mqtt.session.AbstractGatewaySessionHandler; +import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; @@ -131,7 +131,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement final DeviceSessionCtx deviceSessionCtx; volatile InetSocketAddress address; - volatile AbstractGatewaySessionHandler gatewaySessionHandler; + volatile GatewaySessionHandler gatewaySessionHandler; volatile SparkplugNodeSessionHandler sparkplugSessionHandler; private final ConcurrentHashMap otaPackSessions; @@ -967,7 +967,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionHandler = new AbstractGatewaySessionHandler(deviceSessionCtx, sessionId); + gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java index 09b169f567..695873f677 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java @@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentMap; * Created by ashvayka on 19.01.17. */ @Slf4j -public class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { +public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { private final AbstractGatewaySessionHandler parent; private final TransportService transportService; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 7819191638..fc7b205be5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe * Created by ashvayka on 19.01.17. */ @Slf4j -public class AbstractGatewaySessionHandler { +public abstract class AbstractGatewaySessionHandler { private static final String DEFAULT_DEVICE_TYPE = "default"; private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; @@ -87,8 +87,8 @@ public class AbstractGatewaySessionHandler { private final TransportDeviceInfo gateway; private final UUID sessionId; private final ConcurrentMap deviceCreationLockMap; - private final ConcurrentMap devices; - private final ConcurrentMap> deviceFutures; + private final ConcurrentMap devices; + private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; private final ChannelHandlerContext channel; private final DeviceSessionCtx deviceSessionCtx; @@ -110,14 +110,6 @@ public class AbstractGatewaySessionHandler { return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK); } - public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { - if (isJsonPayloadType()) { - onDeviceConnectJson(mqttMsg); - } else { - onDeviceConnectProto(mqttMsg); - } - } - public void onDeviceDisconnect(MqttPublishMessage mqttMsg) throws AdaptorException { if (isJsonPayloadType()) { onDeviceDisconnectJson(mqttMsg); @@ -126,16 +118,6 @@ public class AbstractGatewaySessionHandler { } } - public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { - int msgId = getMsgId(mqttMsg); - ByteBuf payload = mqttMsg.payload(); - if (isJsonPayloadType()) { - onDeviceTelemetryJson(msgId, payload); - } else { - onDeviceTelemetryProto(msgId, payload); - } - } - public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException { int msgId = getMsgId(mqttMsg); ByteBuf payload = mqttMsg.payload(); @@ -195,7 +177,7 @@ public class AbstractGatewaySessionHandler { } void deregisterSession(String deviceName) { - AbstractGatewayDeviceSessionContext deviceSessionCtx = devices.remove(deviceName); + MqttDeviceAwareSessionContext deviceSessionCtx = devices.remove(deviceName); if (deviceSessionCtx != null) { deregisterSession(deviceName, deviceSessionCtx); } else { @@ -211,15 +193,15 @@ public class AbstractGatewaySessionHandler { return deviceSessionCtx.nextMsgId(); } - private boolean isJsonPayloadType() { + protected boolean isJsonPayloadType() { return deviceSessionCtx.isJsonPayloadType(); } private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName); - Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback() { + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext result) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext result) { ack(msg); log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); } @@ -232,8 +214,8 @@ public class AbstractGatewaySessionHandler { }, context.getExecutor()); } - private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { - AbstractGatewayDeviceSessionContext result = devices.get(deviceName); + private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { + MqttDeviceAwareSessionContext result = devices.get(deviceName); if (result == null) { Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); deviceCreationLock.lock(); @@ -252,9 +234,9 @@ public class AbstractGatewaySessionHandler { } } - private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { - final SettableFuture futureToSet = SettableFuture.create(); - ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + final SettableFuture futureToSet = SettableFuture.create(); + ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); if (future != null) { return future; } @@ -263,11 +245,13 @@ public class AbstractGatewaySessionHandler { .setDeviceName(deviceName) .setDeviceType(deviceType) .setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits()) - .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()).build(), - new TransportServiceCallback() { + .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()) + .setSparkplug(this.deviceSessionCtx.isSparkplug()) + .build(), + new TransportServiceCallback<>() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - AbstractGatewayDeviceSessionContext deviceSessionCtx = new AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + GatewayDeviceSessionContext deviceSessionCtx = new GatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); @@ -297,18 +281,18 @@ public class AbstractGatewaySessionHandler { } } - private int getMsgId(MqttPublishMessage mqttMsg) { + protected int getMsgId(MqttPublishMessage mqttMsg) { return mqttMsg.variableHeader().packetId(); } - private void onDeviceConnectJson(MqttPublishMessage mqttMsg) throws AdaptorException { + protected void onDeviceConnectJson(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = getJson(mqttMsg); String deviceName = checkDeviceName(getDeviceName(json)); String deviceType = getDeviceType(json); processOnConnect(mqttMsg, deviceName, deviceType); } - private void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + protected void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { try { TransportApiProtos.ConnectMsg connectProto = TransportApiProtos.ConnectMsg.parseFrom(getBytes(mqttMsg.payload())); String deviceName = checkDeviceName(connectProto.getDeviceName()); @@ -339,7 +323,7 @@ public class AbstractGatewaySessionHandler { ack(msg); } - private void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException { + protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException { JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); @@ -348,7 +332,7 @@ public class AbstractGatewaySessionHandler { Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonArray()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -372,7 +356,7 @@ public class AbstractGatewaySessionHandler { } } - private void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { + protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload)); List deviceMsgList = telemetryMsgProto.getMsgList(); @@ -380,9 +364,9 @@ public class AbstractGatewaySessionHandler { deviceMsgList.forEach(telemetryMsg -> { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg(); try { TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); @@ -408,7 +392,7 @@ public class AbstractGatewaySessionHandler { } } - private void processPostTelemetryMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { + private void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); } @@ -419,9 +403,9 @@ public class AbstractGatewaySessionHandler { for (Map.Entry deviceEntry : jsonObj.entrySet()) { String deviceName = deviceEntry.getKey(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -453,9 +437,9 @@ public class AbstractGatewaySessionHandler { claimMsgList.forEach(claimDeviceMsg -> { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest(); if (claimRequest == null) { throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!"); @@ -484,7 +468,7 @@ public class AbstractGatewaySessionHandler { } } - private void processClaimDeviceMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); } @@ -495,9 +479,9 @@ public class AbstractGatewaySessionHandler { for (Map.Entry deviceEntry : jsonObj.entrySet()) { String deviceName = deviceEntry.getKey(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -524,9 +508,9 @@ public class AbstractGatewaySessionHandler { attributesMsgList.forEach(attributesMsg -> { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); if (kvListProto == null) { throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!"); @@ -554,7 +538,7 @@ public class AbstractGatewaySessionHandler { } } - private void processPostAttributesMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { + private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); } @@ -603,9 +587,9 @@ public class AbstractGatewaySessionHandler { JsonObject jsonObj = json.getAsJsonObject(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { Integer requestId = jsonObj.get("id").getAsInt(); String data = jsonObj.get("data").toString(); TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() @@ -628,9 +612,9 @@ public class AbstractGatewaySessionHandler { TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload)); String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { Integer requestId = gatewayRpcResponseMsg.getId(); String data = gatewayRpcResponseMsg.getData(); TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() @@ -648,16 +632,16 @@ public class AbstractGatewaySessionHandler { } } - private void processRpcResponseMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { + private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); } private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { int msgId = getMsgId(mqttMsg); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback<>() { @Override - public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); } @@ -681,8 +665,8 @@ public class AbstractGatewaySessionHandler { return result.build(); } - private ListenableFuture checkDeviceConnected(String deviceName) { - AbstractGatewayDeviceSessionContext ctx = devices.get(deviceName); + private ListenableFuture checkDeviceConnected(String deviceName) { + MqttDeviceAwareSessionContext ctx = devices.get(deviceName); if (ctx == null) { log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName); return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); @@ -723,7 +707,7 @@ public class AbstractGatewaySessionHandler { } } - private void deregisterSession(String deviceName, AbstractGatewayDeviceSessionContext deviceSessionCtx) { + private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) { transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java new file mode 100644 index 0000000000..4c3a20adda --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; + +import java.util.concurrent.ConcurrentMap; + +/** + * Created by nickAS21 on 26.12.22 + */ +public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext{ + + public GatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, + TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, + ConcurrentMap mqttQoSMap, + TransportService transportService) { + super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); + } + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java new file mode 100644 index 0000000000..660f081ef2 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import org.thingsboard.server.common.transport.adaptor.AdaptorException; + +import java.util.UUID; + +/** + * Created by nickAS21 on 26.12.22 + */ +public class GatewaySessionHandler extends AbstractGatewaySessionHandler { + + public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { + super(deviceSessionCtx, sessionId); + } + + public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { + if (isJsonPayloadType()) { + onDeviceConnectJson(mqttMsg); + } else { + onDeviceConnectProto(mqttMsg); + } + } + + public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { + int msgId = getMsgId(mqttMsg); + ByteBuf payload = mqttMsg.payload(); + if (isJsonPayloadType()) { + onDeviceTelemetryJson(msgId, payload); + } else { + onDeviceTelemetryProto(msgId, payload); + } + } + +}