Browse Source

Merge pull request #7763 from thingsboard/sparkPlugHandlerMqtt

[WIP][3.4.5] sparkPlug: start add Handler connect
pull/7839/head
Andrew Shvayka 4 years ago
committed by GitHub
parent
commit
873b4c7dfe
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 1
      common/cluster-api/src/main/proto/queue.proto
  3. 1
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  4. 13
      common/transport/mqtt/pom.xml
  5. 175
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  6. 7
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
  7. 401
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java
  8. 146
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java
  9. 110
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java
  10. 160
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java
  11. 114
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java
  12. 204
      common/transport/mqtt/src/main/proto/sparkplug.proto
  13. 26
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  14. 12
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java

15
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfigu
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -65,7 +67,6 @@ import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceProvisionService;
import org.thingsboard.server.dao.device.DeviceService;
@ -95,6 +96,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -290,6 +292,7 @@ public class DefaultTransportApiService implements TransportApiService {
device.setType(requestMsg.getDeviceType());
device.setCustomerId(gateway.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
@ -305,7 +308,8 @@ public class DefaultTransportApiService implements TransportApiService {
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());
String deviceIdStr = requestMsg.getSparkplug() ? "sparkplugId" : "gatewayId";
metaData.putValue(deviceIdStr, gatewayId.toString());
DeviceId deviceId = device.getId();
ObjectNode entityNode = mapper.valueToTree(device);
@ -316,11 +320,12 @@ public class DefaultTransportApiService implements TransportApiService {
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
String lastConnectedStr = requestMsg.getSparkplug() ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY;
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
(!deviceAdditionalInfo.has(lastConnectedStr)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
newDeviceAdditionalInfo.put(lastConnectedStr, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}

1
common/cluster-api/src/main/proto/queue.proto

@ -186,6 +186,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg {
int64 gatewayIdLSB = 2;
string deviceName = 3;
string deviceType = 4;
bool sparkplug = 5;
}
message GetOrCreateDeviceFromGatewayResponseMsg {

1
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -118,4 +118,5 @@ public class DataConstants {
public static final String MSG_SOURCE_KEY = "source";
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String LAST_CONNECTED_SPARKPLUG = "lastConnectedSparkplug";
}

13
common/transport/mqtt/pom.xml

@ -97,6 +97,19 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

175
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -41,11 +41,11 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
@ -67,11 +67,14 @@ import org.thingsboard.server.common.transport.util.SslUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.IOException;
@ -103,6 +106,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic;
/**
* @author Andrew Shvayka
@ -128,6 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
final DeviceSessionCtx deviceSessionCtx;
volatile InetSocketAddress address;
volatile GatewaySessionHandler gatewaySessionHandler;
volatile SparkplugNodeSessionHandler sparkplugSessionHandler;
private final ConcurrentHashMap<String, String> otaPackSessions;
private final ConcurrentHashMap<String, Integer> chunkSizes;
@ -320,7 +325,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (sparkplugSessionHandler != null) {
handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
} else if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
@ -366,6 +374,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) {
try {
sparkplugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg);
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ctx.close();
} catch (Exception e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
}
}
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
Matcher fwMatcher;
@ -623,69 +643,74 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
try {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
if (sparkplugSessionHandler != null) {
SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName());
sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS);
} else {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
@ -953,6 +978,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void checkSparkplugSession(MqttConnectMessage connectMessage) {
try {
SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic());
// Test proto
SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
//
if (sparkplugSessionHandler == null) {
sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString());
} else {
log.warn("SparkPlugNodeReConnected [{}] [{}]", sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
}
} catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
}
}
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
log.trace("[{}] Channel closed!", sessionId);
@ -987,7 +1028,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
checkGatewaySession(sessionMetaData);
if (deviceSessionCtx.isSparkplug()) {
checkSparkplugSession(connectMessage);
} else {
checkGatewaySession(sessionMetaData);
}
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true);
log.debug("[{}] Client connected!", sessionId);

7
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java

@ -16,14 +16,7 @@
package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter;
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory;
import java.util.List;
import java.util.Map;

401
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java

@ -0,0 +1,401 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.annotation.Nullable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic;
/**
* Created by nickAS21 on 12.12.22
*/
@Slf4j
public class SparkplugNodeSessionHandler {
private static final String DEFAULT_DEVICE_TYPE = "default";
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
private static final String DEVICE_PROPERTY = "device";
private final MqttTransportContext context;
private final TransportService transportService;
private final TransportDeviceInfo nodeSparkplugInfo;
private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, SparkplugSessionCtx> devices = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ListenableFuture<SparkplugSessionCtx>> deviceFutures = new ConcurrentHashMap<>();
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final ChannelHandlerContext channel;
private final DeviceSessionCtx deviceSessionCtx;
private String nodeTopic;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) {
this.context = deviceSessionCtx.getContext();
this.transportService = context.getTransportService();
this.deviceSessionCtx = deviceSessionCtx;
this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo();
this.sessionId = sessionId;
this.deviceCreationLockMap = createWeakMap();
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
this.channel = deviceSessionCtx.getChannel();
this.nodeTopic = nodeTopic;
}
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
}
public String getNodeId() {
return context.getNodeId();
}
public UUID getSessionId() {
return sessionId;
}
public String getNodeTopic() {
return nodeTopic;
}
public int nextMsgId() {
return deviceSessionCtx.nextMsgId();
}
public void deregisterSession(String deviceName) {
SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
deregisterSession(deviceName, deviceSessionCtx);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
}
}
private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}
public void onDeviceDeleted(String deviceName) {
deregisterSession(deviceName);
}
private int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId();
}
public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId();
String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType();
processOnConnect(mqttMsg, deviceName, deviceType);
} catch (Exception e) {
throw new AdaptorException(e);
}
}
public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception {
SparkplugTopic sparkplugTopic = parseTopic(topicName);
log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
onNodeDisconnectProto(mqttMsg);
break;
case NRECORD:
// TODO
break;
default:
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
onDeviceConnectProto(mqttMsg);
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
onDeviceDisconnectProto(mqttMsg);
break;
case DRECORD:
// TODO
break;
default:
}
}
}
private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
processOnDisconnect(mqttMsg, deviceName);
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
// TODO disconnect device without disconnect Node
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg);
}
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) {
String topicName = sparkplugTopic.toString();
log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
// TODO
break;
case NRECORD:
// TODO
break;
default:
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
// TODO
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
// TODO
break;
case DRECORD:
// TODO
break;
default:
}
}
}
private byte[] getBytes(ByteBuf payload) {
return ProtoMqttAdaptor.toBytes(payload);
}
private void ack(MqttPublishMessage msg) {
int msgId = getMsgId(msg);
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
}
}
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
return channel.writeAndFlush(mqttMessage);
}
private String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!");
} else {
return deviceName;
}
}
private String getDeviceName(JsonElement json) {
return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
}
private String getDeviceType(JsonElement json) {
JsonElement type = json.getAsJsonObject().get("type");
return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
}
private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable SparkplugSessionCtx result) {
ack(msg);
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
}
private ListenableFuture<SparkplugSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
SparkplugSessionCtx result = devices.get(deviceName);
if (result == null) {
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
deviceCreationLock.lock();
try {
result = devices.get(deviceName);
if (result == null) {
return getDeviceCreationFuture(deviceName, deviceType);
} else {
return Futures.immediateFuture(result);
}
} finally {
deviceCreationLock.unlock();
}
} else {
return Futures.immediateFuture(result);
}
}
private ListenableFuture<SparkplugSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
final SettableFuture<SparkplugSessionCtx> futureToSet = SettableFuture.create();
ListenableFuture<SparkplugSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
if (future != null) {
return future;
}
try {
transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
.setSparkplug(true)
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
if (msg.getDeviceInfo() == null) {
System.out.println("DeviceInfo == null");
}
SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
}
futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName);
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
futureToSet.setException(e);
deviceFutures.remove(deviceName);
}
});
return futureToSet;
} catch (Throwable e) {
deviceFutures.remove(deviceName);
throw e;
}
}
}

