Browse Source

sparkplug: connection

pull/7763/head
nickAS21 4 years ago
parent
commit
1bf02b75f2
  1. 76
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 14
      common/cluster-api/src/main/proto/queue.proto
  3. 1
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  4. 72
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  5. 288
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java
  6. 175
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java
  7. 371
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java
  8. 545
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java
  9. 17414
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java
  10. 107
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java
  11. 37
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadDecoder.java
  12. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadEncoder.java
  13. 113
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java
  14. 129
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DataSetDeserializer.java
  15. 38
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModifier.java
  16. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModule.java
  17. 52
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/FileSerializer.java
  18. 70
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/JsonValidator.java
  19. 70
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/MetricDeserializer.java
  20. 234
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSet.java
  21. 117
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSetDataType.java
  22. 64
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/File.java
  23. 267
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetaData.java
  24. 320
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Metric.java
  25. 138
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetricDataType.java
  26. 107
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Parameter.java
  27. 125
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/ParameterDataType.java
  28. 134
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyDataType.java
  29. 169
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertySet.java
  30. 86
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyValue.java
  31. 93
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Row.java
  32. 161
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugTopic.java
  33. 53
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugValue.java
  34. 202
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Template.java
  35. 5
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  36. 29
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/GetOrCreateDeviceFromSparkplugResponse.java
  37. 48
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

76
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -65,7 +65,6 @@ import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceProvisionService;
import org.thingsboard.server.dao.device.DeviceService;
@ -95,6 +94,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -161,6 +161,8 @@ public class DefaultTransportApiService implements TransportApiService {
result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
} else if (transportApiRequestMsg.hasGetOrCreateDeviceSparlplugRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceSparlplugRequestMsg());
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
@ -345,6 +347,78 @@ public class DefaultTransportApiService implements TransportApiService {
}, dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg) {
DeviceId sparkplugNodeId = new DeviceId(new UUID(requestMsg.getSparkplugIdMSB(), requestMsg.getSparkplugIdLSB()));
ListenableFuture<Device> sparkplugNodeFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, sparkplugNodeId);
return Futures.transform(sparkplugNodeFuture, sparkplugNode -> {
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock();
try {
Device device = deviceService.findDeviceByTenantIdAndName(sparkplugNode.getTenantId(), requestMsg.getDeviceName());
if (device == null) {
TenantId tenantId = sparkplugNode.getTenantId();
device = new Device();
device.setTenantId(tenantId);
device.setName(requestMsg.getDeviceName());
device.setType(requestMsg.getDeviceType());
device.setCustomerId(sparkplugNode.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(sparkplugNode.getTenantId(), requestMsg.getDeviceType());
device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, sparkplugNodeId.toString());
device.setAdditionalInfo(additionalInfo);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null);
device = savedDevice;
relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(sparkplugNode.getId(), device.getId(), "Created"));
TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = sparkplugNode.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("sparkplugNodeId", sparkplugNodeId.toString());
DeviceId deviceId = device.getId();
ObjectNode entityNode = mapper.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_SPARKPLUG)
|| !sparkplugNodeId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_SPARKPLUG).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}
}
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.Builder builder =
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceSparkResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by sparkplug Node id and name: [{}]", sparkplugNodeId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
}
}, dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {
ListenableFuture<ProvisionResponse> provisionResponseFuture = null;
try {

14
common/cluster-api/src/main/proto/queue.proto

@ -193,6 +193,18 @@ message GetOrCreateDeviceFromGatewayResponseMsg {
bytes profileBody = 2;
}
message GetOrCreateDeviceFromSparkplugRequestMsg {
string deviceName = 1;
string deviceType = 2;
int64 sparkplugIdMSB = 3;
int64 sparkplugIdLSB = 4;
}
message GetOrCreateDeviceFromSparkplugResponseMsg {
DeviceInfoProto deviceInfo = 1;
bytes profileBody = 2;
}
message GetEntityProfileRequestMsg {
string entityType = 1;
int64 entityIdMSB = 2;
@ -897,6 +909,7 @@ message TransportApiRequestMsg {
GetDeviceRequestMsg deviceRequestMsg = 12;
GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13;
GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 14;
GetOrCreateDeviceFromSparkplugRequestMsg getOrCreateDeviceSparlplugRequestMsg = 15;
}
/* Response from ThingsBoard Core Service to Transport Service */
@ -912,6 +925,7 @@ message TransportApiResponseMsg {
GetDeviceResponseMsg deviceResponseMsg = 9;
GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10;
repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11;
GetOrCreateDeviceFromSparkplugResponseMsg getOrCreateDeviceSparkResponseMsg = 12;
}
/* Messages that are handled by ThingsBoard Core Service */

1
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -118,4 +118,5 @@ public class DataConstants {
public static final String MSG_SOURCE_KEY = "source";
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String LAST_CONNECTED_SPARKPLUG = "lastConnectedSparkplug";
}

72
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -41,12 +41,13 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.OtaPackageId;
@ -72,6 +73,8 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.IOException;
@ -103,6 +106,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic;
/**
* @author Andrew Shvayka
@ -128,6 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
final DeviceSessionCtx deviceSessionCtx;
volatile InetSocketAddress address;
volatile GatewaySessionHandler gatewaySessionHandler;
volatile SparkplugNodeSessionHandler sparkPlugSessionHandler;
private final ConcurrentHashMap<String, String> otaPackSessions;
private final ConcurrentHashMap<String, Integer> chunkSizes;
@ -325,6 +330,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
}
} else if (sparkPlugSessionHandler != null) {
try {
SparkplugTopic sparkplugTopic = parseTopic(topicName);
log.error("Publish [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
handleSparkplugPublishMsg(ctx, sparkplugTopic, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
} catch (Exception e) {
e.printStackTrace();
}
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
@ -366,6 +380,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, SparkplugTopic sparkplugTopic, int msgId, MqttPublishMessage mqttMsg) {
String topicName = sparkplugTopic.toString();
try {
switch(sparkplugTopic.getType()) {
case NBIRTH:
// TODO regular publish
// sparkPlugSessionHandler.onNodeConnectProto(mqttMsg);
break;
case DBIRTH:
sparkPlugSessionHandler.onDeviceConnectProto(mqttMsg);
break;
default:
}
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ctx.close();
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
}
}
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
Matcher fwMatcher;
@ -617,6 +655,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
try {
SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName());
log.error("Subscribe [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
} catch (Exception e) {
e.printStackTrace();
}
List<Integer> grantedQoSList = new ArrayList<>();
boolean activityReported = false;
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
@ -953,6 +998,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void checkSparkPlugConnected(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) {
if (((MqttDeviceProfileTransportConfiguration) deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration())
.isSparkPlug()) {
TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
try {
JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
if (infoNode != null) {
SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic());
if (sparkPlugSessionHandler == null) {
log.error("Connected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString());
} else {
log.error("ReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
}
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
}
}
} catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, device.getDeviceName(), e);
}
}
}
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
log.trace("[{}] Channel closed!", sessionId);
@ -988,6 +1057,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
checkGatewaySession(sessionMetaData);
checkSparkPlugConnected(sessionMetaData, connectMessage);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true);
log.debug("[{}] Client connected!", sessionId);

288
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java

@ -0,0 +1,288 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import javax.annotation.Nullable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic;
/**
* Created by nickAS21 on 12.12.22
*/
@Slf4j
public class SparkplugNodeSessionHandler {
private static final String DEFAULT_DEVICE_TYPE = "default";
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
private static final String DEVICE_PROPERTY = "device";
private final MqttTransportContext context;
private final TransportService transportService;
private final TransportDeviceInfo nodeSparkplugInfo;
private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, SparkplugSessionCtx> devices = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ListenableFuture<SparkplugSessionCtx>> deviceFutures = new ConcurrentHashMap<>();
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final ChannelHandlerContext channel;
private final DeviceSessionCtx deviceSessionCtx;
private String nodeTopic;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) {
this.context = deviceSessionCtx.getContext();
this.transportService = context.getTransportService();
this.deviceSessionCtx = deviceSessionCtx;
this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo();
this.sessionId = sessionId;
this.deviceCreationLockMap = createWeakMap();
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
this.channel = deviceSessionCtx.getChannel();
this.nodeTopic = nodeTopic;
}
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
}
public String getNodeId() {
return context.getNodeId();
}
public UUID getSessionId() {
return sessionId;
}
public String getNodeTopic() {
return nodeTopic;
}
public int nextMsgId() {
return deviceSessionCtx.nextMsgId();
}
public void deregisterSession(String deviceName) {
SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
deregisterSession(deviceName, deviceSessionCtx);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
}
}
private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}
public void onDeviceDeleted(String deviceName) {
deregisterSession(deviceName);
}
private int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId();
}
public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId();
String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType();
processOnConnect(mqttMsg, deviceName, deviceType);
} catch (Exception e) {
throw new AdaptorException(e);
}
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
processOnDisconnect(mqttMsg, deviceName);
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg);
}
private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
return JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
}
private byte[] getBytes(ByteBuf payload) {
return ProtoMqttAdaptor.toBytes(payload);
}
private void ack(MqttPublishMessage msg) {
int msgId = getMsgId(msg);
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
}
}
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
return channel.writeAndFlush(mqttMessage);
}
private String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!");
} else {
return deviceName;
}
}
private String getDeviceName(JsonElement json) {
return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
}
private String getDeviceType(JsonElement json) {
JsonElement type = json.getAsJsonObject().get("type");
return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
}
private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable SparkplugSessionCtx result) {
ack(msg);
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
}
private ListenableFuture<SparkplugSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
SparkplugSessionCtx result = devices.get(deviceName);
if (result == null) {
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
deviceCreationLock.lock();
try {
result = devices.get(deviceName);
if (result == null) {
return getDeviceCreationFuture(deviceName, deviceType);
} else {
return Futures.immediateFuture(result);
}
} finally {
deviceCreationLock.unlock();
}
} else {
return Futures.immediateFuture(result);
}
}
private ListenableFuture<SparkplugSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
final SettableFuture<SparkplugSessionCtx> futureToSet = SettableFuture.create();
ListenableFuture<SparkplugSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
if (future != null) {
return future;
}
try {
transportService.process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setSparkplugIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
.setSparkplugIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromSparkplugResponse msg) {
if (msg.getDeviceInfo() == null) {
System.out.println("DeviceInfo == null");
}
SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
}
futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName);
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
futureToSet.setException(e);
deviceFutures.remove(deviceName);
}
});
return futureToSet;
} catch (Throwable e) {
deviceFutures.remove(deviceName);
throw e;
}
}
}

175
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java

