Browse Source

sparkplug: comments

pull/8084/head
nickAS21 3 years ago
parent
commit
1684b924e8
  1. 4
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java
  2. 13
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  3. 7
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java
  4. 25
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java
  5. 2
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java

4
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java

@ -40,8 +40,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
/**

13
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -117,7 +117,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish;
@ -401,9 +401,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// TODO
break;
case NBIRTH:
sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList());
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NCMD:
case NDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
@ -813,15 +810,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
// attrSubTopicType = TopicType.V1;
// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.DEVICE_ATTRIBUTES_TOPIC), getMinSupportedQos(reqQoS));
registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS);
// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
// rpcSubTopicType = TopicType.V2;
// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC), getMinSupportedQos(reqQoS));
// mqttQoSMap.put(new MqttTopicMatcher(MqttTopics.GATEWAY_RPC_TOPIC), getMinSupportedQos(reqQoS));
// grantedQoSList.add(getMinSupportedQos(reqQoS));
}
public void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {

7
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java

@ -54,7 +54,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
/**
* Created by ashvayka on 19.01.17.
@ -734,11 +734,10 @@ public abstract class AbstractGatewaySessionHandler {
}
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
System.out.println("Removed device " + deviceName + " from the gateway session");
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugMessageTypeSate typeSate, long ts) {
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState typeSate, long ts) {
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
keyValueProtoBuilder.setKey(STATE.name());
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);

25
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -55,7 +55,7 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
@ -104,16 +104,19 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
try {
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
// add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp()
sendSparkplugStateOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE,
sparkplugBProto.getTimestamp());
}
if (topic.isType(NBIRTH)) {
setNodeBirthMetrics(sparkplugBProto.getMetricsList());
} else if (topic.isType(DBIRTH)) {
contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed add Metrics. MessageType *BIRTH.", e);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed add Metrics or change SparkplugConnectionState. MessageType *BIRTH.", e);
}
Set<String> attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx
.getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames();
@ -220,7 +223,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set<String> attributesMetricNames, String topicTypeName) throws AdaptorException {
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set<String> attributesMetricNames, String topicTypeName) throws AdaptorException {
try {
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {

2
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java → common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
public enum SparkplugMessageTypeSate {
public enum SparkplugConnectionState {
/**
* The EoN node should examine the payload of this
* message to ensure that it is a value of ONLINE
Loading…
Cancel
Save