146
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java

@ -0,0 +1,146 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
/**
* Created by nickAS21 on 08.12.22
*/
@Slf4j
public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
private final SparkplugNodeSessionHandler parent;
private final TransportService transportService;
public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) {
super(UUID.randomUUID(), mqttQoSMap);
this.parent = parent;
setSessionInfo(SessionInfoProto.newBuilder()
.setNodeId(parent.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
.setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
.setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
.setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
.setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
.setDeviceName(deviceInfo.getDeviceName())
.setDeviceType(deviceInfo.getDeviceType())
.setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
.setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
.setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
.build());
setDeviceInfo(deviceInfo);
setConnected(true);
setDeviceProfile(deviceProfile);
this.transportService = transportService;
}
@Override
public UUID getSessionId() {
return sessionId;
}
@Override
public int nextMsgId() {
return parent.nextMsgId();
}
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush);
// } catch (Exception e) {
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
// log.trace("[{}] Received attributes update notification to device", sessionId);
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
// } catch (Exception e) {
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
// log.trace("[{}] Received RPC command to device", sessionId);
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
// payload -> {
// ChannelFuture channelFuture = parent.writeAndFlush(payload);
// if (request.getPersisted()) {
// channelFuture.addListener(result -> {
// if (result.cause() == null) {
// if (!isAckExpected(payload)) {
// transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
// } else if (request.getPersisted()) {
// transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
//
// }
// }
// });
// }
// }
// );
// } catch (Exception e) {
// transportService.process(getSessionInfo(),
// TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
// .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
parent.deregisterSession(getDeviceInfo().getDeviceName());
}
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
// This feature is not supported in the TB IoT Gateway yet.
}
@Override
public void onDeviceDeleted(DeviceId deviceId) {
parent.onDeviceDeleted(this.getSessionInfo().getDeviceName());
}
private boolean isAckExpected(MqttMessage message) {
return message.fixedHeader().qosLevel().value() > 0;
}
}