@ -0,0 +1,175 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
/**
* Created by nickAS21 on 13.12.22
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SparkplugBPayload {
private Date timestamp;
private List<Metric> metrics;
private long seq = -1;
private String uuid;
private byte[] body;
public SparkplugBPayload() {};
public SparkplugBPayload(Date timestamp, List<Metric> metrics, long seq, String uuid, byte[] body) {
this.timestamp = timestamp;
this.metrics = metrics;
this.seq = seq;
this.uuid = uuid;
this.body = body;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public void addMetric(Metric metric) {
metrics.add(metric);
}
public void addMetric(int index, Metric metric) {
metrics.add(index, metric);
}
public void addMetrics(List<Metric> metrics) {
this.metrics.addAll(metrics);
}
public Metric removeMetric(int index) {
return metrics.remove(index);
}
public boolean removeMetric(Metric metric) {
return metrics.remove(metric);
}
public List<Metric> getMetrics() {
return metrics;
}
@JsonIgnore
public Integer getMetricCount() {
return metrics.size();
}
public void setMetrics(List<Metric> metrics) {
this.metrics = metrics;
}
public long getSeq() {
return seq;
}
public void setSeq(long seq) {
this.seq = seq;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
@Override
public String toString() {
return "SparkplugBPayload [timestamp=" + timestamp + ", metrics=" + metrics + ", seq=" + seq + ", uuid=" + uuid
+ ", body=" + Arrays.toString(body) + "]";
}
/**
* A builder for creating a {@link SparkplugBPayload} instance.
*/
public static class SparkplugBPayloadBuilder {
private Date timestamp;
private List<Metric> metrics;
private long seq = -1;
private String uuid;
private byte[] body;
public SparkplugBPayloadBuilder(long sequenceNumber) {
this.seq = sequenceNumber;
metrics = new ArrayList<Metric>();
}
public SparkplugBPayloadBuilder() {
metrics = new ArrayList<Metric>();
}
public SparkplugBPayloadBuilder addMetric(Metric metric) {
this.metrics.add(metric);
return this;
}
public SparkplugBPayloadBuilder addMetrics(Collection<Metric> metrics) {
this.metrics.addAll(metrics);
return this;
}
public SparkplugBPayloadBuilder setTimestamp(Date timestamp) {
this.timestamp = timestamp;
return this;
}
public SparkplugBPayloadBuilder setSeq(long seq) {
this.seq = seq;
return this;
}
public SparkplugBPayloadBuilder setUuid(String uuid) {
this.uuid = uuid;
return this;
}
public SparkplugBPayloadBuilder setBody(byte[] body) {
this.body = body;
return this;
}
public SparkplugBPayload createPayload() {
return new SparkplugBPayload(timestamp, metrics, seq, uuid, body);
}
}
}

371
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java

@ -0,0 +1,371 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A {@link SparkplugPayloadDecoder} implementation for decoding Sparkplug B payloads.
*/
@Slf4j
public class SparkplugBPayloadDecoder implements SparkplugPayloadDecoder<SparkplugBPayload> {
public SparkplugBPayloadDecoder() {
super();
}
public SparkplugBPayload buildFromByteArray(byte[] bytes) throws Exception {
SparkplugBProto.Payload protoPayload = SparkplugBProto.Payload.parseFrom(bytes);
SparkplugBPayload.SparkplugBPayloadBuilder builder = new SparkplugBPayload.SparkplugBPayloadBuilder(protoPayload.getSeq());
// Set the timestamp
if (protoPayload.hasTimestamp()) {
builder.setTimestamp(new Date(protoPayload.getTimestamp()));
}
// Set the sequence number
if (protoPayload.hasSeq()) {
builder.setSeq(protoPayload.getSeq());
}
// Set the Metrics
for (SparkplugBProto.Payload.Metric protoMetric : protoPayload.getMetricsList()) {
builder.addMetric(convertMetric(protoMetric));
}
// Set the body
if (protoPayload.hasBody()) {
builder.setBody(protoPayload.getBody().toByteArray());
}
// Set the body
if (protoPayload.hasUuid()) {
builder.setUuid(protoPayload.getUuid());
}
return builder.createPayload();
}
private Metric convertMetric(SparkplugBProto.Payload.Metric protoMetric) throws Exception {
// Convert the dataType
MetricDataType dataType = MetricDataType.fromInteger((protoMetric.getDatatype()));
// Build and return the Metric
return new Metric.MetricBuilder(protoMetric.getName(), dataType, getMetricValue(protoMetric))
.isHistorical(
protoMetric.hasIsHistorical() ? protoMetric.getIsHistorical() : null)
.isTransient(protoMetric
.hasIsTransient() ? protoMetric.getIsTransient() : null)
.timestamp(protoMetric.hasTimestamp() ? new Date(protoMetric.getTimestamp()) : null)
.alias(protoMetric.hasAlias() ? protoMetric.getAlias() : null)
.metaData(protoMetric.hasMetadata()
? new MetaData.MetaDataBuilder().contentType(protoMetric.getMetadata().getContentType())
.size(protoMetric.getMetadata().getSize()).seq(protoMetric.getMetadata().getSeq())
.fileName(protoMetric.getMetadata().getFileName())
.fileType(protoMetric.getMetadata().getFileType())
.md5(protoMetric.getMetadata().getMd5())
.description(protoMetric.getMetadata().getDescription()).createMetaData()
: null)
.properties(protoMetric.hasProperties()
? new PropertySet.PropertySetBuilder().addProperties(convertProperties(protoMetric.getProperties()))
.createPropertySet()
: null)
.createMetric();
}
private Map<String, PropertyValue> convertProperties(SparkplugBProto.Payload.PropertySet decodedPropSet)
throws Exception {
Map<String, PropertyValue> map = new HashMap<String, PropertyValue>();
List<String> keys = decodedPropSet.getKeysList();
List<SparkplugBProto.Payload.PropertyValue> values = decodedPropSet.getValuesList();
for (int i = 0; i < keys.size(); i++) {
SparkplugBProto.Payload.PropertyValue value = values.get(i);
map.put(keys.get(i),
new PropertyValue(PropertyDataType.fromInteger(value.getType()), getPropertyValue(value)));
}
return map;
}
private Object getPropertyValue(SparkplugBProto.Payload.PropertyValue value) throws Exception {
PropertyDataType type = PropertyDataType.fromInteger(value.getType());
if (value.getIsNull()) {
return null;
}
switch (type) {
case Boolean:
return value.getBooleanValue();
case DateTime:
return new Date(value.getLongValue());
case Float:
return value.getFloatValue();
case Double:
return value.getDoubleValue();
case Int8:
return (byte) value.getIntValue();
case Int16:
case UInt8:
return (short) value.getIntValue();
case Int32:
case UInt16:
return value.getIntValue();
case UInt32:
case Int64:
return value.getLongValue();
case UInt64:
return BigInteger.valueOf(value.getLongValue());
case String:
case Text:
return value.getStringValue();
case PropertySet:
return new PropertySet.PropertySetBuilder().addProperties(convertProperties(value.getPropertysetValue()))
.createPropertySet();
case PropertySetList:
List<PropertySet> propertySetList = new ArrayList<PropertySet>();
List<SparkplugBProto.Payload.PropertySet> list = value.getPropertysetsValue().getPropertysetList();
for (SparkplugBProto.Payload.PropertySet decodedPropSet : list) {
propertySetList.add(new PropertySet.PropertySetBuilder().addProperties(convertProperties(decodedPropSet))
.createPropertySet());
}
return propertySetList;
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Property Data Type " + type);
}
}
private Object getMetricValue(SparkplugBProto.Payload.Metric protoMetric) throws Exception {
// Check if the null flag has been set indicating that the value is null
if (protoMetric.getIsNull()) {
return null;
}
// Otherwise convert the value based on the type
int metricType = protoMetric.getDatatype();
switch (MetricDataType.fromInteger(metricType)) {
case Boolean:
return protoMetric.getBooleanValue();
case DateTime:
return new Date(protoMetric.getLongValue());
case File:
String filename = protoMetric.getMetadata().getFileName();
byte[] fileBytes = protoMetric.getBytesValue().toByteArray();
return new File(filename, fileBytes);
case Float:
return protoMetric.getFloatValue();
case Double:
return protoMetric.getDoubleValue();
case Int8:
return (byte) protoMetric.getIntValue();
case Int16:
case UInt8:
return (short) protoMetric.getIntValue();
case Int32:
case UInt16:
return protoMetric.getIntValue();
case UInt32:
case Int64:
return protoMetric.getLongValue();
case UInt64:
return BigInteger.valueOf(protoMetric.getLongValue());
case String:
case Text:
case UUID:
return protoMetric.getStringValue();
case Bytes:
return protoMetric.getBytesValue().toByteArray();
case DataSet:
SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue();
// Build the and create the DataSet
return new DataSet.DataSetBuilder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList())
.addTypes(convertDataSetDataTypes(protoDataSet.getTypesList()))
.addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList()))
.createDataSet();
case Template:
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
List<Metric> metrics = new ArrayList<Metric>();
List<Parameter> parameters = new ArrayList<Parameter>();
for (SparkplugBProto.Payload.Template.Parameter protoParameter : protoTemplate.getParametersList()) {
String name = protoParameter.getName();
ParameterDataType type = ParameterDataType.fromInteger(protoParameter.getType());
Object value = getParameterValue(protoParameter);
if (log.isTraceEnabled()) {
log.trace("Setting template parameter name: " + name + ", type: " + type + ", value: "
+ value + ", valueType" + value.getClass());
}
parameters.add(new Parameter(name, type, value));
}
for (SparkplugBProto.Payload.Metric protoTemplateMetric : protoTemplate.getMetricsList()) {
Metric templateMetric = convertMetric(protoTemplateMetric);
if (log.isTraceEnabled()) {
log.trace("Setting template parameter name: " + templateMetric.getName() + ", type: "
+ templateMetric.getDataType() + ", value: " + templateMetric.getValue());
}
metrics.add(templateMetric);
}
Template template = new Template.TemplateBuilder().version(protoTemplate.getVersion())
.templateRef(protoTemplate.getTemplateRef()).definition(protoTemplate.getIsDefinition())
.addMetrics(metrics).addParameters(parameters).createTemplate();
if (log.isTraceEnabled()) {
log.trace(
"Setting template - name: " + protoMetric.getName() + ", version: " + template.getVersion()
+ ", ref: " + template.getTemplateRef() + ", isDef: " + template.isDefinition()
+ ", metrics: " + metrics.size() + ", params: " + parameters.size());
}
return template;
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Metric DataType " + metricType);
}
}
private Collection<Row> convertDataSetRows(List<SparkplugBProto.Payload.DataSet.Row> protoRows,
List<Integer> protoTypes) throws Exception {
Collection<Row> rows = new ArrayList<Row>();
if (protoRows != null) {
for (SparkplugBProto.Payload.DataSet.Row protoRow : protoRows) {
List<SparkplugBProto.Payload.DataSet.DataSetValue> protoValues = protoRow.getElementsList();
List<SparkplugValue<?>> values = new ArrayList<SparkplugValue<?>>();
for (int index = 0; index < protoRow.getElementsCount(); index++) {
values.add(convertDataSetValue(protoTypes.get(index), protoValues.get(index)));
}
// Add the values to the row and the row to the rows
rows.add(new Row.RowBuilder().addValues(values).createRow());
}
}
return rows;
}
private Collection<DataSetDataType> convertDataSetDataTypes(List<Integer> protoTypes) {
List<DataSetDataType> types = new ArrayList<DataSetDataType>();
// Build up a List of column types
for (int type : protoTypes) {
types.add(DataSetDataType.fromInteger(type));
}
return types;
}
private Object getParameterValue(SparkplugBProto.Payload.Template.Parameter protoParameter) throws Exception {
// Otherwise convert the value based on the type
int type = protoParameter.getType();
switch (MetricDataType.fromInteger(type)) {
case Boolean:
return protoParameter.getBooleanValue();
case DateTime:
return new Date(protoParameter.getLongValue());
case Float:
return protoParameter.getFloatValue();
case Double:
return protoParameter.getDoubleValue();
case Int8:
return (byte) protoParameter.getIntValue();
case Int16:
case UInt8:
return (short) protoParameter.getIntValue();
case Int32:
case UInt16:
return protoParameter.getIntValue();
case UInt32:
case Int64:
return protoParameter.getLongValue();
case UInt64:
return BigInteger.valueOf(protoParameter.getLongValue());
case String:
case Text:
return protoParameter.getStringValue();
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Parameter Type " + type);
}
}
private SparkplugValue<?> convertDataSetValue(int protoType, SparkplugBProto.Payload.DataSet.DataSetValue protoValue)
throws Exception {
DataSetDataType type = DataSetDataType.fromInteger(protoType);
switch (type) {
case Boolean:
return new SparkplugValue<Boolean>(type, protoValue.getBooleanValue());
case DateTime:
// FIXME - remove after is_null is supported for dataset values
if (protoValue.getLongValue() == -9223372036854775808L) {
return new SparkplugValue<Date>(type, null);
} else {
return new SparkplugValue<Date>(type, new Date(protoValue.getLongValue()));
}
case Float:
return new SparkplugValue<Float>(type, protoValue.getFloatValue());
case Double:
return new SparkplugValue<Double>(type, protoValue.getDoubleValue());
case Int8:
return new SparkplugValue<Byte>(type, (byte) protoValue.getIntValue());
case UInt8:
case Int16:
return new SparkplugValue<Short>(type, (short) protoValue.getIntValue());
case UInt16:
case Int32:
return new SparkplugValue<Integer>(type, protoValue.getIntValue());
case UInt32:
case Int64:
return new SparkplugValue<Long>(type, protoValue.getLongValue());
case UInt64:
return new SparkplugValue<BigInteger>(type, BigInteger.valueOf(protoValue.getLongValue()));
case String:
case Text:
if (protoValue.getStringValue().equals("null")) {
return new SparkplugValue<String>(type, null);
} else {
return new SparkplugValue<String>(type, protoValue.getStringValue());
}
case Unknown:
default:
log.error("Unknown DataType: " + protoType);
throw new Exception("Failed to decode");
}
}
}

