From 1684b924e833c5ce648ff48443065168e669c832 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 16 Feb 2023 11:28:48 +0200 Subject: [PATCH] sparkplug: comments --- ...ctMqttV5ClientSparkplugConnectionTest.java | 4 +-- .../transport/mqtt/MqttTransportHandler.java | 13 +--------- .../AbstractGatewaySessionHandler.java | 7 +++--- .../session/SparkplugNodeSessionHandler.java | 25 +++++++++++-------- ...ate.java => SparkplugConnectionState.java} | 2 +- 5 files changed, 21 insertions(+), 30 deletions(-) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/{SparkplugMessageTypeSate.java => SparkplugConnectionState.java} (96%) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index ebf2c7248e..abe562d4c4 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -40,8 +40,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; /** diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index c930fd460b..0e4fee8f42 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -117,7 +117,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish; @@ -401,9 +401,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement // TODO break; case NBIRTH: - sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList()); - sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); - break; case NCMD: case NDATA: sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); @@ -813,15 +810,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) .build(), null); -// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); -// attrSubTopicType = TopicType.V1; -// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.DEVICE_ATTRIBUTES_TOPIC), getMinSupportedQos(reqQoS)); registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS); -// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); -// rpcSubTopicType = TopicType.V2; -// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC), getMinSupportedQos(reqQoS)); -// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.GATEWAY_RPC_TOPIC), getMinSupportedQos(reqQoS)); -// grantedQoSList.add(getMinSupportedQos(reqQoS)); } public void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 291da6564a..bc3f4e5152 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -54,7 +54,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.util.ReturnCode; -import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import javax.annotation.Nullable; import java.util.ArrayList; @@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; /** * Created by ashvayka on 19.01.17. @@ -734,11 +734,10 @@ public abstract class AbstractGatewaySessionHandler { } transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); - System.out.println("Removed device " + deviceName + " from the gateway session"); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); } - public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugMessageTypeSate typeSate, long ts) { + public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState typeSate, long ts) { TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder(); keyValueProtoBuilder.setKey(STATE.name()); keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 830d4b462d..1350ad8499 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2022 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,7 +55,7 @@ import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric; @@ -104,16 +104,19 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { checkDeviceName(deviceName); ListenableFuture contextListenableFuture = topic.isNode() ? Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); - - if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { - try { + try { + if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { // add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp() sendSparkplugStateOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE, sparkplugBProto.getTimestamp()); + } + if (topic.isType(NBIRTH)) { + setNodeBirthMetrics(sparkplugBProto.getMetricsList()); + } else if (topic.isType(DBIRTH)) { contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed add Metrics. MessageType *BIRTH.", e); } + } catch (InterruptedException | ExecutionException e) { + log.error("Failed add Metrics or change SparkplugConnectionState. MessageType *BIRTH.", e); } Set attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx .getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames(); @@ -220,7 +223,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - private List convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set attributesMetricNames, String topicTypeName) throws AdaptorException { + private List convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set attributesMetricNames, String topicTypeName) throws AdaptorException { try { List msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java similarity index 96% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java index f9a18adbd3..daf2312e36 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.transport.mqtt.util.sparkplug; -public enum SparkplugMessageTypeSate { +public enum SparkplugConnectionState { /** * The EoN node should examine the payload of this * message to ensure that it is a value of “ONLINE”