diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index f8b42ecc1e..f7ff7ec1e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfigu import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.PowerMode; import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration; +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -65,7 +67,6 @@ import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceProvisionService; import org.thingsboard.server.dao.device.DeviceService; @@ -95,6 +96,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -290,6 +292,7 @@ public class DefaultTransportApiService implements TransportApiService { device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId()); DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); + device.setDeviceProfileId(deviceProfile.getId()); ObjectNode additionalInfo = JacksonUtil.newObjectNode(); additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); @@ -305,7 +308,8 @@ public class DefaultTransportApiService implements TransportApiService { if (customerId != null && !customerId.isNullUid()) { metaData.putValue("customerId", customerId.toString()); } - metaData.putValue("gatewayId", gatewayId.toString()); + String deviceIdStr = requestMsg.getSparkplug() ? "sparkplugId" : "gatewayId"; + metaData.putValue(deviceIdStr, gatewayId.toString()); DeviceId deviceId = device.getId(); ObjectNode entityNode = mapper.valueToTree(device); @@ -316,11 +320,12 @@ public class DefaultTransportApiService implements TransportApiService { if (deviceAdditionalInfo == null) { deviceAdditionalInfo = JacksonUtil.newObjectNode(); } + String lastConnectedStr = requestMsg.getSparkplug() ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY; if (deviceAdditionalInfo.isObject() && - (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY) - || !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) { + (!deviceAdditionalInfo.has(lastConnectedStr) + || !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) { ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; - newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); + newDeviceAdditionalInfo.put(lastConnectedStr, gatewayId.toString()); Device savedDevice = deviceService.saveDevice(device); tbClusterService.onDeviceUpdated(savedDevice, device); } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5a28d42546..309d812b41 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -186,6 +186,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg { int64 gatewayIdLSB = 2; string deviceName = 3; string deviceType = 4; + bool sparkplug = 5; } message GetOrCreateDeviceFromGatewayResponseMsg { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 970fd6177e..3ff3c42c47 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -118,4 +118,5 @@ public class DataConstants { public static final String MSG_SOURCE_KEY = "source"; public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; + public static final String LAST_CONNECTED_SPARKPLUG = "lastConnectedSparkplug"; } diff --git a/common/transport/mqtt/pom.xml b/common/transport/mqtt/pom.xml index 9442a66646..d8fd475ee7 100644 --- a/common/transport/mqtt/pom.xml +++ b/common/transport/mqtt/pom.xml @@ -97,6 +97,19 @@ awaitility test + + com.google.protobuf + protobuf-java + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + + diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2a320cc2ac..4d98662be2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -41,11 +41,11 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.id.DeviceId; @@ -67,11 +67,14 @@ import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; +import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; @@ -103,6 +106,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic; /** * @author Andrew Shvayka @@ -128,6 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement final DeviceSessionCtx deviceSessionCtx; volatile InetSocketAddress address; volatile GatewaySessionHandler gatewaySessionHandler; + volatile SparkplugNodeSessionHandler sparkplugSessionHandler; private final ConcurrentHashMap otaPackSessions; private final ConcurrentHashMap chunkSizes; @@ -320,7 +325,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement int msgId = mqttMsg.variableHeader().packetId(); log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId); - if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { + if (sparkplugSessionHandler != null) { + handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg); + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); + } else if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { if (gatewaySessionHandler != null) { handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); @@ -366,6 +374,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) { + try { + sparkplugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg); + } catch (RuntimeException e) { + log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); + ctx.close(); + } catch (Exception e) { + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); + sendAckOrCloseSession(ctx, topicName, msgId); + } + } + private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { Matcher fwMatcher; @@ -623,69 +643,74 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); try { - switch (topic) { - case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); - activityReported = true; - break; + if (sparkplugSessionHandler != null) { + SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName()); + sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS); + } else { + switch (topic) { + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: + case MqttTopics.GATEWAY_RPC_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: + case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + default: + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(FAILURE.value()); + break; } - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: - case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: - case MqttTopics.GATEWAY_RPC_TOPIC: - case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: - case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: - case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: - case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: - case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - default: - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(FAILURE.value()); - break; } } catch (Exception e) { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); @@ -953,6 +978,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void checkSparkplugSession(MqttConnectMessage connectMessage) { + try { + SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic()); + // Test proto + SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); + // + if (sparkplugSessionHandler == null) { + sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString()); + } else { + log.warn("SparkPlugNodeReConnected [{}] [{}]", sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + } + } catch (Exception e) { + log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); + } + } + @Override public void operationComplete(Future future) throws Exception { log.trace("[{}] Channel closed!", sessionId); @@ -987,7 +1028,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); - checkGatewaySession(sessionMetaData); + if (deviceSessionCtx.isSparkplug()) { + checkSparkplugSession(connectMessage); + } else { + checkGatewaySession(sessionMetaData); + } ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.debug("[{}] Client connected!", sessionId); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index 74e555ae6b..8c8437be51 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -16,14 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.handler.codec.mqtt.MqttQoS; -import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.DeviceTransportType; -import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; -import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter; -import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory; import java.util.List; import java.util.Map; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java new file mode 100644 index 0000000000..f63b899515 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -0,0 +1,401 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.gen.transport.TransportApiProtos; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; +import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic; + +/** + * Created by nickAS21 on 12.12.22 + */ +@Slf4j +public class SparkplugNodeSessionHandler { + + private static final String DEFAULT_DEVICE_TYPE = "default"; + private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; + private static final String DEVICE_PROPERTY = "device"; + + private final MqttTransportContext context; + private final TransportService transportService; + private final TransportDeviceInfo nodeSparkplugInfo; + private final UUID sessionId; + private final ConcurrentMap deviceCreationLockMap; + private final ConcurrentMap devices = new ConcurrentHashMap<>(); + private final ConcurrentMap> deviceFutures = new ConcurrentHashMap<>(); + private final ConcurrentMap mqttQoSMap; + private final ChannelHandlerContext channel; + private final DeviceSessionCtx deviceSessionCtx; + private String nodeTopic; + + public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) { + this.context = deviceSessionCtx.getContext(); + this.transportService = context.getTransportService(); + this.deviceSessionCtx = deviceSessionCtx; + this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo(); + this.sessionId = sessionId; + this.deviceCreationLockMap = createWeakMap(); + this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); + this.channel = deviceSessionCtx.getChannel(); + this.nodeTopic = nodeTopic; + } + + ConcurrentReferenceHashMap createWeakMap() { + return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + } + + public String getNodeId() { + return context.getNodeId(); + } + + public UUID getSessionId() { + return sessionId; + } + + public String getNodeTopic() { + return nodeTopic; + } + + public int nextMsgId() { + return deviceSessionCtx.nextMsgId(); + } + + public void deregisterSession(String deviceName) { + SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName); + if (deviceSessionCtx != null) { + deregisterSession(deviceName, deviceSessionCtx); + } else { + log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName); + } + } + + private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) { + transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); + log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); + } + + public void onDeviceDeleted(String deviceName) { + deregisterSession(deviceName); + } + + private int getMsgId(MqttPublishMessage mqttMsg) { + return mqttMsg.variableHeader().packetId(); + } + + public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId(); + String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType(); + processOnConnect(mqttMsg, deviceName, deviceType); + } catch (Exception e) { + throw new AdaptorException(e); + } + } + + public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception { + SparkplugTopic sparkplugTopic = parseTopic(topicName); + log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + if (sparkplugTopic.isNode()) { + // A node topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case NBIRTH: + // TODO + break; + case NCMD: + // TODO + break; + case NDATA: + // TODO + break; + case NDEATH: + onNodeDisconnectProto(mqttMsg); + break; + case NRECORD: + // TODO + break; + default: + } + } else { + // A device topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case DBIRTH: + onDeviceConnectProto(mqttMsg); + break; + case DCMD: + // TODO + break; + case DDATA: + // TODO + break; + case DDEATH: + onDeviceDisconnectProto(mqttMsg); + break; + case DRECORD: + // TODO + break; + default: + } + } + } + + private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); + String deviceName = checkDeviceName(connectProto.getDeviceName()); + processOnDisconnect(mqttMsg, deviceName); + } catch (RuntimeException | InvalidProtocolBufferException e) { + throw new AdaptorException(e); + } + } + + private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); + String deviceName = checkDeviceName(connectProto.getDeviceName()); + // TODO disconnect device without disconnect Node + } catch (RuntimeException | InvalidProtocolBufferException e) { + throw new AdaptorException(e); + } + } + + private void processOnDisconnect(MqttPublishMessage msg, String deviceName) { + deregisterSession(deviceName); + ack(msg); + } + + public void handleSparkplugSubscribeMsg(List grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) { + String topicName = sparkplugTopic.toString(); + log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + + if (sparkplugTopic.isNode()) { + // A node topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case NBIRTH: + // TODO + break; + case NCMD: + // TODO + break; + case NDATA: + // TODO + break; + case NDEATH: + // TODO + break; + case NRECORD: + // TODO + break; + default: + } + } else { + // A device topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case DBIRTH: + // TODO + break; + case DCMD: + // TODO + break; + case DDATA: + // TODO + break; + case DDEATH: + // TODO + break; + case DRECORD: + // TODO + break; + default: + } + } + } + + + private byte[] getBytes(ByteBuf payload) { + return ProtoMqttAdaptor.toBytes(payload); + } + + private void ack(MqttPublishMessage msg) { + int msgId = getMsgId(msg); + if (msgId > 0) { + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId)); + } + } + + ChannelFuture writeAndFlush(MqttMessage mqttMessage) { + return channel.writeAndFlush(mqttMessage); + } + + private String checkDeviceName(String deviceName) { + if (StringUtils.isEmpty(deviceName)) { + throw new RuntimeException("Device name is empty!"); + } else { + return deviceName; + } + } + + private String getDeviceName(JsonElement json) { + return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString(); + } + + + private String getDeviceType(JsonElement json) { + JsonElement type = json.getAsJsonObject().get("type"); + return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString(); + } + + + private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { + log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName); + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable SparkplugSessionCtx result) { + ack(msg); + log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t); + + } + }, context.getExecutor()); + } + + + private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { + SparkplugSessionCtx result = devices.get(deviceName); + if (result == null) { + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); + deviceCreationLock.lock(); + try { + result = devices.get(deviceName); + if (result == null) { + return getDeviceCreationFuture(deviceName, deviceType); + } else { + return Futures.immediateFuture(result); + } + } finally { + deviceCreationLock.unlock(); + } + } else { + return Futures.immediateFuture(result); + } + } + + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + final SettableFuture futureToSet = SettableFuture.create(); + ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); + if (future != null) { + return future; + } + try { + transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) + .setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) + .setSparkplug(true) + .build(), + new TransportServiceCallback<>() { + @Override + public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { + if (msg.getDeviceInfo() == null) { + System.out.println("DeviceInfo == null"); + } + SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) { + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); + SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx); + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() + .setSessionInfo(deviceSessionInfo) + .setSessionEvent(SESSION_EVENT_MSG_OPEN) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) + .build(), null); + } + futureToSet.set(devices.get(deviceName)); + deviceFutures.remove(deviceName); + } + + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); + futureToSet.setException(e); + deviceFutures.remove(deviceName); + } + }); + return futureToSet; + } catch (Throwable e) { + deviceFutures.remove(deviceName); + throw e; + } + } + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java new file mode 100644 index 0000000000..0879a635c1 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java @@ -0,0 +1,146 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import io.netty.handler.codec.mqtt.MqttMessage; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +/** + * Created by nickAS21 on 08.12.22 + */ +@Slf4j +public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { + + private final SparkplugNodeSessionHandler parent; + private final TransportService transportService; + + public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + TransportService transportService) { + super(UUID.randomUUID(), mqttQoSMap); + this.parent = parent; + setSessionInfo(SessionInfoProto.newBuilder() + .setNodeId(parent.getNodeId()) + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits()) + .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits()) + .setDeviceName(deviceInfo.getDeviceName()) + .setDeviceType(deviceInfo.getDeviceType()) + .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) + .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits()) + .setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits()) + .setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits()) + .build()); + setDeviceInfo(deviceInfo); + setConnected(true); + setDeviceProfile(deviceProfile); + this.transportService = transportService; + } + + @Override + public UUID getSessionId() { + return sessionId; + } + + @Override + public int nextMsgId() { + return parent.nextMsgId(); + } + + @Override + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); +// } catch (Exception e) { +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } + } + + @Override + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { +// log.trace("[{}] Received attributes update notification to device", sessionId); +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); +// } catch (Exception e) { +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } + } + + @Override + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) { +// log.trace("[{}] Received RPC command to device", sessionId); +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( +// payload -> { +// ChannelFuture channelFuture = parent.writeAndFlush(payload); +// if (request.getPersisted()) { +// channelFuture.addListener(result -> { +// if (result.cause() == null) { +// if (!isAckExpected(payload)) { +// transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); +// } else if (request.getPersisted()) { +// transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); +// +// } +// } +// }); +// } +// } +// ); +// } catch (Exception e) { +// transportService.process(getSessionInfo(), +// TransportProtos.ToDeviceRpcResponseMsg.newBuilder() +// .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } + } + + @Override + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); + parent.deregisterSession(getDeviceInfo().getDeviceName()); + } + + @Override + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { + // This feature is not supported in the TB IoT Gateway yet. + } + + @Override + public void onDeviceDeleted(DeviceId deviceId) { + parent.onDeviceDeleted(this.getSessionInfo().getDeviceName()); + } + + private boolean isAckExpected(MqttMessage message) { + return message.fixedHeader().qosLevel().value() > 0; + } + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java new file mode 100644 index 0000000000..8370d2273e --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java @@ -0,0 +1,110 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; + +/** + * An enumeration of Sparkplug MQTT message types. The type provides an indication as to what the MQTT Payload of + * message will contain. + */ +public enum SparkplugMessageType { + + /** + * Birth certificate for MQTT Edge of Network (EoN) Nodes. + */ + NBIRTH, + + /** + * Death certificate for MQTT Edge of Network (EoN) Nodes. + */ + NDEATH, + + /** + * Birth certificate for MQTT Devices. + */ + DBIRTH, + + /** + * Death certificate for MQTT Devices. + */ + DDEATH, + + /** + * Edge of Network (EoN) Node data message. + */ + NDATA, + + /** + * Device data message. + */ + DDATA, + + /** + * Edge of Network (EoN) Node command message. + */ + NCMD, + + /** + * Device command message. + */ + DCMD, + + /** + * Critical application state message. + */ + STATE, + + /** + * Device record message. + */ + DRECORD, + + /** + * Edge of Network (EoN) Node record message. + */ + NRECORD; + + public static SparkplugMessageType parseMessageType(String type) throws ThingsboardException { + for (SparkplugMessageType messageType : SparkplugMessageType.values()) { + if (messageType.name().equals(type)) { + return messageType; + } + } + throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS); + } + + public boolean isDeath() { + return this.equals(DDEATH) || this.equals(NDEATH); + } + + public boolean isCommand() { + return this.equals(DCMD) || this.equals(NCMD); + } + + public boolean isData() { + return this.equals(DDATA) || this.equals(NDATA); + } + + public boolean isBirth() { + return this.equals(DBIRTH) || this.equals(NBIRTH); + } + + public boolean isRecord() { + return this.equals(DRECORD) || this.equals(NRECORD); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java new file mode 100644 index 0000000000..373e253ed5 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java @@ -0,0 +1,160 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.annotation.JsonInclude; + +/** + * Created by nickAS21 on 12.12.22 + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SparkplugTopic { + + /** + * The Sparkplug namespace version. + * For the Sparkplug™ A version of the payload definition, the UTF-8 string constant for the namespace element will be: + * “spAv1.0” + * For the Sparkplug™ B version of the specification, the UTF-8 string constant for the namespace element will be: + * “spBv1.0” + */ + private String namespace; + + /** + * The ID of the logical grouping of Edge of Network (EoN) Nodes and devices. + */ + private String groupId; + + /** + * The ID of the Edge of Network (EoN) Node. + */ + private String edgeNodeId; + + /** + * The ID of the device. + */ + private String deviceId; + + /** + * The message type. + */ + private SparkplugMessageType type; + + /** + * Constructor (device). + * + * @param namespace the namespace. + * @param groupId the group ID. + * @param edgeNodeId the edge node ID. + * @param deviceId the device ID. + * @param type the message type. + */ + public SparkplugTopic(String namespace, String groupId, String edgeNodeId, String deviceId, SparkplugMessageType type) { + super(); + this.namespace = namespace; + this.groupId = groupId; + this.edgeNodeId = edgeNodeId; + this.deviceId = deviceId; + this.type = type; + } + + /** + * Constructor (node). + * + * @param namespace the namespace. + * @param groupId the group ID. + * @param edgeNodeId the edge node ID. + * @param type the message type. + */ + public SparkplugTopic(String namespace, String groupId, String edgeNodeId, SparkplugMessageType type) { + super(); + this.namespace = namespace; + this.groupId = groupId; + this.edgeNodeId = edgeNodeId; + this.deviceId = null; + this.type = type; + } + + /** + * Returns the Sparkplug namespace version. + * + * @return the namespace + */ + public String getNamespace() { + return namespace; + } + + /** + * Returns the ID of the logical grouping of Edge of Network (EoN) Nodes and devices. + * + * @return the group ID + */ + public String getGroupId() { + return groupId; + } + + /** + * Returns the ID of the Edge of Network (EoN) Node. + * + * @return the edge node ID + */ + public String getEdgeNodeId() { + return edgeNodeId; + } + + /** + * Returns the ID of the device. + * + * @return the device ID + */ + public String getDeviceId() { + return deviceId; + } + + /** + * Returns the message type. + * + * @return the message type + */ + public SparkplugMessageType getType() { + return type; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(getNamespace()).append("/") + .append(getGroupId()).append("/") + .append(getType()).append("/") + .append(getEdgeNodeId()); + if (getDeviceId() != null) { + sb.append("/").append(getDeviceId()); + } + return sb.toString(); + } + + /** + * Returns true if this topic's type matches the passes in type, false otherwise. + * + * @param type the type to check + * @return true if this topic's type matches the passes in type, false otherwise + */ + public boolean isType(SparkplugMessageType type) { + return this.type != null && this.type.equals(type); + } + + public boolean isNode() { + return this.deviceId == null; + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java new file mode 100644 index 0000000000..c2e93cb269 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java @@ -0,0 +1,114 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; + +import java.util.HashMap; +import java.util.Map; + +/** + * Provides utility methods for handling Sparkplug MQTT message topics. + */ +public class SparkplugTopicUtil { + + private static final Map SPLIT_TOPIC_CACHE = new HashMap(); + + public static String[] getSplitTopic(String topic) { + String[] splitTopic = SPLIT_TOPIC_CACHE.get(topic); + if (splitTopic == null) { + splitTopic = topic.split("/"); + SPLIT_TOPIC_CACHE.put(topic, splitTopic); + } + + return splitTopic; + } + + /** + * Serializes a {@link SparkplugTopic} instance in to a JSON string. + * + * @param topic a {@link SparkplugTopic} instance + * @return a JSON string + * @throws JsonProcessingException + */ + public static String sparkplugTopicToString(SparkplugTopic topic) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(topic); + } + + /** + * Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance. + * + * @param topic a topic string + * @return a {@link SparkplugTopic} instance + * @throws ThingsboardException if an error occurs while parsing + */ + public static SparkplugTopic parseTopic(String topic) throws ThingsboardException { + topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic; + return parseTopic(SparkplugTopicUtil.getSplitTopic(topic)); + } + + /** + * Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance. + * + * @param splitTopic a topic split into tokens + * @return a {@link SparkplugTopic} instance + * @throws Exception if an error occurs while parsing + */ + @SuppressWarnings("incomplete-switch") + public static SparkplugTopic parseTopic(String[] splitTopic) throws ThingsboardException { + SparkplugMessageType type; + String namespace, edgeNodeId, groupId; + int length = splitTopic.length; + + if (length < 4 || length > 5) { + throw new ThingsboardException("Invalid number of topic elements: " + length, ThingsboardErrorCode.INVALID_ARGUMENTS); + } + + namespace = splitTopic[0]; + groupId = splitTopic[1]; + type = SparkplugMessageType.parseMessageType(splitTopic[2]); + edgeNodeId = splitTopic[3]; + + if (length == 4) { + // A node topic + switch (type) { + case STATE: + case NBIRTH: + case NCMD: + case NDATA: + case NDEATH: + case NRECORD: + return new SparkplugTopic(namespace, groupId, edgeNodeId, type); + } + } else { + // A device topic + switch (type) { + case STATE: + case DBIRTH: + case DCMD: + case DDATA: + case DDEATH: + case DRECORD: + return new SparkplugTopic(namespace, groupId, edgeNodeId, splitTopic[4], type); + } + } + throw new ThingsboardException("Invalid number of topic elements " + length + " for topic type " + type, ThingsboardErrorCode.INVALID_ARGUMENTS); + } +} diff --git a/common/transport/mqtt/src/main/proto/sparkplug.proto b/common/transport/mqtt/src/main/proto/sparkplug.proto new file mode 100644 index 0000000000..a06a10325b --- /dev/null +++ b/common/transport/mqtt/src/main/proto/sparkplug.proto @@ -0,0 +1,204 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +import "google/protobuf/any.proto"; + + +option java_package = "org.thingsboard.server.gen.transport.mqtt"; +option java_outer_classname = "SparkplugBProto"; + +message Payload { + /* + // Indexes of Data Types + // Unknown placeholder for future expansion. + Unknown = 0; + // Basic Types + Int8 = 1; + Int16 = 2; + Int32 = 3; + Int64 = 4; + UInt8 = 5; + UInt16 = 6; + UInt32 = 7; + UInt64 = 8; + Float = 9; + Double = 10; + Boolean = 11; + String = 12; + DateTime = 13; + Text = 14; + // Additional Metric Types + UUID = 15; + DataSet = 16; + Bytes = 17; + File = 18; + Template = 19; + + // Additional PropertyValue Types + PropertySet = 20; + PropertySetList = 21; + */ + + message Template { + + message Parameter { + optional string name = 1; + optional uint32 type = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + ParameterValueExtension extension_value = 9; + } + + message ParameterValueExtension { + google.protobuf.Any extensions = 1; + } + } + + optional string version = 1; // The version of the Template to prevent mismatches + repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value + repeated Parameter parameters = 3; + optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance + optional bool is_definition = 5; + google.protobuf.Any extensions = 6; + } + + message DataSet { + + message DataSetValue { + + oneof value { + uint32 int_value = 1; + uint64 long_value = 2; + float float_value = 3; + double double_value = 4; + bool boolean_value = 5; + string string_value = 6; + DataSetValueExtension extension_value = 7; + } + + message DataSetValueExtension { + google.protobuf.Any extensions = 1; + } + } + + message Row { + repeated DataSetValue elements = 1; + google.protobuf.Any extensions = 2; // For third party extensions + } + + optional uint64 num_of_columns = 1; + repeated string columns = 2; + repeated uint32 types = 3; + repeated Row rows = 4; + google.protobuf.Any extensions = 5; // For third party extensions + } + + message PropertyValue { + + optional uint32 type = 1; + optional bool is_null = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + PropertySet propertyset_value = 9; + PropertySetList propertysets_value = 10; // List of Property Values + PropertyValueExtension extension_value = 11; + } + + message PropertyValueExtension { + google.protobuf.Any extensions = 1; + } + } + + message PropertySet { + repeated string keys = 1; // Names of the properties + repeated PropertyValue values = 2; + google.protobuf.Any extensions = 3; + } + + message PropertySetList { + repeated PropertySet propertyset = 1; + google.protobuf.Any extensions = 2; + } + + message MetaData { + // Bytes specific metadata + optional bool is_multi_part = 1; + + // General metadata + optional string content_type = 2; // Content/Media type + optional uint64 size = 3; // File size, String size, Multi-part size, etc + optional uint64 seq = 4; // Sequence number for multi-part messages + + // File metadata + optional string file_name = 5; // File name + optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) + optional string md5 = 7; // md5 of data + + // Catchalls and future expansion + optional string description = 8; // Could be anything such as json or xml of custom properties + google.protobuf.Any extensions = 9; + } + + message Metric { + + optional string name = 1; // Metric name - should only be included on birth + optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages + optional uint64 timestamp = 3; // Timestamp associated with data acquisition time + optional uint32 datatype = 4; // DataType of the metric/tag value + optional bool is_historical = 5; // If this is historical data and should not update real time tag + optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag + optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. + optional MetaData metadata = 8; // Metadata for the payload + optional PropertySet properties = 9; + + oneof value { + uint32 int_value = 10; + uint64 long_value = 11; + float float_value = 12; + double double_value = 13; + bool boolean_value = 14; + string string_value = 15; + bytes bytes_value = 16; // Bytes, File + DataSet dataset_value = 17; + Template template_value = 18; + MetricValueExtension extension_value = 19; + } + + message MetricValueExtension { + google.protobuf.Any extensions = 1; + } + } + + optional uint64 timestamp = 1; // Timestamp at message sending time + repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs + optional uint64 seq = 3; // Sequence number + optional string uuid = 4; // UUID to track message type in terms of schema definitions + optional bytes body = 5; // To optionally bypass the whole definition above + google.protobuf.Any extensions = 6; +} \ No newline at end of file diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index bf03122b18..4300747238 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -301,8 +301,8 @@ public class DefaultTransportService implements TransportService { @Override public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) { - TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); return response.getValue().getEntityProfileResponseMsg(); @@ -313,8 +313,8 @@ public class DefaultTransportService implements TransportService { @Override public List getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg) { - TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build()); + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); return response.getValue().getGetQueueRoutingInfoResponseMsgsList(); @@ -325,8 +325,8 @@ public class DefaultTransportService implements TransportService { @Override public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) { - TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); return response.getValue().getResourceResponseMsg(); @@ -337,8 +337,8 @@ public class DefaultTransportService implements TransportService { @Override public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) { - TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>( - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() + TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>( + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() .setSnmpDevicesRequestMsg(requestMsg) .build() ); @@ -354,7 +354,7 @@ public class DefaultTransportService implements TransportService { @Override public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) { TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>( - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() .setDeviceRequestMsg(requestMsg) .build() ); @@ -374,7 +374,7 @@ public class DefaultTransportService implements TransportService { @Override public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) { TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>( - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() .setDeviceCredentialsRequestMsg(requestMsg) .build() ); @@ -720,8 +720,8 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { - TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build()); + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build()); AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> { callback.onSuccess(response.getValue().getOtaPackageResponseMsg()); @@ -864,7 +864,7 @@ public class DefaultTransportService implements TransportService { } } - protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { + protected void processToTransportMsg(ToTransportMsg toSessionMsg) { UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); SessionMetaData md = sessions.get(sessionId); if (md != null) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 469ae1034d..1ec98aac9f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -20,6 +20,8 @@ import lombok.Getter; import lombok.Setter; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -81,4 +83,14 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public void setDisconnected() { this.connected = false; } + + public boolean isSparkplug() { + DeviceProfileTransportConfiguration transportConfiguration = this.deviceProfile.getProfileData().getTransportConfiguration(); + if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) { + return ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug(); + } else { + return false; + } + } + }