545
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java

@ -0,0 +1,545 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.google.protobuf.ByteString;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Created by nickAS21 on 13.12.22
*/
public class SparkplugBPayloadEncoder implements SparkplugPayloadEncoder <SparkplugBPayload> {
private static Logger logger = LogManager.getLogger(SparkplugBPayloadEncoder.class.getName());
public SparkplugBPayloadEncoder() {
super();
}
public byte[] getBytes(SparkplugBPayload payload) throws IOException {
SparkplugBProto.Payload.Builder protoMsg = SparkplugBProto.Payload.newBuilder();
// Set the timestamp
if (payload.getTimestamp() != null) {
protoMsg.setTimestamp(payload.getTimestamp().getTime());
}
// Set the sequence number
protoMsg.setSeq(payload.getSeq());
// Set the UUID if defined
if (payload.getUuid() != null) {
protoMsg.setUuid(payload.getUuid());
}
// Set the metrics
for (Metric metric : payload.getMetrics()) {
try {
protoMsg.addMetrics(convertMetric(metric));
} catch(Exception e) {
logger.error("Failed to add metric: " + metric.getName());
throw new RuntimeException(e);
}
}
// Set the body
if (payload.getBody() != null) {
protoMsg.setBody(ByteString.copyFrom(payload.getBody()));
}
return protoMsg.build().toByteArray();
}
private SparkplugBProto.Payload.Metric.Builder convertMetric(Metric metric) throws Exception {
// build a metric
SparkplugBProto.Payload.Metric.Builder builder = SparkplugBProto.Payload.Metric.newBuilder();
// set the basic parameters
builder.setDatatype(metric.getDataType().toIntValue());
builder = setMetricValue(builder, metric);
// Set the name, data type, and value
if (metric.hasName()) {
builder.setName(metric.getName());
}
// Set the alias
if (metric.hasAlias()) {
builder.setAlias(metric.getAlias());
}
// Set the timestamp
if (metric.getTimestamp() != null) {
builder.setTimestamp(metric.getTimestamp().getTime());
}
// Set isHistorical
if (metric.getIsHistorical() != null) {
builder.setIsHistorical(metric.isHistorical());
}
// Set isTransient
if (metric.getIsTransient() != null) {
builder.setIsTransient(metric.isTransient());
}
// Set isNull
if (metric.getIsNull() != null) {
builder.setIsNull(metric.isNull());
}
// Set the metadata
if (metric.getMetaData() != null) {
builder = setMetaData(builder, metric);
}
// Set the property set
if (metric.getProperties() != null) {
builder.setProperties(convertPropertySet(metric.getProperties()));
}
return builder;
}
private SparkplugBProto.Payload.Template.Parameter.Builder convertParameter(Parameter parameter) throws Exception {
// build a metric
SparkplugBProto.Payload.Template.Parameter.Builder builder =
SparkplugBProto.Payload.Template.Parameter.newBuilder();
if (logger.isTraceEnabled()) {
logger.trace("Adding parameter: " + parameter.getName());
logger.trace(" type: " + parameter.getType());
}
// Set the name
builder.setName(parameter.getName());
// Set the type and value
builder = setParameterValue(builder, parameter);
return builder;
}
private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertySet propertySet) throws Exception {
SparkplugBProto.Payload.PropertySet.Builder setBuilder = SparkplugBProto.Payload.PropertySet.newBuilder();
Map<String, PropertyValue> map = propertySet.getPropertyMap();
for (String key : map.keySet()) {
SparkplugBProto.Payload.PropertyValue.Builder builder = SparkplugBProto.Payload.PropertyValue.newBuilder();
PropertyValue value = map.get(key);
PropertyDataType type = value.getType();
builder.setType(type.toIntValue());
if (value.getValue() == null) {
builder.setIsNull(true);
} else {
switch (type) {
case Boolean:
builder.setBooleanValue((Boolean) value.getValue());
break;
case DateTime:
builder.setLongValue(((Date) value.getValue()).getTime());
break;
case Double:
builder.setDoubleValue((Double) value.getValue());
break;
case Float:
builder.setFloatValue((Float) value.getValue());
break;
case Int8:
builder.setIntValue((Byte) value.getValue());
break;
case Int16:
case UInt8:
builder.setIntValue((Short) value.getValue());
break;
case Int32:
case UInt16:
builder.setIntValue((Integer) value.getValue());
break;
case Int64:
case UInt32:
builder.setLongValue((Long) value.getValue());
break;
case UInt64:
builder.setLongValue(((BigInteger) value.getValue()).longValue());
break;
case String:
case Text:
builder.setStringValue((String) value.getValue());
break;
case PropertySet:
builder.setPropertysetValue(convertPropertySet((PropertySet) value.getValue()));
break;
case PropertySetList:
List<?> setList = (List<?>) value.getValue();
SparkplugBProto.Payload.PropertySetList.Builder listBuilder =
SparkplugBProto.Payload.PropertySetList.newBuilder();
for (Object obj : setList) {
listBuilder.addPropertyset(convertPropertySet((PropertySet) obj));
}
builder.setPropertysetsValue(listBuilder);
break;
case Unknown:
default:
logger.error("Unknown DataType: " + value.getType());
throw new Exception("Failed to convert value " + value.getType());
}
}
setBuilder.addKeys(key);
setBuilder.addValues(builder);
}
return setBuilder;
}
private SparkplugBProto.Payload.Template.Parameter.Builder setParameterValue(
SparkplugBProto.Payload.Template.Parameter.Builder builder, Parameter parameter) throws Exception {
ParameterDataType type = parameter.getType();
builder.setType(type.toIntValue());
Object value = parameter.getValue();
switch (type) {
case Boolean:
builder.setBooleanValue(toBoolean(value));
break;
case DateTime:
builder.setLongValue(((Date) value).getTime());
break;
case Double:
builder.setDoubleValue((Double) value);
break;
case Float:
builder.setFloatValue((Float) value);
break;
case Int8:
builder.setIntValue((Byte) value);
break;
case Int16:
case UInt8:
builder.setIntValue((Short) value);
break;
case Int32:
case UInt16:
builder.setIntValue((Integer) value);
break;
case Int64:
case UInt32:
builder.setLongValue((Long) value);
break;
case UInt64:
builder.setLongValue(((BigInteger) value).longValue());
break;
case Text:
case String:
if (value == null) {
builder.setStringValue("");
} else {
builder.setStringValue((String) value);
}
break;
case Unknown:
default:
logger.error("Unknown Type: " + type);
throw new Exception("Failed to encode");
}
return builder;
}
private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Payload.Metric.Builder metricBuilder,
Metric metric) throws Exception {
// Set the data type
metricBuilder.setDatatype(metric.getDataType().toIntValue());
if (metric.getValue() == null) {
metricBuilder.setIsNull(true);
} else {
switch (metric.getDataType()) {
case Boolean:
metricBuilder.setBooleanValue(toBoolean(metric.getValue()));
break;
case DateTime:
metricBuilder.setLongValue(((Date) metric.getValue()).getTime());
break;
case File:
metricBuilder.setBytesValue(ByteString.copyFrom(((File) metric.getValue()).getBytes()));
SparkplugBProto.Payload.MetaData.Builder metaDataBuilder =
SparkplugBProto.Payload.MetaData.newBuilder();
metaDataBuilder.setFileName(((File) metric.getValue()).getFileName());
metricBuilder.setMetadata(metaDataBuilder);
break;
case Float:
metricBuilder.setFloatValue((Float) metric.getValue());
break;
case Double:
metricBuilder.setDoubleValue((Double) metric.getValue());
break;
case Int8:
metricBuilder.setIntValue(((Byte)metric.getValue()).intValue());
break;
case Int16:
case UInt8:
metricBuilder.setIntValue(((Short)metric.getValue()).intValue());
break;
case Int32:
case UInt16:
metricBuilder.setIntValue((int) metric.getValue());
break;
case UInt32:
case Int64:
metricBuilder.setLongValue((Long) metric.getValue());
break;
case UInt64:
metricBuilder.setLongValue(((BigInteger) metric.getValue()).longValue());
break;
case String:
case Text:
case UUID:
metricBuilder.setStringValue((String) metric.getValue());
break;
case Bytes:
metricBuilder.setBytesValue(ByteString.copyFrom((byte[]) metric.getValue()));
break;
case DataSet:
DataSet dataSet = (DataSet) metric.getValue();
SparkplugBProto.Payload.DataSet.Builder dataSetBuilder =
SparkplugBProto.Payload.DataSet.newBuilder();
dataSetBuilder.setNumOfColumns(dataSet.getNumOfColumns());
// Column names
List<String> columnNames = dataSet.getColumnNames();
if (columnNames != null && !columnNames.isEmpty()) {
for (String name : columnNames) {
// Add the column name
dataSetBuilder.addColumns(name);
}
}
// Column types
List<DataSetDataType> columnTypes = dataSet.getTypes();
if (columnTypes != null && !columnTypes.isEmpty()) {
for (DataSetDataType type : columnTypes) {
// Add the column type
dataSetBuilder.addTypes(type.toIntValue());
}
}
// Dataset rows
List<Row> rows = dataSet.getRows();
if (rows != null && !rows.isEmpty()) {
for (Row row : rows) {
SparkplugBProto.Payload.DataSet.Row.Builder protoRowBuilder =
SparkplugBProto.Payload.DataSet.Row.newBuilder();
List<SparkplugValue<?>> values = row.getValues();
if (values != null && !values.isEmpty()) {
for (SparkplugValue<?> value : values) {
// Add the converted element
protoRowBuilder.addElements(convertDataSetValue(value));
}
dataSetBuilder.addRows(protoRowBuilder);
}
}
}
// Finally add the dataset
metricBuilder.setDatasetValue(dataSetBuilder);
break;
case Template:
Template template = (Template) metric.getValue();
SparkplugBProto.Payload.Template.Builder templateBuilder =
SparkplugBProto.Payload.Template.newBuilder();
// Set isDefinition
templateBuilder.setIsDefinition(template.isDefinition());
// Set Version
if (template.getVersion() != null) {
templateBuilder.setVersion(template.getVersion());
}
// Set Template Reference
if (template.getTemplateRef() != null) {
templateBuilder.setTemplateRef(template.getTemplateRef());
}
// Set the template metrics
if (template.getMetrics() != null) {
for (Metric templateMetric : template.getMetrics()) {
templateBuilder.addMetrics(convertMetric(templateMetric));
}
}
// Set the template parameters
if (template.getParameters() != null) {
for (Parameter parameter : template.getParameters()) {
templateBuilder.addParameters(convertParameter(parameter));
}
}
// Add the template to the metric
metricBuilder.setTemplateValue(templateBuilder);
break;
case Unknown:
default:
logger.error("Unknown DataType: " + metric.getDataType());
throw new Exception("Failed to encode");
}
}
return metricBuilder;
}
private SparkplugBProto.Payload.Metric.Builder setMetaData(SparkplugBProto.Payload.Metric.Builder metricBuilder,
Metric metric) throws Exception {
// If the builder has been built already - use it
SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = metricBuilder.getMetadataBuilder() != null
? metricBuilder.getMetadataBuilder()
: SparkplugBProto.Payload.MetaData.newBuilder();
MetaData metaData = metric.getMetaData();
if (metaData.getContentType() != null) {
metaDataBuilder.setContentType(metaData.getContentType());
}
if (metaData.getSize() != null) {
metaDataBuilder.setSize(metaData.getSize());
}
if (metaData.getSeq() != null) {
metaDataBuilder.setSeq(metaData.getSeq());
}
if (metaData.getFileName() != null) {
metaDataBuilder.setFileName(metaData.getFileName());
}
if (metaData.getFileType() != null) {
metaDataBuilder.setFileType(metaData.getFileType());
}
if (metaData.getMd5() != null) {
metaDataBuilder.setMd5(metaData.getMd5());
}
if (metaData.getDescription() != null) {
metaDataBuilder.setDescription(metaData.getDescription());
}
metricBuilder.setMetadata(metaDataBuilder);
return metricBuilder;
}
private SparkplugBProto.Payload.DataSet.DataSetValue.Builder convertDataSetValue(SparkplugValue<?> value) throws Exception {
SparkplugBProto.Payload.DataSet.DataSetValue.Builder protoValueBuilder =
SparkplugBProto.Payload.DataSet.DataSetValue.newBuilder();
// Set the value
DataSetDataType type = value.getType();
switch (type) {
case Int8:
protoValueBuilder.setIntValue((Byte) value.getValue());
break;
case Int16:
case UInt8:
protoValueBuilder.setIntValue((Short) value.getValue());
break;
case Int32:
case UInt16:
protoValueBuilder.setIntValue((Integer) value.getValue());
break;
case Int64:
case UInt32:
protoValueBuilder.setLongValue((Long) value.getValue());
break;
case UInt64:
protoValueBuilder.setLongValue(((BigInteger) value.getValue()).longValue());
break;
case Float:
protoValueBuilder.setFloatValue((Float) value.getValue());
break;
case Double:
protoValueBuilder.setDoubleValue((Double) value.getValue());
break;
case String:
case Text:
if (value.getValue() != null) {
protoValueBuilder.setStringValue((String) value.getValue());
} else {
logger.warn("String value for dataset is null");
protoValueBuilder.setStringValue("null");
}
break;
case Boolean:
protoValueBuilder.setBooleanValue(toBoolean(value.getValue()));
break;
case DateTime:
try {
protoValueBuilder.setLongValue(((Date) value.getValue()).getTime());
} catch (NullPointerException npe) {
// FIXME - remove after is_null is supported for dataset values
logger.debug("Date in dataset was null - leaving it -9223372036854775808L");
protoValueBuilder.setLongValue(-9223372036854775808L);
}
break;
default:
logger.error("Unknown DataType: " + value.getType());
throw new Exception("Failed to convert value " + value.getType());
}
return protoValueBuilder;
}
private Boolean toBoolean(Object value) {
if (value == null) {
return null;
}
if (value instanceof Integer) {
return ((Integer)value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Long) {
return ((Long)value).longValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Float) {
return ((Float)value).floatValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Double) {
return ((Double)value).doubleValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Short) {
return ((Short)value).shortValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Byte) {
return ((Byte)value).byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof String) {
return Boolean.parseBoolean(value.toString());
}
return (Boolean)value;
}
}

