Browse Source

sparkplug: add Gateway as extends abstract

pull/7850/head
nickAS21 3 years ago
parent
commit
e4d2d833bd
  1. 6
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  2. 2
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java
  3. 108
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java
  4. 37
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java
  5. 51
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java

6
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -71,7 +71,7 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.AbstractGatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
@ -131,7 +131,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
final DeviceSessionCtx deviceSessionCtx;
volatile InetSocketAddress address;
volatile AbstractGatewaySessionHandler gatewaySessionHandler;
volatile GatewaySessionHandler gatewaySessionHandler;
volatile SparkplugNodeSessionHandler sparkplugSessionHandler;
private final ConcurrentHashMap<String, String> otaPackSessions;
@ -967,7 +967,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) {
gatewaySessionHandler = new AbstractGatewaySessionHandler(deviceSessionCtx, sessionId);
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId);
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
}

2
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java

@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentMap;
* Created by ashvayka on 19.01.17.
*/
@Slf4j
public class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener {
public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener {
private final AbstractGatewaySessionHandler parent;
private final TransportService transportService;

108
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java

@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
* Created by ashvayka on 19.01.17.
*/
@Slf4j
public class AbstractGatewaySessionHandler {
public abstract class AbstractGatewaySessionHandler {
private static final String DEFAULT_DEVICE_TYPE = "default";
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
@ -87,8 +87,8 @@ public class AbstractGatewaySessionHandler {
private final TransportDeviceInfo gateway;
private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, AbstractGatewayDeviceSessionContext> devices;
private final ConcurrentMap<String, ListenableFuture<AbstractGatewayDeviceSessionContext>> deviceFutures;
private final ConcurrentMap<String, MqttDeviceAwareSessionContext> devices;
private final ConcurrentMap<String, ListenableFuture<MqttDeviceAwareSessionContext>> deviceFutures;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final ChannelHandlerContext channel;
private final DeviceSessionCtx deviceSessionCtx;
@ -110,14 +110,6 @@ public class AbstractGatewaySessionHandler {
return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK);
}
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
if (isJsonPayloadType()) {
onDeviceConnectJson(mqttMsg);
} else {
onDeviceConnectProto(mqttMsg);
}
}
public void onDeviceDisconnect(MqttPublishMessage mqttMsg) throws AdaptorException {
if (isJsonPayloadType()) {
onDeviceDisconnectJson(mqttMsg);
@ -126,16 +118,6 @@ public class AbstractGatewaySessionHandler {
}
}
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
int msgId = getMsgId(mqttMsg);
ByteBuf payload = mqttMsg.payload();
if (isJsonPayloadType()) {
onDeviceTelemetryJson(msgId, payload);
} else {
onDeviceTelemetryProto(msgId, payload);
}
}
public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException {
int msgId = getMsgId(mqttMsg);
ByteBuf payload = mqttMsg.payload();
@ -195,7 +177,7 @@ public class AbstractGatewaySessionHandler {
}
void deregisterSession(String deviceName) {
AbstractGatewayDeviceSessionContext deviceSessionCtx = devices.remove(deviceName);
MqttDeviceAwareSessionContext deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
deregisterSession(deviceName, deviceSessionCtx);
} else {
@ -211,15 +193,15 @@ public class AbstractGatewaySessionHandler {
return deviceSessionCtx.nextMsgId();
}
private boolean isJsonPayloadType() {
protected boolean isJsonPayloadType() {
return deviceSessionCtx.isJsonPayloadType();
}
private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<AbstractGatewayDeviceSessionContext>() {
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<MqttDeviceAwareSessionContext>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext result) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext result) {
ack(msg);
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@ -232,8 +214,8 @@ public class AbstractGatewaySessionHandler {
}, context.getExecutor());
}
private ListenableFuture<AbstractGatewayDeviceSessionContext> onDeviceConnect(String deviceName, String deviceType) {
AbstractGatewayDeviceSessionContext result = devices.get(deviceName);
private ListenableFuture<MqttDeviceAwareSessionContext> onDeviceConnect(String deviceName, String deviceType) {
MqttDeviceAwareSessionContext result = devices.get(deviceName);
if (result == null) {
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
deviceCreationLock.lock();
@ -252,9 +234,9 @@ public class AbstractGatewaySessionHandler {
}
}
private ListenableFuture<AbstractGatewayDeviceSessionContext> getDeviceCreationFuture(String deviceName, String deviceType) {
final SettableFuture<AbstractGatewayDeviceSessionContext> futureToSet = SettableFuture.create();
ListenableFuture<AbstractGatewayDeviceSessionContext> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
private ListenableFuture<MqttDeviceAwareSessionContext> getDeviceCreationFuture(String deviceName, String deviceType) {
final SettableFuture<MqttDeviceAwareSessionContext> futureToSet = SettableFuture.create();
ListenableFuture<MqttDeviceAwareSessionContext> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
if (future != null) {
return future;
}
@ -263,11 +245,13 @@ public class AbstractGatewaySessionHandler {
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()).build(),
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits())
.setSparkplug(this.deviceSessionCtx.isSparkplug())
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
AbstractGatewayDeviceSessionContext deviceSessionCtx = new AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
GatewayDeviceSessionContext deviceSessionCtx = new GatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
@ -297,18 +281,18 @@ public class AbstractGatewaySessionHandler {
}
}
private int getMsgId(MqttPublishMessage mqttMsg) {
protected int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId();
}
private void onDeviceConnectJson(MqttPublishMessage mqttMsg) throws AdaptorException {
protected void onDeviceConnectJson(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = getJson(mqttMsg);
String deviceName = checkDeviceName(getDeviceName(json));
String deviceType = getDeviceType(json);
processOnConnect(mqttMsg, deviceName, deviceType);
}
private void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
protected void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.ConnectMsg connectProto = TransportApiProtos.ConnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
@ -339,7 +323,7 @@ public class AbstractGatewaySessionHandler {
ack(msg);
}
private void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
@ -348,7 +332,7 @@ public class AbstractGatewaySessionHandler {
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
if (!deviceEntry.getValue().isJsonArray()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
@ -372,7 +356,7 @@ public class AbstractGatewaySessionHandler {
}
}
private void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {
protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {
try {
TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload));
List<TransportApiProtos.TelemetryMsg> deviceMsgList = telemetryMsgProto.getMsgList();
@ -380,9 +364,9 @@ public class AbstractGatewaySessionHandler {
deviceMsgList.forEach(telemetryMsg -> {
String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg();
try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
@ -408,7 +392,7 @@ public class AbstractGatewaySessionHandler {
}
}
private void processPostTelemetryMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) {
private void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
}
@ -419,9 +403,9 @@ public class AbstractGatewaySessionHandler {
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
@ -453,9 +437,9 @@ public class AbstractGatewaySessionHandler {
claimMsgList.forEach(claimDeviceMsg -> {
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest();
if (claimRequest == null) {
throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!");
@ -484,7 +468,7 @@ public class AbstractGatewaySessionHandler {
}
}
private void processClaimDeviceMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
}
@ -495,9 +479,9 @@ public class AbstractGatewaySessionHandler {
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
@ -524,9 +508,9 @@ public class AbstractGatewaySessionHandler {
attributesMsgList.forEach(attributesMsg -> {
String deviceName = checkDeviceName(attributesMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
if (kvListProto == null) {
throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!");
@ -554,7 +538,7 @@ public class AbstractGatewaySessionHandler {
}
}
private void processPostAttributesMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
}
@ -603,9 +587,9 @@ public class AbstractGatewaySessionHandler {
JsonObject jsonObj = json.getAsJsonObject();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
@ -628,9 +612,9 @@ public class AbstractGatewaySessionHandler {
TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload));
String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
Integer requestId = gatewayRpcResponseMsg.getId();
String data = gatewayRpcResponseMsg.getData();
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
@ -648,16 +632,16 @@ public class AbstractGatewaySessionHandler {
}
}
private void processRpcResponseMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg));
}
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
int msgId = getMsgId(mqttMsg);
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg));
}
@ -681,8 +665,8 @@ public class AbstractGatewaySessionHandler {
return result.build();
}
private ListenableFuture<AbstractGatewayDeviceSessionContext> checkDeviceConnected(String deviceName) {
AbstractGatewayDeviceSessionContext ctx = devices.get(deviceName);
private ListenableFuture<MqttDeviceAwareSessionContext> checkDeviceConnected(String deviceName) {
MqttDeviceAwareSessionContext ctx = devices.get(deviceName);
if (ctx == null) {
log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName);
return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
@ -723,7 +707,7 @@ public class AbstractGatewaySessionHandler {
}
}
private void deregisterSession(String deviceName, AbstractGatewayDeviceSessionContext deviceSessionCtx) {
private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);

37
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import java.util.concurrent.ConcurrentMap;
/**
* Created by nickAS21 on 26.12.22
*/
public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext{
public GatewayDeviceSessionContext(AbstractGatewaySessionHandler parent,
TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile,
ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) {
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
}
}

51
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import java.util.UUID;
/**
* Created by nickAS21 on 26.12.22
*/
public class GatewaySessionHandler extends AbstractGatewaySessionHandler {
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
super(deviceSessionCtx, sessionId);
}
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
if (isJsonPayloadType()) {
onDeviceConnectJson(mqttMsg);
} else {
onDeviceConnectProto(mqttMsg);
}
}
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
int msgId = getMsgId(mqttMsg);
ByteBuf payload = mqttMsg.payload();
if (isJsonPayloadType()) {
onDeviceTelemetryJson(msgId, payload);
} else {
onDeviceTelemetryProto(msgId, payload);
}
}
}
Loading…
Cancel
Save