110
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java

@ -0,0 +1,110 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
/**
* An enumeration of Sparkplug MQTT message types. The type provides an indication as to what the MQTT Payload of
* message will contain.
*/
public enum SparkplugMessageType {
/**
* Birth certificate for MQTT Edge of Network (EoN) Nodes.
*/
NBIRTH,
/**
* Death certificate for MQTT Edge of Network (EoN) Nodes.
*/
NDEATH,
/**
* Birth certificate for MQTT Devices.
*/
DBIRTH,
/**
* Death certificate for MQTT Devices.
*/
DDEATH,
/**
* Edge of Network (EoN) Node data message.
*/
NDATA,
/**
* Device data message.
*/
DDATA,
/**
* Edge of Network (EoN) Node command message.
*/
NCMD,
/**
* Device command message.
*/
DCMD,
/**
* Critical application state message.
*/
STATE,
/**
* Device record message.
*/
DRECORD,
/**
* Edge of Network (EoN) Node record message.
*/
NRECORD;
public static SparkplugMessageType parseMessageType(String type) throws ThingsboardException {
for (SparkplugMessageType messageType : SparkplugMessageType.values()) {
if (messageType.name().equals(type)) {
return messageType;
}
}
throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
public boolean isDeath() {
return this.equals(DDEATH) || this.equals(NDEATH);
}
public boolean isCommand() {
return this.equals(DCMD) || this.equals(NCMD);
}
public boolean isData() {
return this.equals(DDATA) || this.equals(NDATA);
}
public boolean isBirth() {
return this.equals(DBIRTH) || this.equals(NBIRTH);
}
public boolean isRecord() {
return this.equals(DRECORD) || this.equals(NRECORD);
}
}

160
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java

@ -0,0 +1,160 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonInclude;
/**
* Created by nickAS21 on 12.12.22
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SparkplugTopic {
/**
* The Sparkplug namespace version.
* For the Sparkplug A version of the payload definition, the UTF-8 string constant for the namespace element will be:
* spAv1.0
* For the Sparkplug B version of the specification, the UTF-8 string constant for the namespace element will be:
* spBv1.0
*/
private String namespace;
/**
* The ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*/
private String groupId;
/**
* The ID of the Edge of Network (EoN) Node.
*/
private String edgeNodeId;
/**
* The ID of the device.
*/
private String deviceId;
/**
* The message type.
*/
private SparkplugMessageType type;
/**
* Constructor (device).
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param deviceId the device ID.
* @param type the message type.
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, String deviceId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = deviceId;
this.type = type;
}
/**
* Constructor (node).
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param type the message type.
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = null;
this.type = type;
}
/**
* Returns the Sparkplug namespace version.
*
* @return the namespace
*/
public String getNamespace() {
return namespace;
}
/**
* Returns the ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*
* @return the group ID
*/
public String getGroupId() {
return groupId;
}
/**
* Returns the ID of the Edge of Network (EoN) Node.
*
* @return the edge node ID
*/
public String getEdgeNodeId() {
return edgeNodeId;
}
/**
* Returns the ID of the device.
*
* @return the device ID
*/
public String getDeviceId() {
return deviceId;
}
/**
* Returns the message type.
*
* @return the message type
*/
public SparkplugMessageType getType() {
return type;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(getNamespace()).append("/")
.append(getGroupId()).append("/")
.append(getType()).append("/")
.append(getEdgeNodeId());
if (getDeviceId() != null) {
sb.append("/").append(getDeviceId());
}
return sb.toString();
}
/**
* Returns true if this topic's type matches the passes in type, false otherwise.
*
* @param type the type to check
* @return true if this topic's type matches the passes in type, false otherwise
*/
public boolean isType(SparkplugMessageType type) {
return this.type != null && this.type.equals(type);
}
public boolean isNode() {
return this.deviceId == null;
}
}