17414
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java

File diff suppressed because it is too large

107
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java

@ -0,0 +1,107 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* An enumeration of Sparkplug MQTT message types. The type provides an indication as to what the MQTT Payload of
* message will contain.
*/
public enum SparkplugMessageType {
/**
* Birth certificate for MQTT Edge of Network (EoN) Nodes.
*/
NBIRTH,
/**
* Death certificate for MQTT Edge of Network (EoN) Nodes.
*/
NDEATH,
/**
* Birth certificate for MQTT Devices.
*/
DBIRTH,
/**
* Death certificate for MQTT Devices.
*/
DDEATH,
/**
* Edge of Network (EoN) Node data message.
*/
NDATA,
/**
* Device data message.
*/
DDATA,
/**
* Edge of Network (EoN) Node command message.
*/
NCMD,
/**
* Device command message.
*/
DCMD,
/**
* Critical application state message.
*/
STATE,
/**
* Device record message.
*/
DRECORD,
/**
* Edge of Network (EoN) Node record message.
*/
NRECORD;
public static SparkplugMessageType parseMessageType(String type) throws Exception {
for (SparkplugMessageType messageType : SparkplugMessageType.values()) {
if (messageType.name().equals(type)) {
return messageType;
}
}
throw new Exception("Invalid message type: " + type);
}
public boolean isDeath() {
return this.equals(DDEATH) || this.equals(NDEATH);
}
public boolean isCommand() {
return this.equals(DCMD) || this.equals(NCMD);
}
public boolean isData() {
return this.equals(DDATA) || this.equals(NDATA);
}
public boolean isBirth() {
return this.equals(DBIRTH) || this.equals(NBIRTH);
}
public boolean isRecord() {
return this.equals(DRECORD) || this.equals(NRECORD);
}
}

37
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadDecoder.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
/**
* An interface for decoding payloads.
*
* @param <P> the type of payload.
*/
public interface SparkplugPayloadDecoder <P> {
/**
* Builds a payload from a supplied byte array.
*
* @param bytes the bytes representing the payload
* @return a payload object built from the byte array
* @throws Exception
*/
public P buildFromByteArray(byte[] bytes) throws Exception;
}

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadEncoder.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
import java.io.IOException;
/**
* An interface for decoding payloads.
*
* @param <P> the type of payload.
*/
public interface SparkplugPayloadEncoder<P> {
/**
* Converts a payload object into a byte array.
*
* @param payload a payload object
* @return the byte array representing the payload
* @throws IOException
*/
public byte[] getBytes(P payload) throws IOException;
}

113
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java

@ -0,0 +1,113 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic;
import java.util.HashMap;
import java.util.Map;
/**
* Provides utility methods for handling Sparkplug MQTT message topics.
*/
public class SparkplugTopicUtil {
private static final Map<String, String[]> SPLIT_TOPIC_CACHE = new HashMap<String, String[]>();
public static String[] getSplitTopic(String topic) {
String[] splitTopic = SPLIT_TOPIC_CACHE.get(topic);
if (splitTopic == null) {
splitTopic = topic.split("/");
SPLIT_TOPIC_CACHE.put(topic, splitTopic);
}
return splitTopic;
}
/**
* Serializes a {@link SparkplugTopic} instance in to a JSON string.
*
* @param topic a {@link SparkplugTopic} instance
* @return a JSON string
* @throws JsonProcessingException
*/
public static String sparkplugTopicToString(SparkplugTopic topic) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(topic);
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param topic a topic string
* @return a {@link SparkplugTopic} instance
* @throws Exception if an error occurs while parsing
*/
public static SparkplugTopic parseTopic(String topic) throws Exception {
topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic;
return parseTopic(SparkplugTopicUtil.getSplitTopic(topic));
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param splitTopic a topic split into tokens
* @return a {@link SparkplugTopic} instance
* @throws Exception if an error occurs while parsing
*/
@SuppressWarnings("incomplete-switch")
public static SparkplugTopic parseTopic(String[] splitTopic) throws Exception {
SparkplugMessageType type;
String namespace, edgeNodeId, groupId;
int length = splitTopic.length;
if (length < 4 || length > 5) {
throw new Exception("Invalid number of topic elements: " + length);
}
namespace = splitTopic[0];
groupId = splitTopic[1];
type = SparkplugMessageType.parseMessageType(splitTopic[2]);
edgeNodeId = splitTopic[3];
if (length == 4) {
// A node topic
switch (type) {
case STATE:
case NBIRTH:
case NCMD:
case NDATA:
case NDEATH:
case NRECORD:
return new SparkplugTopic(namespace, groupId, edgeNodeId, type);
}
} else {
// A device topic
switch (type) {
case STATE:
case DBIRTH:
case DCMD:
case DDATA:
case DDEATH:
case DRECORD:
return new SparkplugTopic(namespace, groupId, edgeNodeId, splitTopic[4], type);
}
}
throw new Exception("Invalid number of topic elements " + length + " for topic type " + type);
}
}

129
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DataSetDeserializer.java

@ -0,0 +1,129 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* A JSON deserializer for {@link DataSet} instances.
*/
public class DataSetDeserializer extends StdDeserializer<DataSet> {
private static Logger logger = LogManager.getLogger(DataSetDeserializer.class.getName());
private static final String FIELD_SIZE = "numberOfColumns";
private static final String FIELD_TYPES = "types";
private static final String FIELD_NAMES = "columnNames";
private static final String FIELD_ROWS = "rows";
/**
* Constructor.
*
* @param clazz
*/
protected DataSetDeserializer(Class<DataSet> clazz) {
super(clazz);
}
@Override
public DataSet deserialize(JsonParser parser, DeserializationContext context)
throws IOException, JsonProcessingException {
JsonNode node = parser.getCodec().readTree(parser);
long size = (Long) node.get(FIELD_SIZE).numberValue();
DataSet.DataSetBuilder builder = new DataSet.DataSetBuilder(size);
JsonNode namesNode = node.get(FIELD_NAMES);
if (namesNode.isArray()) {
for (JsonNode nameNode : namesNode) {
builder.addColumnName(nameNode.textValue());
}
}
JsonNode typesNode = node.get(FIELD_TYPES);
List<DataSetDataType> typesList = new ArrayList<DataSetDataType>();
if (typesNode.isArray()) {
for (JsonNode typeNode : typesNode) {
typesList.add(DataSetDataType.valueOf(typeNode.textValue()));
}
builder.addTypes(typesList);
}
JsonNode rowsNode = node.get(FIELD_ROWS);
if (rowsNode.isArray()) {
for (JsonNode rowNode : rowsNode) {
List<SparkplugValue<?>> values = new ArrayList<SparkplugValue<?>>();
for (int i = 0; i < size; i++) {
JsonNode value = rowNode.get(i);
DataSetDataType type = typesList.get(i);
values.add(getValueFromNode(value, type));
}
builder.addRow(new Row(values));
}
}
try {
return builder.createDataSet();
} catch (Exception e) {
logger.error("Error deserializing DataSet ", e);
}
return null;
}
/*
* Creates and returns a Value instance
*/
private SparkplugValue<?> getValueFromNode(JsonNode nodeValue, DataSetDataType type) {
switch (type) {
case Boolean:
return new SparkplugValue<Boolean>(type, (boolean)nodeValue.asBoolean());
case DateTime:
return new SparkplugValue<Date>(type, new Date(nodeValue.asLong()));
case Double:
return new SparkplugValue<Double>(type, nodeValue.asDouble());
case Float:
return new SparkplugValue<Float>(type, (float)nodeValue.asDouble());
case Int16:
case UInt8:
return new SparkplugValue<Byte>(type, (byte)nodeValue.asInt());
case UInt16:
case Int32:
return new SparkplugValue<Integer>(type, nodeValue.asInt());
case UInt32:
case Int64:
return new SparkplugValue<Long>(type, (long)nodeValue.asLong());
case Text:
case String:
return new SparkplugValue<String>(type, nodeValue.asText());
case UInt64:
return new SparkplugValue<BigInteger>(type, BigInteger.valueOf(nodeValue.asLong()));
case Unknown:
default:
return null;
}
}
}

38
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModifier.java

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
/**
*
*/
public class DeserializerModifier extends BeanDeserializerModifier {
@Override
public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
if (Metric.class.equals(beanDesc.getBeanClass())) {
return new MetricDeserializer(deserializer);
}
return super.modifyDeserializer(config, beanDesc, deserializer);
}
}

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModule.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* Used to register the {@link DeserializerModifier} instance.
*/
public class DeserializerModule extends SimpleModule {
private BeanDeserializerModifier deserializerModifier;
public DeserializerModule(BeanDeserializerModifier deserializerModifier) {
super("DeserializerModule", Version.unknownVersion());
this.deserializerModifier = deserializerModifier;
}
@Override
public void setupModule(SetupContext context) {
super.setupModule(context);
context.addBeanDeserializerModifier(deserializerModifier);
}
}

52
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/FileSerializer.java

@ -0,0 +1,52 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.eclipse.californium.elements.util.Base64;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import java.io.IOException;
/**
* Serializes a {@link File} instance.
*/
public class FileSerializer extends StdSerializer<File> {
/**
* Constructor.
*/
protected FileSerializer() {
super(File.class);
}
/**
* Constructor.
*
* @param clazz class.
*/
protected FileSerializer(Class<File> clazz) {
super(clazz);
}
@Override
public void serialize(File value, JsonGenerator generator, SerializerProvider provider) throws IOException {
generator.writeString(Base64.encodeBytes(value.getBytes()));
}
}

70
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/JsonValidator.java

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
/**
* Validates JSON.
*/
public class JsonValidator {
protected static final String JSON_SCHEMA_FILENAME = "payload.json";
private static JsonValidator instance = null;
/**
* Constructor.
*/
protected JsonValidator() {
}
/**
* Returns the {@link JsonValidator} instance.
*
* @return the {@link JsonValidator} instance.
*/
public static JsonValidator getInstance() {
if (instance == null) {
instance = new JsonValidator();
}
return instance;
}
/**
* Returns loads and returns the {@link JsonSchema} instance associated with this validator.
*
* @return the {@link JsonSchema} instance associated with this validator.
* @throws IOException
* @throws ProcessingException
*/
/* protected JsonSchema getSchema() throws IOException, ProcessingException {
//Get file from resources folder
ClassLoader classLoader = getClass().getClassLoader();
File schemaFile = new File(classLoader.getResource(JSON_SCHEMA_FILENAME).getFile());
return JsonSchemaFactory.byDefault().getJsonSchema(JsonLoader.fromFile(schemaFile));
}*/
/**
* Returns true if the supplied JSON text is valid, false otherwise.
*
* @param jsonText a {@link String} representing JSON text.
* @return true if the supplied JSON text is valid, false otherwise.
* @throws ProcessingException
* @throws IOException
*/
/* public boolean isJsonValid(String jsonText) throws ProcessingException, IOException {
return getSchema().validate(JsonLoader.fromString(jsonText)).isSuccess();
}*/
}

70
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/MetricDeserializer.java

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType;
import java.io.IOException;
import java.util.Base64;
/**
* A custom JSON deserializer for {@link Metric} instances.
*/
public class MetricDeserializer extends StdDeserializer<Metric> implements ResolvableDeserializer {
private final JsonDeserializer<?> defaultDeserializer;
/**
* Constructor.
*/
protected MetricDeserializer(JsonDeserializer<?> defaultDeserializer) {
super(Metric.class);
this.defaultDeserializer = defaultDeserializer;
}
@Override
public Metric deserialize(JsonParser parser, DeserializationContext ctxt) throws IOException, JsonProcessingException {
Metric metric = (Metric) defaultDeserializer.deserialize(parser, ctxt);
System.out.println(metric);
// Check if the data type is a File
if (metric.getDataType().equals(MetricDataType.File)) {
// Perform the custom logic for File types by building up the File object.
MetaData metaData = metric.getMetaData();
String fileName = metaData == null ? null : metaData.getFileName();
File file = new File(fileName, Base64.getDecoder().decode((String)metric.getValue()));
metric.setValue(file);
}
return metric;
}
@Override
public void resolve(DeserializationContext ctxt) throws JsonMappingException {
((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
}
}

234
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSet.java

@ -0,0 +1,234 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.transport.mqtt.util.sparkplug.json.DataSetDeserializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A data set that represents a table of data.
*/
@Slf4j
@JsonDeserialize(using = DataSetDeserializer.class)
public class DataSet {
/**
* The number of columns
*/
@JsonProperty("numberOfColumns")
private long numOfColumns;
/**
* A list containing the names of each column
*/
@JsonProperty("columnNames")
private List<String> columnNames;
/**
* A list containing the data types of each column
*/
@JsonProperty("types")
private List<DataSetDataType> types;
/**
* A list containing the rows in the data set
*/
private List<Row> rows;
public DataSet(long numOfColumns, List<String> columnNames, List<DataSetDataType> types, List<Row> rows) {
this.numOfColumns = numOfColumns;
this.columnNames = columnNames;
this.types = types;
this.rows = rows;
}
public long getNumOfColumns() {
return numOfColumns;
}
public void setNumOfColumns(long numOfColumns) {
this.numOfColumns = numOfColumns;
}
public List<String> getColumnNames() {
return columnNames;
}
public void setColumnNames(List<String> columnNames) {
this.columnNames = columnNames;
}
public void addColumnName(String columnName) {
this.columnNames.add(columnName);
}
public List<Row> getRows() {
return rows;
}
@JsonGetter("rows")
public List<List<Object>> getRowsAsLists() {
List<List<Object>> list = new ArrayList<List<Object>>(getRows().size());
for (Row row : getRows()) {
list.add(Row.toValues(row));
}
return list;
}
public void addRow(Row row) {
this.rows.add(row);
}
public void addRow(int index, Row row) {
this.rows.add(index, row);
}
public Row removeRow(int index) {
return rows.remove(index);
}
public boolean removeRow(Row row) {
return rows.remove(row);
}
public void setRows(List<Row> rows) {
this.rows = rows;
}
public List<DataSetDataType> getTypes() {
return types;
}
public void setTypes(List<DataSetDataType> types) {
this.types = types;
}
public void addType(DataSetDataType type) {
this.types.add(type);
}
public void addType(int index, DataSetDataType type) {
this.types.add(index, type);
}
@Override
public String toString() {
return "DataSet [numOfColumns=" + numOfColumns + ", columnNames=" + columnNames + ", types=" + types + ", rows="
+ rows + "]";
}
/**
* A builder for creating a {@link DataSet} instance.
*/
public static class DataSetBuilder {
private long numOfColumns;
private List<String> columnNames;
private List<DataSetDataType> types;
private List<Row> rows;
public DataSetBuilder(long numOfColumns) {
this.numOfColumns = numOfColumns;
this.columnNames = new ArrayList<String>();
this.types = new ArrayList<DataSetDataType>();
this.rows = new ArrayList<Row>();
}
public DataSetBuilder(DataSet dataSet) {
this.numOfColumns = dataSet.getNumOfColumns();
this.columnNames = new ArrayList<String>(dataSet.getColumnNames());
this.types = new ArrayList<DataSetDataType>(dataSet.getTypes());
this.rows = new ArrayList<Row>(dataSet.getRows().size());
for (Row row : dataSet.getRows()) {
rows.add(new Row.RowBuilder(row).createRow());
}
}
public DataSetBuilder addColumnNames(Collection<String> columnNames) {
this.columnNames.addAll(columnNames);
return this;
}
public DataSetBuilder addColumnName(String columnName) {
this.columnNames.add(columnName);
return this;
}
public DataSetBuilder addType(DataSetDataType type) {
this.types.add(type);
return this;
}
public DataSetBuilder addTypes(Collection<DataSetDataType> types) {
this.types.addAll(types);
return this;
}
public DataSetBuilder addRow(Row row) {
this.rows.add(row);
return this;
}
public DataSetBuilder addRows(Collection<Row> rows) {
this.rows.addAll(rows);
return this;
}
public DataSet createDataSet() throws Exception {
log.trace("Number of columns: " + numOfColumns);
for (String columnName : columnNames) {
log.trace("\tcolumnName: " + columnName);
}
for (DataSetDataType type : types) {
log.trace("\ttypes: " + type);
}
for (Row row : rows) {
log.trace("\t\trow: " + row);
}
validate();
return new DataSet(numOfColumns, columnNames, types, rows);
}
public void validate() throws Exception {
if (columnNames.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set column names: " +
columnNames.size() + " vs expected " + numOfColumns);
}
if (types.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set types: " +
types.size() + " vs expected: " + numOfColumns);
}
for (int i = 0; i < types.size(); i++) {
for (Row row : rows) {
List<SparkplugValue<?>> values = row.getValues();
if (values.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set row: " +
values.size() + " vs expected: " + numOfColumns);
}
types.get(i).checkType(row.getValues().get(i).getValue());
}
}
}
}
}