114
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import java.util.HashMap;
import java.util.Map;
/**
* Provides utility methods for handling Sparkplug MQTT message topics.
*/
public class SparkplugTopicUtil {
private static final Map<String, String[]> SPLIT_TOPIC_CACHE = new HashMap<String, String[]>();
public static String[] getSplitTopic(String topic) {
String[] splitTopic = SPLIT_TOPIC_CACHE.get(topic);
if (splitTopic == null) {
splitTopic = topic.split("/");
SPLIT_TOPIC_CACHE.put(topic, splitTopic);
}
return splitTopic;
}
/**
* Serializes a {@link SparkplugTopic} instance in to a JSON string.
*
* @param topic a {@link SparkplugTopic} instance
* @return a JSON string
* @throws JsonProcessingException
*/
public static String sparkplugTopicToString(SparkplugTopic topic) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(topic);
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param topic a topic string
* @return a {@link SparkplugTopic} instance
* @throws ThingsboardException if an error occurs while parsing
*/
public static SparkplugTopic parseTopic(String topic) throws ThingsboardException {
topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic;
return parseTopic(SparkplugTopicUtil.getSplitTopic(topic));
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param splitTopic a topic split into tokens
* @return a {@link SparkplugTopic} instance
* @throws Exception if an error occurs while parsing
*/
@SuppressWarnings("incomplete-switch")
public static SparkplugTopic parseTopic(String[] splitTopic) throws ThingsboardException {
SparkplugMessageType type;
String namespace, edgeNodeId, groupId;
int length = splitTopic.length;
if (length < 4 || length > 5) {
throw new ThingsboardException("Invalid number of topic elements: " + length, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
namespace = splitTopic[0];
groupId = splitTopic[1];
type = SparkplugMessageType.parseMessageType(splitTopic[2]);
edgeNodeId = splitTopic[3];
if (length == 4) {
// A node topic
switch (type) {
case STATE:
case NBIRTH:
case NCMD:
case NDATA:
case NDEATH:
case NRECORD:
return new SparkplugTopic(namespace, groupId, edgeNodeId, type);
}
} else {
// A device topic
switch (type) {
case STATE:
case DBIRTH:
case DCMD:
case DDATA:
case DDEATH:
case DRECORD:
return new SparkplugTopic(namespace, groupId, edgeNodeId, splitTopic[4], type);
}
}
throw new ThingsboardException("Invalid number of topic elements " + length + " for topic type " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}

204
common/transport/mqtt/src/main/proto/sparkplug.proto

@ -0,0 +1,204 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
import "google/protobuf/any.proto";
option java_package = "org.thingsboard.server.gen.transport.mqtt";
option java_outer_classname = "SparkplugBProto";
message Payload {
/*
// Indexes of Data Types
// Unknown placeholder for future expansion.
Unknown = 0;
// Basic Types
Int8 = 1;
Int16 = 2;
Int32 = 3;
Int64 = 4;
UInt8 = 5;
UInt16 = 6;
UInt32 = 7;
UInt64 = 8;
Float = 9;
Double = 10;
Boolean = 11;
String = 12;
DateTime = 13;
Text = 14;
// Additional Metric Types
UUID = 15;
DataSet = 16;
Bytes = 17;
File = 18;
Template = 19;
// Additional PropertyValue Types
PropertySet = 20;
PropertySetList = 21;
*/
message Template {
message Parameter {
optional string name = 1;
optional uint32 type = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
ParameterValueExtension extension_value = 9;
}
message ParameterValueExtension {
google.protobuf.Any extensions = 1;
}
}
optional string version = 1; // The version of the Template to prevent mismatches
repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
repeated Parameter parameters = 3;
optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
optional bool is_definition = 5;
google.protobuf.Any extensions = 6;
}
message DataSet {
message DataSetValue {
oneof value {
uint32 int_value = 1;
uint64 long_value = 2;
float float_value = 3;
double double_value = 4;
bool boolean_value = 5;
string string_value = 6;
DataSetValueExtension extension_value = 7;
}
message DataSetValueExtension {
google.protobuf.Any extensions = 1;
}
}
message Row {
repeated DataSetValue elements = 1;
google.protobuf.Any extensions = 2; // For third party extensions
}
optional uint64 num_of_columns = 1;
repeated string columns = 2;
repeated uint32 types = 3;
repeated Row rows = 4;
google.protobuf.Any extensions = 5; // For third party extensions
}
message PropertyValue {
optional uint32 type = 1;
optional bool is_null = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
PropertySet propertyset_value = 9;
PropertySetList propertysets_value = 10; // List of Property Values
PropertyValueExtension extension_value = 11;
}
message PropertyValueExtension {
google.protobuf.Any extensions = 1;
}
}
message PropertySet {
repeated string keys = 1; // Names of the properties
repeated PropertyValue values = 2;
google.protobuf.Any extensions = 3;
}
message PropertySetList {
repeated PropertySet propertyset = 1;
google.protobuf.Any extensions = 2;
}
message MetaData {
// Bytes specific metadata
optional bool is_multi_part = 1;
// General metadata
optional string content_type = 2; // Content/Media type
optional uint64 size = 3; // File size, String size, Multi-part size, etc
optional uint64 seq = 4; // Sequence number for multi-part messages
// File metadata
optional string file_name = 5; // File name
optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
optional string md5 = 7; // md5 of data
// Catchalls and future expansion
optional string description = 8; // Could be anything such as json or xml of custom properties
google.protobuf.Any extensions = 9;
}
message Metric {
optional string name = 1; // Metric name - should only be included on birth
optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
optional uint32 datatype = 4; // DataType of the metric/tag value
optional bool is_historical = 5; // If this is historical data and should not update real time tag
optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
optional MetaData metadata = 8; // Metadata for the payload
optional PropertySet properties = 9;
oneof value {
uint32 int_value = 10;
uint64 long_value = 11;
float float_value = 12;
double double_value = 13;
bool boolean_value = 14;
string string_value = 15;
bytes bytes_value = 16; // Bytes, File
DataSet dataset_value = 17;
Template template_value = 18;
MetricValueExtension extension_value = 19;
}
message MetricValueExtension {
google.protobuf.Any extensions = 1;
}
}
optional uint64 timestamp = 1; // Timestamp at message sending time
repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
optional uint64 seq = 3; // Sequence number
optional string uuid = 4; // UUID to track message type in terms of schema definitions
optional bytes body = 5; // To optionally bypass the whole definition above
google.protobuf.Any extensions = 6;
}

26
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -301,8 +301,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getEntityProfileResponseMsg();
@ -313,8 +313,8 @@ public class DefaultTransportService implements TransportService {
@Override
public List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetQueueRoutingInfoResponseMsgsList();
@ -325,8 +325,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getResourceResponseMsg();
@ -337,8 +337,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setSnmpDevicesRequestMsg(requestMsg)
.build()
);
@ -354,7 +354,7 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setDeviceRequestMsg(requestMsg)
.build()
);
@ -374,7 +374,7 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setDeviceCredentialsRequestMsg(requestMsg)
.build()
);
@ -720,8 +720,8 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build());
AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> {
callback.onSuccess(response.getValue().getOtaPackageResponseMsg());
@ -864,7 +864,7 @@ public class DefaultTransportService implements TransportService {
}
}
protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
protected void processToTransportMsg(ToTransportMsg toSessionMsg) {
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
SessionMetaData md = sessions.get(sessionId);
if (md != null) {

12
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java

@ -20,6 +20,8 @@ import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -81,4 +83,14 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
public void setDisconnected() {
this.connected = false;
}
public boolean isSparkplug() {
DeviceProfileTransportConfiguration transportConfiguration = this.deviceProfile.getProfileData().getTransportConfiguration();
if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) {
return ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug();
} else {
return false;
}
}
}

Loading…
Cancel
Save