117
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSetDataType.java

@ -0,0 +1,117 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import java.math.BigInteger;
import java.util.Date;
/**
* A enumeration of data types of values in a {@link DataSet}
*/
public enum DataSetDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private DataSetDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link DataSetDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link DataSetDataType} instance.
*/
public static DataSetDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

64
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/File.java

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import org.thingsboard.server.transport.mqtt.util.sparkplug.json.FileSerializer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Arrays;
@JsonIgnoreProperties(value = { "fileName" })
@JsonSerialize(using = FileSerializer.class)
public class File {
private String fileName;
private byte[] bytes;
public File() {
super();
}
public File(String fileName, byte[] bytes) {
super();
this.fileName = fileName == null
? null
: fileName.replace("/", System.getProperty("file.separator"))
.replace("\\", System.getProperty("file.separator"));
this.bytes = Arrays.copyOf(bytes, bytes.length);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public byte[] getBytes() {
return bytes;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
@Override
public String toString() {
return "File [fileName=" + fileName + ", bytes length=" + bytes.length + "]";
}
}

267
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetaData.java

@ -0,0 +1,267 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import java.util.Objects;
/**
* A class to represent the meta data associated with a metric.
*/
@JsonInclude(Include.NON_NULL)
public class MetaData {
/**
* Indicates if the metric represents one of multiple parts.
*/
private Boolean isMultiPart;
/**
* A content type associated with the metric.
*/
private String contentType;
/**
* A size associated with the metric.
*/
private Long size;
/**
* A sequence associated with the metric.
*/
private Long seq;
/**
* A file name associated with the metric.
*/
private String fileName;
/**
* A file type associated with the metric.
*/
private String fileType;
/**
* A MD5 sum associated with the metric.
*/
private String md5;
/**
* A description associated with the metric.
*/
private String description;
/**
* Default no-arg constructor.
*/
public MetaData() {}
/**
* Constructor with fields.
*
* @param isMultiPart if the metric represents one of multiple parts.
* @param contentType a content type associated with the metric.
* @param size a size associated with the metric.
* @param seq a sequence associated with the metric.
* @param fileName a file name associated with the metric.
* @param fileType a file type associated with the metric.
* @param md5 a MD5 sum associated with the metric.
* @param description a description associated with the metric
*/
public MetaData(Boolean isMultiPart, String contentType, Long size, Long seq, String fileName,
String fileType, String md5, String description) {
this.isMultiPart = isMultiPart;
this.contentType = contentType;
this.size = size;
this.seq = seq;
this.fileName = fileName;
this.fileType = fileType;
this.md5 = md5;
this.description = description;
}
public Boolean isMultiPart() {
return isMultiPart;
}
public MetaData setMultiPart(Boolean isMultiPart) {
this.isMultiPart = isMultiPart;
return this;
}
public String getContentType() {
return contentType;
}
public MetaData setContentType(String contentType) {
this.contentType = contentType;
return this;
}
public Long getSize() {
return size;
}
public MetaData setSize(Long size) {
this.size = size;
return this;
}
public Long getSeq() {
return seq;
}
public MetaData setSeq(Long seq) {
this.seq = seq;
return this;
}
public String getFileName() {
return fileName;
}
public MetaData setFileName(String fileName) {
this.fileName = fileName;
return this;
}
public String getFileType() {
return fileType;
}
public MetaData setFileType(String fileType) {
this.fileType = fileType;
return this;
}
public String getMd5() {
return md5;
}
public MetaData setMd5(String md5) {
this.md5 = md5;
return this;
}
public String getDescription() {
return description;
}
public MetaData setDescription(String description) {
this.description = description;
return this;
}
@Override
public String toString() {
return "MetaData [isMultiPart=" + isMultiPart + ", contentType=" + contentType + ", size=" + size + ", seq="
+ seq + ", fileName=" + fileName + ", fileType=" + fileType + ", md5=" + md5 + ", description="
+ description + "]";
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
MetaData meta = (MetaData) object;
return Objects.equals(isMultiPart, meta.isMultiPart())
&& Objects.equals(contentType, meta.getContentType())
&& Objects.equals(size, meta.getSize())
&& Objects.equals(seq, meta.getSeq())
&& Objects.equals(fileName, meta.getFileName())
&& Objects.equals(fileType, meta.getFileType())
&& Objects.equals(md5, meta.getMd5())
&& Objects.equals(description, meta.getDescription());
}
/**
* A Builder for a MetaData instance.
*/
public static class MetaDataBuilder {
private Boolean isMultiPart;
private String contentType;
private Long size;
private Long seq;
private String fileName;
private String fileType;
private String md5;
private String description;
public MetaDataBuilder() {};
public MetaDataBuilder(MetaData metaData) {
this.isMultiPart = metaData.isMultiPart();
this.contentType = metaData.getContentType();
this.size = metaData.getSize();
this.seq = metaData.getSeq();
this.fileName = metaData.getFileName();
this.fileType = metaData.getFileType();
this.md5 = metaData.getMd5();
this.description = metaData.getDescription();
}
public MetaDataBuilder multiPart(Boolean isMultiPart) {
this.isMultiPart = isMultiPart;
return this;
}
public MetaDataBuilder contentType(String contentType) {
this.contentType = contentType;
return this;
}
public MetaDataBuilder size(Long size) {
this.size = size;
return this;
}
public MetaDataBuilder seq(Long seq) {
this.seq = seq;
return this;
}
public MetaDataBuilder fileName(String fileName) {
this.fileName = fileName;
return this;
}
public MetaDataBuilder fileType(String fileType) {
this.fileType = fileType;
return this;
}
public MetaDataBuilder md5(String md5) {
this.md5 = md5;
return this;
}
public MetaDataBuilder description(String description) {
this.description = description;
return this;
}
public MetaData createMetaData() {
return new MetaData(isMultiPart, contentType, size, seq, fileName, fileType, md5, description);
}
}
}

320
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Metric.java

@ -0,0 +1,320 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet.DataSetBuilder;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData.MetaDataBuilder;
import java.util.Date;
;
/**
* A metric of a Sparkplug Payload.
*/
@JsonIgnoreProperties(value = { "isNull" })
@JsonInclude(Include.NON_NULL)
public class Metric {
@JsonProperty("name")
private String name;
@JsonProperty("alias")
private Long alias;
@JsonProperty("timestamp")
private Date timestamp;
@JsonProperty("dataType")
private MetricDataType dataType;
@JsonProperty("isHistorical")
private Boolean isHistorical;
@JsonProperty("isTransient")
private Boolean isTransient;
@JsonProperty("metaData")
private MetaData metaData;
@JsonProperty("properties")
@JsonInclude(Include.NON_EMPTY)
private PropertySet properties;
@JsonProperty("value")
private Object value;
private Boolean isNull = null;
public Metric() {};
/**
* @param name
* @param alias
* @param timestamp
* @param dataType
* @param isHistorical
* @param isTransient
* @param metaData
* @param properties
* @param value
* @throws Exception
*/
public Metric(String name, Long alias, Date timestamp, MetricDataType dataType, Boolean isHistorical,
Boolean isTransient, MetaData metaData, PropertySet properties, Object value)
throws Exception {
super();
this.name = name;
this.alias = alias;
this.timestamp = timestamp;
this.dataType = dataType;
this.isHistorical = isHistorical;
this.isTransient = isTransient;
isNull = (value == null) ? true : false;
this.metaData = metaData;
this.properties = properties;
this.value = value;
this.dataType.checkType(value);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean hasName() {
return !(name == null);
}
public boolean hasAlias() {
return !(alias == null);
}
public Long getAlias() {
return alias;
}
public void setAlias(long alias) {
this.alias = alias;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public MetricDataType getDataType() {
return dataType;
}
public void setDataType(MetricDataType dataType) {
this.dataType = dataType;
}
@JsonGetter("metaData")
public MetaData getMetaData() {
return metaData;
}
@JsonSetter("metaData")
public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
isNull = (value == null);
}
public PropertySet getProperties() {
return this.properties;
}
public void setProperties(PropertySet properties) {
this.properties = properties;
}
@JsonIgnore
public Boolean isHistorical() {
return isHistorical == null ? false : isHistorical;
}
@JsonGetter("isHistorical")
public Boolean getIsHistorical() {
return isHistorical;
}
@JsonSetter("isHistorical")
public void setHistorical(Boolean isHistorical) {
this.isHistorical = isHistorical;
}
@JsonIgnore
public Boolean isTransient() {
return isTransient == null ? false : isTransient;
}
@JsonGetter("isTransient")
public Boolean getIsTransient() {
return isTransient;
}
@JsonSetter("isTransient")
public void setTransient(Boolean isTransient) {
this.isTransient = isTransient;
}
@JsonIgnore
public Boolean isNull() {
return isNull == null ? false : isNull;
}
@JsonIgnore
public Boolean getIsNull() {
return isNull;
}
@Override
public String toString() {
return "Metric [name=" + name + ", alias=" + alias + ", timestamp=" + timestamp + ", dataType=" + dataType
+ ", isHistorical=" + isHistorical + ", isTransient=" + isTransient + ", isNull=" + isNull
+ ", metaData=" + metaData + ", propertySet=" + properties + ", value=" + value + "]";
}
/**
* A builder for creating a {@link Metric} instance.
*/
public static class MetricBuilder {
private String name;
private Long alias;
private Date timestamp;
private MetricDataType dataType;
private Boolean isHistorical;
private Boolean isTransient;
private MetaData metaData = null;
private PropertySet properties = null;
private Object value;
public MetricBuilder(String name, MetricDataType dataType, Object value) {
this.name = name;
this.timestamp = new Date();
this.dataType = dataType;
this.value = value;
}
public MetricBuilder(Long alias, MetricDataType dataType, Object value) {
this.alias = alias;
this.timestamp = new Date();
this.dataType = dataType;
this.value = value;
}
public MetricBuilder(Metric metric) throws Exception {
this.name = metric.getName();
this.alias = metric.getAlias();
this.timestamp = metric.getTimestamp();
this.dataType = metric.getDataType();
this.isHistorical = metric.isHistorical();
this.isTransient = metric.isTransient();
this.metaData = metric.getMetaData() != null
? new MetaDataBuilder(metric.getMetaData()).createMetaData() : null;
this.properties = metric.getMetaData() != null
? new PropertySet.PropertySetBuilder(metric.getProperties()).createPropertySet() : null;
switch (dataType) {
case DataSet:
this.value = metric.getValue() != null
? new DataSetBuilder((DataSet) metric.getValue()).createDataSet()
: null;
break;
case Template:
this.value = metric.getValue() != null
? new Template.TemplateBuilder((Template) metric.getValue()).createTemplate()
: null;
break;
default:
this.value = metric.getValue();
}
}
public MetricBuilder name(String name) {
this.name = name;
return this;
}
public MetricBuilder alias(Long alias) {
this.alias = alias;
return this;
}
public MetricBuilder timestamp(Date timestamp) {
this.timestamp = timestamp;
return this;
}
public MetricBuilder dataType(MetricDataType dataType) {
this.dataType = dataType;
return this;
}
public MetricBuilder isHistorical(Boolean isHistorical) {
this.isHistorical = isHistorical;
return this;
}
public MetricBuilder isTransient(Boolean isTransient) {
this.isTransient = isTransient;
return this;
}
public MetricBuilder metaData(MetaData metaData) {
this.metaData = metaData;
return this;
}
public MetricBuilder properties(PropertySet properties) {
this.properties = properties;
return this;
}
public MetricBuilder value(Object value) {
this.value = value;
return this;
}
public Metric createMetric() throws Exception {
return new Metric(name, alias, timestamp, dataType, isHistorical, isTransient, metaData,
properties, value);
}
}
}

138
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetricDataType.java

@ -0,0 +1,138 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import java.math.BigInteger;
import java.util.Date;
/**
* An enumeration of data types associated with the value of a {@link Metric}
*/
@Slf4j
public enum MetricDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Custom Types for Metrics
UUID(15, String.class),
DataSet(16, DataSet.class),
Bytes(17, byte[].class),
File(18, File.class),
Template(19, Template.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private MetricDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
log.warn("Failed type check - " + clazz + " != " + ((value != null) ? value.getClass().toString() : "null"));
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link MetricDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link MetricDataType} instance.
*/
public static MetricDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
case 15:
return UUID;
case 16:
return DataSet;
case 17:
return Bytes;
case 18:
return File;
case 19:
return Template;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

107
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Parameter.java

@ -0,0 +1,107 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.Objects;
/**
* A class to represent a parameter associated with a template.
*/
public class Parameter {
/**
* The name of the parameter
*/
@JsonProperty("name")
private String name;
/**
* The data type of the parameter
*/
@JsonProperty("type")
private ParameterDataType type;
/**
* The value of the parameter
*/
@JsonProperty("value")
private Object value;
/**
* Constructs a Parameter instance.
*
* @param name The name of the parameter.
* @param type The type of the parameter.
* @param value The value of the parameter.
* @throws Exception
*/
public Parameter(String name, ParameterDataType type, Object value) throws Exception {
this.name = name;
this.type = type;
this.value = value;
this.type.checkType(value);
}
@JsonGetter("name")
public String getName() {
return name;
}
@JsonSetter("name")
public void setName(String name) {
this.name = name;
}
public ParameterDataType getType() {
return type;
}
public void setType(ParameterDataType type) {
this.type = type;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
Parameter param = (Parameter) object;
return Objects.equals(name, param.getName())
&& Objects.equals(type, param.getType())
&& Objects.equals(value, param.getValue());
}
@Override
public String toString() {
return "Parameter [name=" + name + ", type=" + type + ", value=" + value + "]";
}
}

125
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/ParameterDataType.java

@ -0,0 +1,125 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.math.BigInteger;
import java.util.Date;
/**
* An enumeration of data types for the value of a {@link Parameter} for a {@link Template}
*/
@Slf4j
public enum ParameterDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Unknown
Unknown(0, Object.class);
private static Logger logger = LogManager.getLogger(ParameterDataType.class.getName());
private Class<?> clazz = null;
private int intValue = 0;
private ParameterDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
log.warn("Failed type check - " + clazz + " != " + value.getClass().toString());
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link ParameterDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link ParameterDataType} instance.
*/
public static ParameterDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

134
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyDataType.java

@ -0,0 +1,134 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import java.math.BigInteger;
import java.util.Date;
import java.util.List;
/**
* An enumeration of data types for values of a {@link PropertySet}
*/
@Slf4j
public enum PropertyDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Custom Types for PropertySets
PropertySet(20, PropertySet.class),
PropertySetList(21, List.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private PropertyDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
if(clazz == List.class && value instanceof List) {
// Allow List subclasses
} else {
log.warn("Failed type check - " + clazz + " != " + value.getClass().toString());
throw new Exception(value.getClass().getName());
}
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link PropertyDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link PropertyDataType} instance.
*/
public static PropertyDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
case 20:
return PropertySet;
case 21:
return PropertySetList;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

169
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertySet.java

@ -0,0 +1,169 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* A class that maintains a set of properties associated with a {@link Metric}.
*/
public class PropertySet implements Map<String, PropertyValue> {
@JsonIgnore
private Map<String, PropertyValue> map;
public PropertySet() {
this.map = new HashMap<String, PropertyValue>();
}
private PropertySet(Map<String, PropertyValue> propertyMap) {
this.map = propertyMap;
}
@JsonIgnore
public PropertyValue getPropertyValue(String name) {
return this.map.get(name);
}
@JsonIgnore
public void setProperty(String name, PropertyValue value) {
this.map.put(name, value);
}
@JsonIgnore
public void removeProperty(String name) {
this.map.remove(name);
}
@JsonIgnore
public void clear() {
this.map.clear();
}
@JsonIgnore
public Set<String> getNames() {
return map.keySet();
}
@JsonIgnore
public Collection<PropertyValue> getValues() {
return map.values();
}
@JsonIgnore
public Map<String, PropertyValue> getPropertyMap() {
return map;
}
@Override
public String toString() {
return "PropertySet [propertyMap=" + map + "]";
}
@Override
public int size() {
return map.size();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return map.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return map.containsValue(value);
}
@Override
public PropertyValue get(Object key) {
return map.get(key);
}
@Override
public PropertyValue put(String key, PropertyValue value) {
return map.put(key, value);
}
@Override
public PropertyValue remove(Object key) {
return map.remove(key);
}
@Override
public void putAll(Map<? extends String, ? extends PropertyValue> m) {
map.putAll(m);
}
@Override
public Set<String> keySet() {
return map.keySet();
}
@Override
public Collection<PropertyValue> values() {
return map.values();
}
@Override
public Set<Entry<String, PropertyValue>> entrySet() {
return map.entrySet();
}
/**
* A builder for a PropertySet instance
*/
public static class PropertySetBuilder {
private Map<String, PropertyValue> propertyMap;
public PropertySetBuilder() {
this.propertyMap = new HashMap<String, PropertyValue>();
}
public PropertySetBuilder(PropertySet propertySet) throws Exception {
this.propertyMap = new HashMap<String, PropertyValue>();
for (String name : propertySet.getNames()) {
PropertyValue value = propertySet.getPropertyValue(name);
propertyMap.put(name, new PropertyValue(value.getType(), value.getValue()));
}
}
public PropertySetBuilder addProperty(String name, PropertyValue value) {
this.propertyMap.put(name, value);
return this;
}
public PropertySetBuilder addProperties(Map<String, PropertyValue> properties) {
this.propertyMap.putAll(properties);
return this;
}
public PropertySet createPropertySet() {
return new PropertySet(propertyMap);
}
}
}

86
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyValue.java

@ -0,0 +1,86 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Objects;
/**
* The value of a property in a {@link PropertySet}.
*/
public class PropertyValue {
private PropertyDataType type;
private Object value;
private Boolean isNull = null;
public PropertyValue() {}
/**
* A constructor.
*
* @param type the property type
* @param value the property value
* @throws Exception
*/
public PropertyValue(PropertyDataType type, Object value) throws Exception {
this.type = type;
this.value = value;
isNull = (value == null) ? true : false;
type.checkType(value);
}
public PropertyDataType getType() {
return type;
}
public void setType(PropertyDataType type) {
this.type = type;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
isNull = (value == null) ? true : false;
}
@JsonIgnore
public Boolean isNull() {
return isNull;
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
PropertyValue propValue = (PropertyValue) object;
return Objects.equals(type, propValue.getType())
&& Objects.equals(value, propValue.getValue());
}
@Override
public String toString() {
return "PropertyValue [type=" + type + ", value=" + value + ", isNull=" + isNull + "]";
}
}

93
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Row.java

@ -0,0 +1,93 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A class for representing a row of a data set.
*/
public class Row {
private List<SparkplugValue<?>> values;
public Row(List<SparkplugValue<?>> values) {
this.values = values;
}
public List<SparkplugValue<?>> getValues() {
return values;
}
public void setValues(List<SparkplugValue<?>> values) {
this.values = values;
}
public void addValue(SparkplugValue<?> value) {
this.values.add(value);
}
@Override
public String toString() {
return "Row [values=" + values + "]";
}
/**
* Converts a {@link Row} instance to a {@link List} of Objects representing the values.
*
* @param row a {@link Row} instance.
* @return a {@link List} of Objects.
*/
public static List<Object> toValues(Row row) {
List<Object> list = new ArrayList<Object>(row.getValues().size());
for (SparkplugValue<?> value : row.getValues()) {
list.add(value.getValue());
}
return list;
}
/**
* A builder for creating a {@link Row} instance.
*/
public static class RowBuilder {
private List<SparkplugValue<?>> values;
public RowBuilder() {
this.values = new ArrayList<SparkplugValue<?>>();
}
public RowBuilder(Row row) {
this.values = new ArrayList<SparkplugValue<?>>(row.getValues());
}
public RowBuilder addValue(SparkplugValue<?> value) {
this.values.add(value);
return this;
}
public RowBuilder addValues(Collection<SparkplugValue<?>> values) {
this.values.addAll(values);
return this;
}
public Row createRow() {
return new Row(values);
}
}
}

161
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugTopic.java

@ -0,0 +1,161 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
/**
* Created by nickAS21 on 12.12.22
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SparkplugTopic {
/**
* The Sparkplug namespace version.
* For the Sparkplug A version of the payload definition, the UTF-8 string constant for the namespace element will be:
* spAv1.0
* For the Sparkplug B version of the specification, the UTF-8 string constant for the namespace element will be:
* spBv1.0
*/
private String namespace;
/**
* The ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*/
private String groupId;
/**
* The ID of the Edge of Network (EoN) Node.
*/
private String edgeNodeId;
/**
* The ID of the device.
*/
private String deviceId;
/**
* The message type.
*/
private SparkplugMessageType type;
/**
* Constructor (device).
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param deviceId the device ID.
* @param type the message type.
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, String deviceId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = deviceId;
this.type = type;
}
/**
* Constructor (node).
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param type the message type.
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = null;
this.type = type;
}
/**
* Returns the Sparkplug namespace version.
*
* @return the namespace
*/
public String getNamespace() {
return namespace;
}
/**
* Returns the ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*
* @return the group ID
*/
public String getGroupId() {
return groupId;
}
/**
* Returns the ID of the Edge of Network (EoN) Node.
*
* @return the edge node ID
*/
public String getEdgeNodeId() {
return edgeNodeId;
}
/**
* Returns the ID of the device.
*
* @return the device ID
*/
public String getDeviceId() {
return deviceId;
}
/**
* Returns the message type.
*
* @return the message type
*/
public SparkplugMessageType getType() {
return type;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(getNamespace()).append("/")
.append(getGroupId()).append("/")
.append(getType()).append("/")
.append(getEdgeNodeId());
if (getDeviceId() != null) {
sb.append("/").append(getDeviceId());
}
return sb.toString();
}
/**
* Returns true if this topic's type matches the passes in type, false otherwise.
*
* @param type the type to check
* @return true if this topic's type matches the passes in type, false otherwise
*/
public boolean isType(SparkplugMessageType type) {
return this.type != null && this.type.equals(type);
}
public boolean isNode() {
return this.deviceId == null;
}
}

53
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugValue.java

@ -0,0 +1,53 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
public class SparkplugValue<V> {
private DataSetDataType type;
private V value;
public SparkplugValue() {
super();
}
public SparkplugValue(DataSetDataType type, V value) {
super();
this.type = type;
this.value = value;
}
public DataSetDataType getType() {
return type;
}
public void setType(DataSetDataType type) {
this.type = type;
}
public V getValue() {
return value;
}
public void setValue(V value) {
this.value = value;
}
@Override
public String toString() {
return "Value [type=" + type + ", value=" + value + "]";
}
}

202
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Template.java

@ -0,0 +1,202 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A class representing a template associated with a metric
*/
@JsonInclude(Include.NON_NULL)
public class Template {
/**
* The Template version.
*/
@JsonProperty("version")
private String version;
/**
* The template reference
*/
@JsonProperty("reference")
private String templateRef;
/**
* True if the template is a definition, false otherwise.
*/
@JsonProperty("isDefinition")
private boolean isDefinition;
/**
* List of metrics.
*/
@JsonProperty("metrics")
private List<Metric> metrics;
/**
* List of parameters.
*/
@JsonProperty("parameters")
@JsonInclude(Include.NON_EMPTY)
private List<Parameter> parameters;
public Template() {}
public Template(String version, String templateRef, boolean isDefinition, List<Metric> metrics,
List<Parameter> parameters) {
this.version = version;
this.templateRef = templateRef;
this.isDefinition = isDefinition;
this.metrics = metrics;
this.parameters = parameters;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getTemplateRef() {
return templateRef;
}
public void setTemplateRef(String templateRef) {
this.templateRef = templateRef;
}
@JsonGetter("isDefinition")
public boolean isDefinition() {
return isDefinition;
}
@JsonSetter("isDefinition")
public void setDefinition(boolean isDefinition) {
this.isDefinition = isDefinition;
}
public List<Metric> getMetrics() {
return metrics;
}
public void setMetrics(List<Metric> metrics) {
this.metrics = metrics;
}
public void addMetric(Metric metric) {
this.metrics.add(metric);
}
public List<Parameter> getParameters() {
return parameters;
}
public void setParameters(List<Parameter> parameters) {
this.parameters = parameters;
}
public void addParameter(Parameter parameter) {
this.parameters.add(parameter);
}
@Override
public String toString() {
return "Template [version=" + version + ", templateRef=" + templateRef + ", isDefinition=" + isDefinition
+ ", metrics=" + metrics + ", parameters=" + parameters + "]";
}
/**
* A builder for creating a {@link Template} instance.
*/
public static class TemplateBuilder {
private String version;
private String templateRef;
private boolean isDefinition;
private List<Metric> metrics;
private List<Parameter> parameters;
public TemplateBuilder() {
super();
this.metrics = new ArrayList<Metric>();
this.parameters = new ArrayList<Parameter>();
}
public TemplateBuilder(Template template) throws Exception {
this.version = template.getVersion();
this.templateRef = template.getTemplateRef();
this.isDefinition = template.isDefinition();
this.metrics = new ArrayList<Metric>(template.getMetrics().size());
for (Metric metric : template.getMetrics()) {
this.metrics.add(new Metric.MetricBuilder(metric).createMetric());
}
this.parameters = new ArrayList<Parameter>(template.getParameters().size());
for (Parameter parameter : template.getParameters()) {
this.parameters.add(new Parameter(parameter.getName(), parameter.getType(), parameter.getValue()));
}
}
public TemplateBuilder version(String version) {
this.version = version;
return this;
}
public TemplateBuilder templateRef(String templateRef) {
this.templateRef = templateRef;
return this;
}
public TemplateBuilder definition(boolean isDefinition) {
this.isDefinition = isDefinition;
return this;
}
public TemplateBuilder addMetric(Metric metric) {
this.metrics.add(metric);
return this;
}
public TemplateBuilder addMetrics(Collection<Metric> metrics) {
this.metrics.addAll(metrics);
return this;
}
public TemplateBuilder addParameter(Parameter parameter) {
this.parameters.add(parameter);
return this;
}
public TemplateBuilder addParameters(Collection<Parameter> parameters) {
this.parameters.addAll(parameters);
return this;
}
public Template createTemplate() {
return new Template(version, templateRef, isDefinition, metrics, parameters);
}
}
}

5
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -31,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceResponseMsg
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg;
@ -93,6 +95,9 @@ public interface TransportService {
void process(GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback);
void process(GetOrCreateDeviceFromSparkplugRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromSparkplugResponse> callback);
void process(ProvisionDeviceRequestMsg msg,
TransportServiceCallback<ProvisionDeviceResponseMsg> callback);

29
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/GetOrCreateDeviceFromSparkplugResponse.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.DeviceProfile;
@Data
@Builder
public class GetOrCreateDeviceFromSparkplugResponse implements DeviceProfileAware {
private TransportDeviceInfo deviceInfo;
private DeviceProfile deviceProfile;
}

48
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -69,6 +69,7 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
@ -301,8 +302,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getEntityProfileResponseMsg();
@ -313,8 +314,8 @@ public class DefaultTransportService implements TransportService {
@Override
public List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetQueueRoutingInfoResponseMsgsList();
@ -325,8 +326,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getResourceResponseMsg();
@ -337,8 +338,8 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setSnmpDevicesRequestMsg(requestMsg)
.build()
);
@ -354,7 +355,7 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setDeviceRequestMsg(requestMsg)
.build()
);
@ -374,7 +375,7 @@ public class DefaultTransportService implements TransportService {
@Override
public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder()
UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
.setDeviceCredentialsRequestMsg(requestMsg)
.build()
);
@ -479,6 +480,27 @@ public class DefaultTransportService implements TransportService {
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromSparkplugResponse> callback) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceSparlplugRequestMsg(requestMsg).build());
log.trace("Processing msg: {}", requestMsg);
ListenableFuture<GetOrCreateDeviceFromSparkplugResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg msg = tmp.getValue().getGetOrCreateDeviceSparkResponseMsg();
GetOrCreateDeviceFromSparkplugResponse.GetOrCreateDeviceFromSparkplugResponseBuilder result = GetOrCreateDeviceFromSparkplugResponse.builder();
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (profileBody != null && !profileBody.isEmpty()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
}
}
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.LwM2MRequestMsg msg, TransportServiceCallback<TransportProtos.LwM2MResponseMsg> callback) {
log.trace("Processing msg: {}", msg);
@ -720,8 +742,8 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build());
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build());
AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> {
callback.onSuccess(response.getValue().getOtaPackageResponseMsg());
@ -864,7 +886,7 @@ public class DefaultTransportService implements TransportService {
}
}
protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
protected void processToTransportMsg(ToTransportMsg toSessionMsg) {
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
SessionMetaData md = sessions.get(sessionId);
if (md != null) {

Loading…
Cancel
Save