Browse Source

sparklug: comments4

pull/7763/head
nickAS21 4 years ago
parent
commit
f6aa9e6125
  1. 100
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 14
      common/cluster-api/src/main/proto/queue.proto
  3. 13
      common/transport/mqtt/pom.xml
  4. 183
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  5. 4
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
  6. 137
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java
  7. 175
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java
  8. 371
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java
  9. 545
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java
  10. 17414
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java
  11. 7
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java
  12. 37
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadDecoder.java
  13. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadEncoder.java
  14. 3
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java
  15. 13
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java
  16. 129
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DataSetDeserializer.java
  17. 38
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModifier.java
  18. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModule.java
  19. 52
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/FileSerializer.java
  20. 70
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/MetricDeserializer.java
  21. 234
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSet.java
  22. 117
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSetDataType.java
  23. 64
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/File.java
  24. 267
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetaData.java
  25. 320
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Metric.java
  26. 138
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetricDataType.java
  27. 107
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Parameter.java
  28. 125
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/ParameterDataType.java
  29. 134
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyDataType.java
  30. 169
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertySet.java
  31. 86
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyValue.java
  32. 93
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Row.java
  33. 53
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugValue.java
  34. 202
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Template.java
  35. 210
      common/transport/mqtt/src/main/proto/sparkplug.proto
  36. 5
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  37. 29
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/GetOrCreateDeviceFromSparkplugResponse.java
  38. 22
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

100
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfigu
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -161,8 +162,6 @@ public class DefaultTransportApiService implements TransportApiService {
result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
} else if (transportApiRequestMsg.hasGetOrCreateDeviceSparlplugRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceSparlplugRequestMsg());
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
@ -283,6 +282,8 @@ public class DefaultTransportApiService implements TransportApiService {
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock();
try {
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
boolean isSparkplug = ((MqttDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration()).isSparkPlug();
Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
if (device == null) {
TenantId tenantId = gateway.getTenantId();
@ -291,7 +292,7 @@ public class DefaultTransportApiService implements TransportApiService {
device.setName(requestMsg.getDeviceName());
device.setType(requestMsg.getDeviceType());
device.setCustomerId(gateway.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
@ -307,7 +308,8 @@ public class DefaultTransportApiService implements TransportApiService {
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());
String deviceIdStr = isSparkplug ? "sparkplugId" : "gatewayId";
metaData.putValue(deviceIdStr, gatewayId.toString());
DeviceId deviceId = device.getId();
ObjectNode entityNode = mapper.valueToTree(device);
@ -318,23 +320,19 @@ public class DefaultTransportApiService implements TransportApiService {
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
String lastConnectedStr = isSparkplug ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY;
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
(!deviceAdditionalInfo.has(lastConnectedStr)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
newDeviceAdditionalInfo.put(lastConnectedStr, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}
}
GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(builder.build())
.build();
@ -347,82 +345,6 @@ public class DefaultTransportApiService implements TransportApiService {
}, dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg) {
DeviceId sparkplugNodeId = new DeviceId(new UUID(requestMsg.getSparkplugIdMSB(), requestMsg.getSparkplugIdLSB()));
ListenableFuture<Device> sparkplugNodeFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, sparkplugNodeId);
return Futures.transform(sparkplugNodeFuture, sparkplugNode -> {
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock();
try {
Device device = deviceService.findDeviceByTenantIdAndName(sparkplugNode.getTenantId(), requestMsg.getDeviceName());
if (device == null) {
TenantId tenantId = sparkplugNode.getTenantId();
device = new Device();
device.setTenantId(tenantId);
device.setName(requestMsg.getDeviceName());
device.setType(requestMsg.getDeviceType());
device.setCustomerId(sparkplugNode.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(sparkplugNode.getTenantId(), requestMsg.getDeviceType());
device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString());
device.setAdditionalInfo(additionalInfo);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null);
device = savedDevice;
relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(device.getId(), sparkplugNode.getId(), "Created"));
TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = sparkplugNode.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("sparkplugNodeId", sparkplugNodeId.toString());
DeviceId deviceId = device.getId();
ObjectNode entityNode = mapper.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_SPARKPLUG)
|| !sparkplugNodeId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_SPARKPLUG).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString());
Device savedDevice = deviceService.saveDevice(device);
relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(device.getId(), sparkplugNode.getId(), "Created"));
tbClusterService.onDeviceUpdated(savedDevice, device);
}
}
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.Builder builder =
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceSparkResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by sparkplug Node id and name: [{}]", sparkplugNodeId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
}
}, dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {
ListenableFuture<ProvisionResponse> provisionResponseFuture = null;
try {

14
common/cluster-api/src/main/proto/queue.proto

@ -193,18 +193,6 @@ message GetOrCreateDeviceFromGatewayResponseMsg {
bytes profileBody = 2;
}
message GetOrCreateDeviceFromSparkplugRequestMsg {
string deviceName = 1;
string deviceType = 2;
int64 sparkplugIdMSB = 3;
int64 sparkplugIdLSB = 4;
}
message GetOrCreateDeviceFromSparkplugResponseMsg {
DeviceInfoProto deviceInfo = 1;
bytes profileBody = 2;
}
message GetEntityProfileRequestMsg {
string entityType = 1;
int64 entityIdMSB = 2;
@ -909,7 +897,6 @@ message TransportApiRequestMsg {
GetDeviceRequestMsg deviceRequestMsg = 12;
GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13;
GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 14;
GetOrCreateDeviceFromSparkplugRequestMsg getOrCreateDeviceSparlplugRequestMsg = 15;
}
/* Response from ThingsBoard Core Service to Transport Service */
@ -925,7 +912,6 @@ message TransportApiResponseMsg {
GetDeviceResponseMsg deviceResponseMsg = 9;
GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10;
repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11;
GetOrCreateDeviceFromSparkplugResponseMsg getOrCreateDeviceSparkResponseMsg = 12;
}
/* Messages that are handled by ThingsBoard Core Service */

13
common/transport/mqtt/pom.xml

@ -97,6 +97,19 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

183
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -47,7 +47,6 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.OtaPackageId;
@ -68,13 +67,14 @@ import org.thingsboard.server.common.transport.util.SslUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.IOException;
@ -331,14 +331,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
}
} else if (sparkPlugSessionHandler != null) {
try {
SparkplugTopic sparkplugTopic = parseTopic(topicName);
log.error("Publish [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
handleSparkplugPublishMsg(ctx, sparkplugTopic, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
} catch (Exception e) {
e.printStackTrace();
}
handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
@ -380,25 +374,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, SparkplugTopic sparkplugTopic, int msgId, MqttPublishMessage mqttMsg) {
String topicName = sparkplugTopic.toString();
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) {
try {
switch(sparkplugTopic.getType()) {
case NBIRTH:
// TODO regular publish
// sparkPlugSessionHandler.onNodeConnectProto(mqttMsg);
break;
case DBIRTH:
sparkPlugSessionHandler.onDeviceConnectProto(mqttMsg);
break;
default:
}
sparkPlugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg);
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ctx.close();
} catch (AdaptorException e) {
} catch (Exception e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
}
@ -655,82 +637,80 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
try {
SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName());
log.error("Subscribe [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
} catch (Exception e) {
e.printStackTrace();
}
List<Integer> grantedQoSList = new ArrayList<>();
boolean activityReported = false;
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
try {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
if (sparkPlugSessionHandler != null) {
SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName());
sparkPlugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS);
} else {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
@ -998,22 +978,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void checkSparkPlugConnected(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) {
if (((MqttDeviceProfileTransportConfiguration) deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration())
.isSparkPlug()) {
private void checkSparkPlugSession(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) {
if (deviceSessionCtx.isSparkplug()) {
TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
try {
JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
if (infoNode != null) {
SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic());
SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
if (sparkPlugSessionHandler == null) {
log.error("Connected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
log.error("SparkPlugConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString());
} else {
log.error("ReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType());
}
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
log.error("SparkPlugReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
}
}
} catch (Exception e) {
@ -1057,7 +1034,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
checkGatewaySession(sessionMetaData);
checkSparkPlugConnected(sessionMetaData, connectMessage);
checkSparkPlugSession(sessionMetaData, connectMessage);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true);
log.debug("[{}] Client connected!", sessionId);

4
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java

@ -255,4 +255,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
return Collections.unmodifiableCollection(msgQueue);
}
public boolean isSparkplug () {
return ((MqttDeviceProfileTransportConfiguration) this.getDeviceProfile().getProfileData().getTransportConfiguration()).isSparkPlug();
}
}

137
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java

@ -27,23 +27,25 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.annotation.Nullable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -102,7 +104,7 @@ public class SparkplugNodeSessionHandler {
return sessionId;
}
public String getNodeTopic() {
public String getNodeTopic() {
return nodeTopic;
}
@ -138,12 +140,64 @@ public class SparkplugNodeSessionHandler {
String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId();
String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType();
processOnConnect(mqttMsg, deviceName, deviceType);
} catch (Exception e) {
} catch (Exception e) {
throw new AdaptorException(e);
}
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception {
SparkplugTopic sparkplugTopic = parseTopic(topicName);
log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
onNodeDisconnectProto(mqttMsg);
break;
case NRECORD:
// TODO
break;
default:
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
onDeviceConnectProto(mqttMsg);
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
onDeviceDisconnectProto(mqttMsg);
break;
case DRECORD:
// TODO
break;
default:
}
}
}
private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
@ -153,16 +207,75 @@ public class SparkplugNodeSessionHandler {
}
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
// TODO disconnect device without disconnect Node
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg);
}
private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
return JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) {
String topicName = sparkplugTopic.toString();
log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
// TODO
break;
case NRECORD:
// TODO
break;
default:
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
// TODO
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
// TODO
break;
case DRECORD:
// TODO
break;
default:
}
}
}
private byte[] getBytes(ByteBuf payload) {
return ProtoMqttAdaptor.toBytes(payload);
}
@ -242,15 +355,15 @@ public class SparkplugNodeSessionHandler {
return future;
}
try {
transportService.process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg.newBuilder()
transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setSparkplugIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
.setSparkplugIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
.setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromSparkplugResponse msg) {
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
if (msg.getDeviceInfo() == null) {
System.out.println("DeviceInfo == null");
}

175
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java

@ -1,175 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
/**
* Created by nickAS21 on 13.12.22
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SparkplugBPayload {
private Date timestamp;
private List<Metric> metrics;
private long seq = -1;
private String uuid;
private byte[] body;
public SparkplugBPayload() {};
public SparkplugBPayload(Date timestamp, List<Metric> metrics, long seq, String uuid, byte[] body) {
this.timestamp = timestamp;
this.metrics = metrics;
this.seq = seq;
this.uuid = uuid;
this.body = body;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public void addMetric(Metric metric) {
metrics.add(metric);
}
public void addMetric(int index, Metric metric) {
metrics.add(index, metric);
}
public void addMetrics(List<Metric> metrics) {
this.metrics.addAll(metrics);
}
public Metric removeMetric(int index) {
return metrics.remove(index);
}
public boolean removeMetric(Metric metric) {
return metrics.remove(metric);
}
public List<Metric> getMetrics() {
return metrics;
}
@JsonIgnore
public Integer getMetricCount() {
return metrics.size();
}
public void setMetrics(List<Metric> metrics) {
this.metrics = metrics;
}
public long getSeq() {
return seq;
}
public void setSeq(long seq) {
this.seq = seq;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
@Override
public String toString() {
return "SparkplugBPayload [timestamp=" + timestamp + ", metrics=" + metrics + ", seq=" + seq + ", uuid=" + uuid
+ ", body=" + Arrays.toString(body) + "]";
}
/**
* A builder for creating a {@link SparkplugBPayload} instance.
*/
public static class SparkplugBPayloadBuilder {
private Date timestamp;
private List<Metric> metrics;
private long seq = -1;
private String uuid;
private byte[] body;
public SparkplugBPayloadBuilder(long sequenceNumber) {
this.seq = sequenceNumber;
metrics = new ArrayList<Metric>();
}
public SparkplugBPayloadBuilder() {
metrics = new ArrayList<Metric>();
}
public SparkplugBPayloadBuilder addMetric(Metric metric) {
this.metrics.add(metric);
return this;
}
public SparkplugBPayloadBuilder addMetrics(Collection<Metric> metrics) {
this.metrics.addAll(metrics);
return this;
}
public SparkplugBPayloadBuilder setTimestamp(Date timestamp) {
this.timestamp = timestamp;
return this;
}
public SparkplugBPayloadBuilder setSeq(long seq) {
this.seq = seq;
return this;
}
public SparkplugBPayloadBuilder setUuid(String uuid) {
this.uuid = uuid;
return this;
}
public SparkplugBPayloadBuilder setBody(byte[] body) {
this.body = body;
return this;
}
public SparkplugBPayload createPayload() {
return new SparkplugBPayload(timestamp, metrics, seq, uuid, body);
}
}
}

371
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java

@ -1,371 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A {@link SparkplugPayloadDecoder} implementation for decoding Sparkplug B payloads.
*/
@Slf4j
public class SparkplugBPayloadDecoder implements SparkplugPayloadDecoder<SparkplugBPayload> {
public SparkplugBPayloadDecoder() {
super();
}
public SparkplugBPayload buildFromByteArray(byte[] bytes) throws Exception {
SparkplugBProto.Payload protoPayload = SparkplugBProto.Payload.parseFrom(bytes);
SparkplugBPayload.SparkplugBPayloadBuilder builder = new SparkplugBPayload.SparkplugBPayloadBuilder(protoPayload.getSeq());
// Set the timestamp
if (protoPayload.hasTimestamp()) {
builder.setTimestamp(new Date(protoPayload.getTimestamp()));
}
// Set the sequence number
if (protoPayload.hasSeq()) {
builder.setSeq(protoPayload.getSeq());
}
// Set the Metrics
for (SparkplugBProto.Payload.Metric protoMetric : protoPayload.getMetricsList()) {
builder.addMetric(convertMetric(protoMetric));
}
// Set the body
if (protoPayload.hasBody()) {
builder.setBody(protoPayload.getBody().toByteArray());
}
// Set the body
if (protoPayload.hasUuid()) {
builder.setUuid(protoPayload.getUuid());
}
return builder.createPayload();
}
private Metric convertMetric(SparkplugBProto.Payload.Metric protoMetric) throws Exception {
// Convert the dataType
MetricDataType dataType = MetricDataType.fromInteger((protoMetric.getDatatype()));
// Build and return the Metric
return new Metric.MetricBuilder(protoMetric.getName(), dataType, getMetricValue(protoMetric))
.isHistorical(
protoMetric.hasIsHistorical() ? protoMetric.getIsHistorical() : null)
.isTransient(protoMetric
.hasIsTransient() ? protoMetric.getIsTransient() : null)
.timestamp(protoMetric.hasTimestamp() ? new Date(protoMetric.getTimestamp()) : null)
.alias(protoMetric.hasAlias() ? protoMetric.getAlias() : null)
.metaData(protoMetric.hasMetadata()
? new MetaData.MetaDataBuilder().contentType(protoMetric.getMetadata().getContentType())
.size(protoMetric.getMetadata().getSize()).seq(protoMetric.getMetadata().getSeq())
.fileName(protoMetric.getMetadata().getFileName())
.fileType(protoMetric.getMetadata().getFileType())
.md5(protoMetric.getMetadata().getMd5())
.description(protoMetric.getMetadata().getDescription()).createMetaData()
: null)
.properties(protoMetric.hasProperties()
? new PropertySet.PropertySetBuilder().addProperties(convertProperties(protoMetric.getProperties()))
.createPropertySet()
: null)
.createMetric();
}
private Map<String, PropertyValue> convertProperties(SparkplugBProto.Payload.PropertySet decodedPropSet)
throws Exception {
Map<String, PropertyValue> map = new HashMap<String, PropertyValue>();
List<String> keys = decodedPropSet.getKeysList();
List<SparkplugBProto.Payload.PropertyValue> values = decodedPropSet.getValuesList();
for (int i = 0; i < keys.size(); i++) {
SparkplugBProto.Payload.PropertyValue value = values.get(i);
map.put(keys.get(i),
new PropertyValue(PropertyDataType.fromInteger(value.getType()), getPropertyValue(value)));
}
return map;
}
private Object getPropertyValue(SparkplugBProto.Payload.PropertyValue value) throws Exception {
PropertyDataType type = PropertyDataType.fromInteger(value.getType());
if (value.getIsNull()) {
return null;
}
switch (type) {
case Boolean:
return value.getBooleanValue();
case DateTime:
return new Date(value.getLongValue());
case Float:
return value.getFloatValue();
case Double:
return value.getDoubleValue();
case Int8:
return (byte) value.getIntValue();
case Int16:
case UInt8:
return (short) value.getIntValue();
case Int32:
case UInt16:
return value.getIntValue();
case UInt32:
case Int64:
return value.getLongValue();
case UInt64:
return BigInteger.valueOf(value.getLongValue());
case String:
case Text:
return value.getStringValue();
case PropertySet:
return new PropertySet.PropertySetBuilder().addProperties(convertProperties(value.getPropertysetValue()))
.createPropertySet();
case PropertySetList:
List<PropertySet> propertySetList = new ArrayList<PropertySet>();
List<SparkplugBProto.Payload.PropertySet> list = value.getPropertysetsValue().getPropertysetList();
for (SparkplugBProto.Payload.PropertySet decodedPropSet : list) {
propertySetList.add(new PropertySet.PropertySetBuilder().addProperties(convertProperties(decodedPropSet))
.createPropertySet());
}
return propertySetList;
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Property Data Type " + type);
}
}
private Object getMetricValue(SparkplugBProto.Payload.Metric protoMetric) throws Exception {
// Check if the null flag has been set indicating that the value is null
if (protoMetric.getIsNull()) {
return null;
}
// Otherwise convert the value based on the type
int metricType = protoMetric.getDatatype();
switch (MetricDataType.fromInteger(metricType)) {
case Boolean:
return protoMetric.getBooleanValue();
case DateTime:
return new Date(protoMetric.getLongValue());
case File:
String filename = protoMetric.getMetadata().getFileName();
byte[] fileBytes = protoMetric.getBytesValue().toByteArray();
return new File(filename, fileBytes);
case Float:
return protoMetric.getFloatValue();
case Double:
return protoMetric.getDoubleValue();
case Int8:
return (byte) protoMetric.getIntValue();
case Int16:
case UInt8:
return (short) protoMetric.getIntValue();
case Int32:
case UInt16:
return protoMetric.getIntValue();
case UInt32:
case Int64:
return protoMetric.getLongValue();
case UInt64:
return BigInteger.valueOf(protoMetric.getLongValue());
case String:
case Text:
case UUID:
return protoMetric.getStringValue();
case Bytes:
return protoMetric.getBytesValue().toByteArray();
case DataSet:
SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue();
// Build the and create the DataSet
return new DataSet.DataSetBuilder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList())
.addTypes(convertDataSetDataTypes(protoDataSet.getTypesList()))
.addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList()))
.createDataSet();
case Template:
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
List<Metric> metrics = new ArrayList<Metric>();
List<Parameter> parameters = new ArrayList<Parameter>();
for (SparkplugBProto.Payload.Template.Parameter protoParameter : protoTemplate.getParametersList()) {
String name = protoParameter.getName();
ParameterDataType type = ParameterDataType.fromInteger(protoParameter.getType());
Object value = getParameterValue(protoParameter);
if (log.isTraceEnabled()) {
log.trace("Setting template parameter name: " + name + ", type: " + type + ", value: "
+ value + ", valueType" + value.getClass());
}
parameters.add(new Parameter(name, type, value));
}
for (SparkplugBProto.Payload.Metric protoTemplateMetric : protoTemplate.getMetricsList()) {
Metric templateMetric = convertMetric(protoTemplateMetric);
if (log.isTraceEnabled()) {
log.trace("Setting template parameter name: " + templateMetric.getName() + ", type: "
+ templateMetric.getDataType() + ", value: " + templateMetric.getValue());
}
metrics.add(templateMetric);
}
Template template = new Template.TemplateBuilder().version(protoTemplate.getVersion())
.templateRef(protoTemplate.getTemplateRef()).definition(protoTemplate.getIsDefinition())
.addMetrics(metrics).addParameters(parameters).createTemplate();
if (log.isTraceEnabled()) {
log.trace(
"Setting template - name: " + protoMetric.getName() + ", version: " + template.getVersion()
+ ", ref: " + template.getTemplateRef() + ", isDef: " + template.isDefinition()
+ ", metrics: " + metrics.size() + ", params: " + parameters.size());
}
return template;
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Metric DataType " + metricType);
}
}
private Collection<Row> convertDataSetRows(List<SparkplugBProto.Payload.DataSet.Row> protoRows,
List<Integer> protoTypes) throws Exception {
Collection<Row> rows = new ArrayList<Row>();
if (protoRows != null) {
for (SparkplugBProto.Payload.DataSet.Row protoRow : protoRows) {
List<SparkplugBProto.Payload.DataSet.DataSetValue> protoValues = protoRow.getElementsList();
List<SparkplugValue<?>> values = new ArrayList<SparkplugValue<?>>();
for (int index = 0; index < protoRow.getElementsCount(); index++) {
values.add(convertDataSetValue(protoTypes.get(index), protoValues.get(index)));
}
// Add the values to the row and the row to the rows
rows.add(new Row.RowBuilder().addValues(values).createRow());
}
}
return rows;
}
private Collection<DataSetDataType> convertDataSetDataTypes(List<Integer> protoTypes) {
List<DataSetDataType> types = new ArrayList<DataSetDataType>();
// Build up a List of column types
for (int type : protoTypes) {
types.add(DataSetDataType.fromInteger(type));
}
return types;
}
private Object getParameterValue(SparkplugBProto.Payload.Template.Parameter protoParameter) throws Exception {
// Otherwise convert the value based on the type
int type = protoParameter.getType();
switch (MetricDataType.fromInteger(type)) {
case Boolean:
return protoParameter.getBooleanValue();
case DateTime:
return new Date(protoParameter.getLongValue());
case Float:
return protoParameter.getFloatValue();
case Double:
return protoParameter.getDoubleValue();
case Int8:
return (byte) protoParameter.getIntValue();
case Int16:
case UInt8:
return (short) protoParameter.getIntValue();
case Int32:
case UInt16:
return protoParameter.getIntValue();
case UInt32:
case Int64:
return protoParameter.getLongValue();
case UInt64:
return BigInteger.valueOf(protoParameter.getLongValue());
case String:
case Text:
return protoParameter.getStringValue();
case Unknown:
default:
throw new Exception("Failed to decode: Unknown Parameter Type " + type);
}
}
private SparkplugValue<?> convertDataSetValue(int protoType, SparkplugBProto.Payload.DataSet.DataSetValue protoValue)
throws Exception {
DataSetDataType type = DataSetDataType.fromInteger(protoType);
switch (type) {
case Boolean:
return new SparkplugValue<Boolean>(type, protoValue.getBooleanValue());
case DateTime:
// FIXME - remove after is_null is supported for dataset values
if (protoValue.getLongValue() == -9223372036854775808L) {
return new SparkplugValue<Date>(type, null);
} else {
return new SparkplugValue<Date>(type, new Date(protoValue.getLongValue()));
}
case Float:
return new SparkplugValue<Float>(type, protoValue.getFloatValue());
case Double:
return new SparkplugValue<Double>(type, protoValue.getDoubleValue());
case Int8:
return new SparkplugValue<Byte>(type, (byte) protoValue.getIntValue());
case UInt8:
case Int16:
return new SparkplugValue<Short>(type, (short) protoValue.getIntValue());
case UInt16:
case Int32:
return new SparkplugValue<Integer>(type, protoValue.getIntValue());
case UInt32:
case Int64:
return new SparkplugValue<Long>(type, protoValue.getLongValue());
case UInt64:
return new SparkplugValue<BigInteger>(type, BigInteger.valueOf(protoValue.getLongValue()));
case String:
case Text:
if (protoValue.getStringValue().equals("null")) {
return new SparkplugValue<String>(type, null);
} else {
return new SparkplugValue<String>(type, protoValue.getStringValue());
}
case Unknown:
default:
log.error("Unknown DataType: " + protoType);
throw new Exception("Failed to decode");
}
}
}

545
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java

@ -1,545 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.google.protobuf.ByteString;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Created by nickAS21 on 13.12.22
*/
public class SparkplugBPayloadEncoder implements SparkplugPayloadEncoder <SparkplugBPayload> {
private static Logger logger = LogManager.getLogger(SparkplugBPayloadEncoder.class.getName());
public SparkplugBPayloadEncoder() {
super();
}
public byte[] getBytes(SparkplugBPayload payload) throws IOException {
SparkplugBProto.Payload.Builder protoMsg = SparkplugBProto.Payload.newBuilder();
// Set the timestamp
if (payload.getTimestamp() != null) {
protoMsg.setTimestamp(payload.getTimestamp().getTime());
}
// Set the sequence number
protoMsg.setSeq(payload.getSeq());
// Set the UUID if defined
if (payload.getUuid() != null) {
protoMsg.setUuid(payload.getUuid());
}
// Set the metrics
for (Metric metric : payload.getMetrics()) {
try {
protoMsg.addMetrics(convertMetric(metric));
} catch(Exception e) {
logger.error("Failed to add metric: " + metric.getName());
throw new RuntimeException(e);
}
}
// Set the body
if (payload.getBody() != null) {
protoMsg.setBody(ByteString.copyFrom(payload.getBody()));
}
return protoMsg.build().toByteArray();
}
private SparkplugBProto.Payload.Metric.Builder convertMetric(Metric metric) throws Exception {
// build a metric
SparkplugBProto.Payload.Metric.Builder builder = SparkplugBProto.Payload.Metric.newBuilder();
// set the basic parameters
builder.setDatatype(metric.getDataType().toIntValue());
builder = setMetricValue(builder, metric);
// Set the name, data type, and value
if (metric.hasName()) {
builder.setName(metric.getName());
}
// Set the alias
if (metric.hasAlias()) {
builder.setAlias(metric.getAlias());
}
// Set the timestamp
if (metric.getTimestamp() != null) {
builder.setTimestamp(metric.getTimestamp().getTime());
}
// Set isHistorical
if (metric.getIsHistorical() != null) {
builder.setIsHistorical(metric.isHistorical());
}
// Set isTransient
if (metric.getIsTransient() != null) {
builder.setIsTransient(metric.isTransient());
}
// Set isNull
if (metric.getIsNull() != null) {
builder.setIsNull(metric.isNull());
}
// Set the metadata
if (metric.getMetaData() != null) {
builder = setMetaData(builder, metric);
}
// Set the property set
if (metric.getProperties() != null) {
builder.setProperties(convertPropertySet(metric.getProperties()));
}
return builder;
}
private SparkplugBProto.Payload.Template.Parameter.Builder convertParameter(Parameter parameter) throws Exception {
// build a metric
SparkplugBProto.Payload.Template.Parameter.Builder builder =
SparkplugBProto.Payload.Template.Parameter.newBuilder();
if (logger.isTraceEnabled()) {
logger.trace("Adding parameter: " + parameter.getName());
logger.trace(" type: " + parameter.getType());
}
// Set the name
builder.setName(parameter.getName());
// Set the type and value
builder = setParameterValue(builder, parameter);
return builder;
}
private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertySet propertySet) throws Exception {
SparkplugBProto.Payload.PropertySet.Builder setBuilder = SparkplugBProto.Payload.PropertySet.newBuilder();
Map<String, PropertyValue> map = propertySet.getPropertyMap();
for (String key : map.keySet()) {
SparkplugBProto.Payload.PropertyValue.Builder builder = SparkplugBProto.Payload.PropertyValue.newBuilder();
PropertyValue value = map.get(key);
PropertyDataType type = value.getType();
builder.setType(type.toIntValue());
if (value.getValue() == null) {
builder.setIsNull(true);
} else {
switch (type) {
case Boolean:
builder.setBooleanValue((Boolean) value.getValue());
break;
case DateTime:
builder.setLongValue(((Date) value.getValue()).getTime());
break;
case Double:
builder.setDoubleValue((Double) value.getValue());
break;
case Float:
builder.setFloatValue((Float) value.getValue());
break;
case Int8:
builder.setIntValue((Byte) value.getValue());
break;
case Int16:
case UInt8:
builder.setIntValue((Short) value.getValue());
break;
case Int32:
case UInt16:
builder.setIntValue((Integer) value.getValue());
break;
case Int64:
case UInt32:
builder.setLongValue((Long) value.getValue());
break;
case UInt64:
builder.setLongValue(((BigInteger) value.getValue()).longValue());
break;
case String:
case Text:
builder.setStringValue((String) value.getValue());
break;
case PropertySet:
builder.setPropertysetValue(convertPropertySet((PropertySet) value.getValue()));
break;
case PropertySetList:
List<?> setList = (List<?>) value.getValue();
SparkplugBProto.Payload.PropertySetList.Builder listBuilder =
SparkplugBProto.Payload.PropertySetList.newBuilder();
for (Object obj : setList) {
listBuilder.addPropertyset(convertPropertySet((PropertySet) obj));
}
builder.setPropertysetsValue(listBuilder);
break;
case Unknown:
default:
logger.error("Unknown DataType: " + value.getType());
throw new Exception("Failed to convert value " + value.getType());
}
}
setBuilder.addKeys(key);
setBuilder.addValues(builder);
}
return setBuilder;
}
private SparkplugBProto.Payload.Template.Parameter.Builder setParameterValue(
SparkplugBProto.Payload.Template.Parameter.Builder builder, Parameter parameter) throws Exception {
ParameterDataType type = parameter.getType();
builder.setType(type.toIntValue());
Object value = parameter.getValue();
switch (type) {
case Boolean:
builder.setBooleanValue(toBoolean(value));
break;
case DateTime:
builder.setLongValue(((Date) value).getTime());
break;
case Double:
builder.setDoubleValue((Double) value);
break;
case Float:
builder.setFloatValue((Float) value);
break;
case Int8:
builder.setIntValue((Byte) value);
break;
case Int16:
case UInt8:
builder.setIntValue((Short) value);
break;
case Int32:
case UInt16:
builder.setIntValue((Integer) value);
break;
case Int64:
case UInt32:
builder.setLongValue((Long) value);
break;
case UInt64:
builder.setLongValue(((BigInteger) value).longValue());
break;
case Text:
case String:
if (value == null) {
builder.setStringValue("");
} else {
builder.setStringValue((String) value);
}
break;
case Unknown:
default:
logger.error("Unknown Type: " + type);
throw new Exception("Failed to encode");
}
return builder;
}
private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Payload.Metric.Builder metricBuilder,
Metric metric) throws Exception {
// Set the data type
metricBuilder.setDatatype(metric.getDataType().toIntValue());
if (metric.getValue() == null) {
metricBuilder.setIsNull(true);
} else {
switch (metric.getDataType()) {
case Boolean:
metricBuilder.setBooleanValue(toBoolean(metric.getValue()));
break;
case DateTime:
metricBuilder.setLongValue(((Date) metric.getValue()).getTime());
break;
case File:
metricBuilder.setBytesValue(ByteString.copyFrom(((File) metric.getValue()).getBytes()));
SparkplugBProto.Payload.MetaData.Builder metaDataBuilder =
SparkplugBProto.Payload.MetaData.newBuilder();
metaDataBuilder.setFileName(((File) metric.getValue()).getFileName());
metricBuilder.setMetadata(metaDataBuilder);
break;
case Float:
metricBuilder.setFloatValue((Float) metric.getValue());
break;
case Double:
metricBuilder.setDoubleValue((Double) metric.getValue());
break;
case Int8:
metricBuilder.setIntValue(((Byte)metric.getValue()).intValue());
break;
case Int16:
case UInt8:
metricBuilder.setIntValue(((Short)metric.getValue()).intValue());
break;
case Int32:
case UInt16:
metricBuilder.setIntValue((int) metric.getValue());
break;
case UInt32:
case Int64:
metricBuilder.setLongValue((Long) metric.getValue());
break;
case UInt64:
metricBuilder.setLongValue(((BigInteger) metric.getValue()).longValue());
break;
case String:
case Text:
case UUID:
metricBuilder.setStringValue((String) metric.getValue());
break;
case Bytes:
metricBuilder.setBytesValue(ByteString.copyFrom((byte[]) metric.getValue()));
break;
case DataSet:
DataSet dataSet = (DataSet) metric.getValue();
SparkplugBProto.Payload.DataSet.Builder dataSetBuilder =
SparkplugBProto.Payload.DataSet.newBuilder();
dataSetBuilder.setNumOfColumns(dataSet.getNumOfColumns());
// Column names
List<String> columnNames = dataSet.getColumnNames();
if (columnNames != null && !columnNames.isEmpty()) {
for (String name : columnNames) {
// Add the column name
dataSetBuilder.addColumns(name);
}
}
// Column types
List<DataSetDataType> columnTypes = dataSet.getTypes();
if (columnTypes != null && !columnTypes.isEmpty()) {
for (DataSetDataType type : columnTypes) {
// Add the column type
dataSetBuilder.addTypes(type.toIntValue());
}
}
// Dataset rows
List<Row> rows = dataSet.getRows();
if (rows != null && !rows.isEmpty()) {
for (Row row : rows) {
SparkplugBProto.Payload.DataSet.Row.Builder protoRowBuilder =
SparkplugBProto.Payload.DataSet.Row.newBuilder();
List<SparkplugValue<?>> values = row.getValues();
if (values != null && !values.isEmpty()) {
for (SparkplugValue<?> value : values) {
// Add the converted element
protoRowBuilder.addElements(convertDataSetValue(value));
}
dataSetBuilder.addRows(protoRowBuilder);
}
}
}
// Finally add the dataset
metricBuilder.setDatasetValue(dataSetBuilder);
break;
case Template:
Template template = (Template) metric.getValue();
SparkplugBProto.Payload.Template.Builder templateBuilder =
SparkplugBProto.Payload.Template.newBuilder();
// Set isDefinition
templateBuilder.setIsDefinition(template.isDefinition());
// Set Version
if (template.getVersion() != null) {
templateBuilder.setVersion(template.getVersion());
}
// Set Template Reference
if (template.getTemplateRef() != null) {
templateBuilder.setTemplateRef(template.getTemplateRef());
}
// Set the template metrics
if (template.getMetrics() != null) {
for (Metric templateMetric : template.getMetrics()) {
templateBuilder.addMetrics(convertMetric(templateMetric));
}
}
// Set the template parameters
if (template.getParameters() != null) {
for (Parameter parameter : template.getParameters()) {
templateBuilder.addParameters(convertParameter(parameter));
}
}
// Add the template to the metric
metricBuilder.setTemplateValue(templateBuilder);
break;
case Unknown:
default:
logger.error("Unknown DataType: " + metric.getDataType());
throw new Exception("Failed to encode");
}
}
return metricBuilder;
}
private SparkplugBProto.Payload.Metric.Builder setMetaData(SparkplugBProto.Payload.Metric.Builder metricBuilder,
Metric metric) throws Exception {
// If the builder has been built already - use it
SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = metricBuilder.getMetadataBuilder() != null
? metricBuilder.getMetadataBuilder()
: SparkplugBProto.Payload.MetaData.newBuilder();
MetaData metaData = metric.getMetaData();
if (metaData.getContentType() != null) {
metaDataBuilder.setContentType(metaData.getContentType());
}
if (metaData.getSize() != null) {
metaDataBuilder.setSize(metaData.getSize());
}
if (metaData.getSeq() != null) {
metaDataBuilder.setSeq(metaData.getSeq());
}
if (metaData.getFileName() != null) {
metaDataBuilder.setFileName(metaData.getFileName());
}
if (metaData.getFileType() != null) {
metaDataBuilder.setFileType(metaData.getFileType());
}
if (metaData.getMd5() != null) {
metaDataBuilder.setMd5(metaData.getMd5());
}
if (metaData.getDescription() != null) {
metaDataBuilder.setDescription(metaData.getDescription());
}
metricBuilder.setMetadata(metaDataBuilder);
return metricBuilder;
}
private SparkplugBProto.Payload.DataSet.DataSetValue.Builder convertDataSetValue(SparkplugValue<?> value) throws Exception {
SparkplugBProto.Payload.DataSet.DataSetValue.Builder protoValueBuilder =
SparkplugBProto.Payload.DataSet.DataSetValue.newBuilder();
// Set the value
DataSetDataType type = value.getType();
switch (type) {
case Int8:
protoValueBuilder.setIntValue((Byte) value.getValue());
break;
case Int16:
case UInt8:
protoValueBuilder.setIntValue((Short) value.getValue());
break;
case Int32:
case UInt16:
protoValueBuilder.setIntValue((Integer) value.getValue());
break;
case Int64:
case UInt32:
protoValueBuilder.setLongValue((Long) value.getValue());
break;
case UInt64:
protoValueBuilder.setLongValue(((BigInteger) value.getValue()).longValue());
break;
case Float:
protoValueBuilder.setFloatValue((Float) value.getValue());
break;
case Double:
protoValueBuilder.setDoubleValue((Double) value.getValue());
break;
case String:
case Text:
if (value.getValue() != null) {
protoValueBuilder.setStringValue((String) value.getValue());
} else {
logger.warn("String value for dataset is null");
protoValueBuilder.setStringValue("null");
}
break;
case Boolean:
protoValueBuilder.setBooleanValue(toBoolean(value.getValue()));
break;
case DateTime:
try {
protoValueBuilder.setLongValue(((Date) value.getValue()).getTime());
} catch (NullPointerException npe) {
// FIXME - remove after is_null is supported for dataset values
logger.debug("Date in dataset was null - leaving it -9223372036854775808L");
protoValueBuilder.setLongValue(-9223372036854775808L);
}
break;
default:
logger.error("Unknown DataType: " + value.getType());
throw new Exception("Failed to convert value " + value.getType());
}
return protoValueBuilder;
}
private Boolean toBoolean(Object value) {
if (value == null) {
return null;
}
if (value instanceof Integer) {
return ((Integer)value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Long) {
return ((Long)value).longValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Float) {
return ((Float)value).floatValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Double) {
return ((Double)value).doubleValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Short) {
return ((Short)value).shortValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof Byte) {
return ((Byte)value).byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
} else if (value instanceof String) {
return Boolean.parseBoolean(value.toString());
}
return (Boolean)value;
}
}

17414
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java

File diff suppressed because it is too large

7
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
/**
* An enumeration of Sparkplug MQTT message types. The type provides an indication as to what the MQTT Payload of
* message will contain.
@ -76,13 +79,13 @@ public enum SparkplugMessageType {
*/
NRECORD;
public static SparkplugMessageType parseMessageType(String type) throws Exception {
public static SparkplugMessageType parseMessageType(String type) throws ThingsboardException {
for (SparkplugMessageType messageType : SparkplugMessageType.values()) {
if (messageType.name().equals(type)) {
return messageType;
}
}
throw new Exception("Invalid message type: " + type);
throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
public boolean isDeath() {

37
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadDecoder.java

@ -1,37 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
/**
* An interface for decoding payloads.
*
* @param <P> the type of payload.
*/
public interface SparkplugPayloadDecoder <P> {
/**
* Builds a payload from a supplied byte array.
*
* @param bytes the bytes representing the payload
* @return a payload object built from the byte array
* @throws Exception
*/
public P buildFromByteArray(byte[] bytes) throws Exception;
}

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadEncoder.java

@ -1,39 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
/**
* Created by nickAS21 on 13.12.22
*/
import java.io.IOException;
/**
* An interface for decoding payloads.
*
* @param <P> the type of payload.
*/
public interface SparkplugPayloadEncoder<P> {
/**
* Converts a payload object into a byte array.
*
* @param payload a payload object
* @return the byte array representing the payload
* @throws IOException
*/
public byte[] getBytes(P payload) throws IOException;
}

3
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugTopic.java → common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java

@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
/**
* Created by nickAS21 on 12.12.22

13
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java

@ -17,7 +17,8 @@ package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import java.util.HashMap;
import java.util.Map;
@ -56,9 +57,9 @@ public class SparkplugTopicUtil {
*
* @param topic a topic string
* @return a {@link SparkplugTopic} instance
* @throws Exception if an error occurs while parsing
* @throws ThingsboardException if an error occurs while parsing
*/
public static SparkplugTopic parseTopic(String topic) throws Exception {
public static SparkplugTopic parseTopic(String topic) throws ThingsboardException {
topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic;
return parseTopic(SparkplugTopicUtil.getSplitTopic(topic));
}
@ -71,13 +72,13 @@ public class SparkplugTopicUtil {
* @throws Exception if an error occurs while parsing
*/
@SuppressWarnings("incomplete-switch")
public static SparkplugTopic parseTopic(String[] splitTopic) throws Exception {
public static SparkplugTopic parseTopic(String[] splitTopic) throws ThingsboardException {
SparkplugMessageType type;
String namespace, edgeNodeId, groupId;
int length = splitTopic.length;
if (length < 4 || length > 5) {
throw new Exception("Invalid number of topic elements: " + length);
throw new ThingsboardException("Invalid number of topic elements: " + length, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
namespace = splitTopic[0];
@ -108,6 +109,6 @@ public class SparkplugTopicUtil {
return new SparkplugTopic(namespace, groupId, edgeNodeId, splitTopic[4], type);
}
}
throw new Exception("Invalid number of topic elements " + length + " for topic type " + type);
throw new ThingsboardException("Invalid number of topic elements " + length + " for topic type " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}

129
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DataSetDeserializer.java

@ -1,129 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* A JSON deserializer for {@link DataSet} instances.
*/
public class DataSetDeserializer extends StdDeserializer<DataSet> {
private static Logger logger = LogManager.getLogger(DataSetDeserializer.class.getName());
private static final String FIELD_SIZE = "numberOfColumns";
private static final String FIELD_TYPES = "types";
private static final String FIELD_NAMES = "columnNames";
private static final String FIELD_ROWS = "rows";
/**
* Constructor.
*
* @param clazz
*/
protected DataSetDeserializer(Class<DataSet> clazz) {
super(clazz);
}
@Override
public DataSet deserialize(JsonParser parser, DeserializationContext context)
throws IOException, JsonProcessingException {
JsonNode node = parser.getCodec().readTree(parser);
long size = (Long) node.get(FIELD_SIZE).numberValue();
DataSet.DataSetBuilder builder = new DataSet.DataSetBuilder(size);
JsonNode namesNode = node.get(FIELD_NAMES);
if (namesNode.isArray()) {
for (JsonNode nameNode : namesNode) {
builder.addColumnName(nameNode.textValue());
}
}
JsonNode typesNode = node.get(FIELD_TYPES);
List<DataSetDataType> typesList = new ArrayList<DataSetDataType>();
if (typesNode.isArray()) {
for (JsonNode typeNode : typesNode) {
typesList.add(DataSetDataType.valueOf(typeNode.textValue()));
}
builder.addTypes(typesList);
}
JsonNode rowsNode = node.get(FIELD_ROWS);
if (rowsNode.isArray()) {
for (JsonNode rowNode : rowsNode) {
List<SparkplugValue<?>> values = new ArrayList<SparkplugValue<?>>();
for (int i = 0; i < size; i++) {
JsonNode value = rowNode.get(i);
DataSetDataType type = typesList.get(i);
values.add(getValueFromNode(value, type));
}
builder.addRow(new Row(values));
}
}
try {
return builder.createDataSet();
} catch (Exception e) {
logger.error("Error deserializing DataSet ", e);
}
return null;
}
/*
* Creates and returns a Value instance
*/
private SparkplugValue<?> getValueFromNode(JsonNode nodeValue, DataSetDataType type) {
switch (type) {
case Boolean:
return new SparkplugValue<Boolean>(type, (boolean)nodeValue.asBoolean());
case DateTime:
return new SparkplugValue<Date>(type, new Date(nodeValue.asLong()));
case Double:
return new SparkplugValue<Double>(type, nodeValue.asDouble());
case Float:
return new SparkplugValue<Float>(type, (float)nodeValue.asDouble());
case Int16:
case UInt8:
return new SparkplugValue<Byte>(type, (byte)nodeValue.asInt());
case UInt16:
case Int32:
return new SparkplugValue<Integer>(type, nodeValue.asInt());
case UInt32:
case Int64:
return new SparkplugValue<Long>(type, (long)nodeValue.asLong());
case Text:
case String:
return new SparkplugValue<String>(type, nodeValue.asText());
case UInt64:
return new SparkplugValue<BigInteger>(type, BigInteger.valueOf(nodeValue.asLong()));
case Unknown:
default:
return null;
}
}
}

38
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModifier.java

@ -1,38 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
/**
*
*/
public class DeserializerModifier extends BeanDeserializerModifier {
@Override
public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
if (Metric.class.equals(beanDesc.getBeanClass())) {
return new MetricDeserializer(deserializer);
}
return super.modifyDeserializer(config, beanDesc, deserializer);
}
}

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModule.java

@ -1,39 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* Used to register the {@link DeserializerModifier} instance.
*/
public class DeserializerModule extends SimpleModule {
private BeanDeserializerModifier deserializerModifier;
public DeserializerModule(BeanDeserializerModifier deserializerModifier) {
super("DeserializerModule", Version.unknownVersion());
this.deserializerModifier = deserializerModifier;
}
@Override
public void setupModule(SetupContext context) {
super.setupModule(context);
context.addBeanDeserializerModifier(deserializerModifier);
}
}

52
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/FileSerializer.java

@ -1,52 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.eclipse.californium.elements.util.Base64;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import java.io.IOException;
/**
* Serializes a {@link File} instance.
*/
public class FileSerializer extends StdSerializer<File> {
/**
* Constructor.
*/
protected FileSerializer() {
super(File.class);
}
/**
* Constructor.
*
* @param clazz class.
*/
protected FileSerializer(Class<File> clazz) {
super(clazz);
}
@Override
public void serialize(File value, JsonGenerator generator, SerializerProvider provider) throws IOException {
generator.writeString(Base64.encodeBytes(value.getBytes()));
}
}

70
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/MetricDeserializer.java

@ -1,70 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.json;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType;
import java.io.IOException;
import java.util.Base64;
/**
* A custom JSON deserializer for {@link Metric} instances.
*/
public class MetricDeserializer extends StdDeserializer<Metric> implements ResolvableDeserializer {
private final JsonDeserializer<?> defaultDeserializer;
/**
* Constructor.
*/
protected MetricDeserializer(JsonDeserializer<?> defaultDeserializer) {
super(Metric.class);
this.defaultDeserializer = defaultDeserializer;
}
@Override
public Metric deserialize(JsonParser parser, DeserializationContext ctxt) throws IOException, JsonProcessingException {
Metric metric = (Metric) defaultDeserializer.deserialize(parser, ctxt);
System.out.println(metric);
// Check if the data type is a File
if (metric.getDataType().equals(MetricDataType.File)) {
// Perform the custom logic for File types by building up the File object.
MetaData metaData = metric.getMetaData();
String fileName = metaData == null ? null : metaData.getFileName();
File file = new File(fileName, Base64.getDecoder().decode((String)metric.getValue()));
metric.setValue(file);
}
return metric;
}
@Override
public void resolve(DeserializationContext ctxt) throws JsonMappingException {
((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
}
}

234
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSet.java

@ -1,234 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.transport.mqtt.util.sparkplug.json.DataSetDeserializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A data set that represents a table of data.
*/
@Slf4j
@JsonDeserialize(using = DataSetDeserializer.class)
public class DataSet {
/**
* The number of columns
*/
@JsonProperty("numberOfColumns")
private long numOfColumns;
/**
* A list containing the names of each column
*/
@JsonProperty("columnNames")
private List<String> columnNames;
/**
* A list containing the data types of each column
*/
@JsonProperty("types")
private List<DataSetDataType> types;
/**
* A list containing the rows in the data set
*/
private List<Row> rows;
public DataSet(long numOfColumns, List<String> columnNames, List<DataSetDataType> types, List<Row> rows) {
this.numOfColumns = numOfColumns;
this.columnNames = columnNames;
this.types = types;
this.rows = rows;
}
public long getNumOfColumns() {
return numOfColumns;
}
public void setNumOfColumns(long numOfColumns) {
this.numOfColumns = numOfColumns;
}
public List<String> getColumnNames() {
return columnNames;
}
public void setColumnNames(List<String> columnNames) {
this.columnNames = columnNames;
}
public void addColumnName(String columnName) {
this.columnNames.add(columnName);
}
public List<Row> getRows() {
return rows;
}
@JsonGetter("rows")
public List<List<Object>> getRowsAsLists() {
List<List<Object>> list = new ArrayList<List<Object>>(getRows().size());
for (Row row : getRows()) {
list.add(Row.toValues(row));
}
return list;
}
public void addRow(Row row) {
this.rows.add(row);
}
public void addRow(int index, Row row) {
this.rows.add(index, row);
}
public Row removeRow(int index) {
return rows.remove(index);
}
public boolean removeRow(Row row) {
return rows.remove(row);
}
public void setRows(List<Row> rows) {
this.rows = rows;
}
public List<DataSetDataType> getTypes() {
return types;
}
public void setTypes(List<DataSetDataType> types) {
this.types = types;
}
public void addType(DataSetDataType type) {
this.types.add(type);
}
public void addType(int index, DataSetDataType type) {
this.types.add(index, type);
}
@Override
public String toString() {
return "DataSet [numOfColumns=" + numOfColumns + ", columnNames=" + columnNames + ", types=" + types + ", rows="
+ rows + "]";
}
/**
* A builder for creating a {@link DataSet} instance.
*/
public static class DataSetBuilder {
private long numOfColumns;
private List<String> columnNames;
private List<DataSetDataType> types;
private List<Row> rows;
public DataSetBuilder(long numOfColumns) {
this.numOfColumns = numOfColumns;
this.columnNames = new ArrayList<String>();
this.types = new ArrayList<DataSetDataType>();
this.rows = new ArrayList<Row>();
}
public DataSetBuilder(DataSet dataSet) {
this.numOfColumns = dataSet.getNumOfColumns();
this.columnNames = new ArrayList<String>(dataSet.getColumnNames());
this.types = new ArrayList<DataSetDataType>(dataSet.getTypes());
this.rows = new ArrayList<Row>(dataSet.getRows().size());
for (Row row : dataSet.getRows()) {
rows.add(new Row.RowBuilder(row).createRow());
}
}
public DataSetBuilder addColumnNames(Collection<String> columnNames) {
this.columnNames.addAll(columnNames);
return this;
}
public DataSetBuilder addColumnName(String columnName) {
this.columnNames.add(columnName);
return this;
}
public DataSetBuilder addType(DataSetDataType type) {
this.types.add(type);
return this;
}
public DataSetBuilder addTypes(Collection<DataSetDataType> types) {
this.types.addAll(types);
return this;
}
public DataSetBuilder addRow(Row row) {
this.rows.add(row);
return this;
}
public DataSetBuilder addRows(Collection<Row> rows) {
this.rows.addAll(rows);
return this;
}
public DataSet createDataSet() throws Exception {
log.trace("Number of columns: " + numOfColumns);
for (String columnName : columnNames) {
log.trace("\tcolumnName: " + columnName);
}
for (DataSetDataType type : types) {
log.trace("\ttypes: " + type);
}
for (Row row : rows) {
log.trace("\t\trow: " + row);
}
validate();
return new DataSet(numOfColumns, columnNames, types, rows);
}
public void validate() throws Exception {
if (columnNames.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set column names: " +
columnNames.size() + " vs expected " + numOfColumns);
}
if (types.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set types: " +
types.size() + " vs expected: " + numOfColumns);
}
for (int i = 0; i < types.size(); i++) {
for (Row row : rows) {
List<SparkplugValue<?>> values = row.getValues();
if (values.size() != numOfColumns) {
throw new Exception("Invalid number of columns in data set row: " +
values.size() + " vs expected: " + numOfColumns);
}
types.get(i).checkType(row.getValues().get(i).getValue());
}
}
}
}
}

117
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSetDataType.java

@ -1,117 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import java.math.BigInteger;
import java.util.Date;
/**
* A enumeration of data types of values in a {@link DataSet}
*/
public enum DataSetDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private DataSetDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link DataSetDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link DataSetDataType} instance.
*/
public static DataSetDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

64
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/File.java

@ -1,64 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import org.thingsboard.server.transport.mqtt.util.sparkplug.json.FileSerializer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Arrays;
@JsonIgnoreProperties(value = { "fileName" })
@JsonSerialize(using = FileSerializer.class)
public class File {
private String fileName;
private byte[] bytes;
public File() {
super();
}
public File(String fileName, byte[] bytes) {
super();
this.fileName = fileName == null
? null
: fileName.replace("/", System.getProperty("file.separator"))
.replace("\\", System.getProperty("file.separator"));
this.bytes = Arrays.copyOf(bytes, bytes.length);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public byte[] getBytes() {
return bytes;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
@Override
public String toString() {
return "File [fileName=" + fileName + ", bytes length=" + bytes.length + "]";
}
}

267
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetaData.java

@ -1,267 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import java.util.Objects;
/**
* A class to represent the meta data associated with a metric.
*/
@JsonInclude(Include.NON_NULL)
public class MetaData {
/**
* Indicates if the metric represents one of multiple parts.
*/
private Boolean isMultiPart;
/**
* A content type associated with the metric.
*/
private String contentType;
/**
* A size associated with the metric.
*/
private Long size;
/**
* A sequence associated with the metric.
*/
private Long seq;
/**
* A file name associated with the metric.
*/
private String fileName;
/**
* A file type associated with the metric.
*/
private String fileType;
/**
* A MD5 sum associated with the metric.
*/
private String md5;
/**
* A description associated with the metric.
*/
private String description;
/**
* Default no-arg constructor.
*/
public MetaData() {}
/**
* Constructor with fields.
*
* @param isMultiPart if the metric represents one of multiple parts.
* @param contentType a content type associated with the metric.
* @param size a size associated with the metric.
* @param seq a sequence associated with the metric.
* @param fileName a file name associated with the metric.
* @param fileType a file type associated with the metric.
* @param md5 a MD5 sum associated with the metric.
* @param description a description associated with the metric
*/
public MetaData(Boolean isMultiPart, String contentType, Long size, Long seq, String fileName,
String fileType, String md5, String description) {
this.isMultiPart = isMultiPart;
this.contentType = contentType;
this.size = size;
this.seq = seq;
this.fileName = fileName;
this.fileType = fileType;
this.md5 = md5;
this.description = description;
}
public Boolean isMultiPart() {
return isMultiPart;
}
public MetaData setMultiPart(Boolean isMultiPart) {
this.isMultiPart = isMultiPart;
return this;
}
public String getContentType() {
return contentType;
}
public MetaData setContentType(String contentType) {
this.contentType = contentType;
return this;
}
public Long getSize() {
return size;
}
public MetaData setSize(Long size) {
this.size = size;
return this;
}
public Long getSeq() {
return seq;
}
public MetaData setSeq(Long seq) {
this.seq = seq;
return this;
}
public String getFileName() {
return fileName;
}
public MetaData setFileName(String fileName) {
this.fileName = fileName;
return this;
}
public String getFileType() {
return fileType;
}
public MetaData setFileType(String fileType) {
this.fileType = fileType;
return this;
}
public String getMd5() {
return md5;
}
public MetaData setMd5(String md5) {
this.md5 = md5;
return this;
}
public String getDescription() {
return description;
}
public MetaData setDescription(String description) {
this.description = description;
return this;
}
@Override
public String toString() {
return "MetaData [isMultiPart=" + isMultiPart + ", contentType=" + contentType + ", size=" + size + ", seq="
+ seq + ", fileName=" + fileName + ", fileType=" + fileType + ", md5=" + md5 + ", description="
+ description + "]";
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
MetaData meta = (MetaData) object;
return Objects.equals(isMultiPart, meta.isMultiPart())
&& Objects.equals(contentType, meta.getContentType())
&& Objects.equals(size, meta.getSize())
&& Objects.equals(seq, meta.getSeq())
&& Objects.equals(fileName, meta.getFileName())
&& Objects.equals(fileType, meta.getFileType())
&& Objects.equals(md5, meta.getMd5())
&& Objects.equals(description, meta.getDescription());
}
/**
* A Builder for a MetaData instance.
*/
public static class MetaDataBuilder {
private Boolean isMultiPart;
private String contentType;
private Long size;
private Long seq;
private String fileName;
private String fileType;
private String md5;
private String description;
public MetaDataBuilder() {};
public MetaDataBuilder(MetaData metaData) {
this.isMultiPart = metaData.isMultiPart();
this.contentType = metaData.getContentType();
this.size = metaData.getSize();
this.seq = metaData.getSeq();
this.fileName = metaData.getFileName();
this.fileType = metaData.getFileType();
this.md5 = metaData.getMd5();
this.description = metaData.getDescription();
}
public MetaDataBuilder multiPart(Boolean isMultiPart) {
this.isMultiPart = isMultiPart;
return this;
}
public MetaDataBuilder contentType(String contentType) {
this.contentType = contentType;
return this;
}
public MetaDataBuilder size(Long size) {
this.size = size;
return this;
}
public MetaDataBuilder seq(Long seq) {
this.seq = seq;
return this;
}
public MetaDataBuilder fileName(String fileName) {
this.fileName = fileName;
return this;
}
public MetaDataBuilder fileType(String fileType) {
this.fileType = fileType;
return this;
}
public MetaDataBuilder md5(String md5) {
this.md5 = md5;
return this;
}
public MetaDataBuilder description(String description) {
this.description = description;
return this;
}
public MetaData createMetaData() {
return new MetaData(isMultiPart, contentType, size, seq, fileName, fileType, md5, description);
}
}
}

320
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Metric.java

@ -1,320 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet.DataSetBuilder;
import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData.MetaDataBuilder;
import java.util.Date;
;
/**
* A metric of a Sparkplug Payload.
*/
@JsonIgnoreProperties(value = { "isNull" })
@JsonInclude(Include.NON_NULL)
public class Metric {
@JsonProperty("name")
private String name;
@JsonProperty("alias")
private Long alias;
@JsonProperty("timestamp")
private Date timestamp;
@JsonProperty("dataType")
private MetricDataType dataType;
@JsonProperty("isHistorical")
private Boolean isHistorical;
@JsonProperty("isTransient")
private Boolean isTransient;
@JsonProperty("metaData")
private MetaData metaData;
@JsonProperty("properties")
@JsonInclude(Include.NON_EMPTY)
private PropertySet properties;
@JsonProperty("value")
private Object value;
private Boolean isNull = null;
public Metric() {};
/**
* @param name
* @param alias
* @param timestamp
* @param dataType
* @param isHistorical
* @param isTransient
* @param metaData
* @param properties
* @param value
* @throws Exception
*/
public Metric(String name, Long alias, Date timestamp, MetricDataType dataType, Boolean isHistorical,
Boolean isTransient, MetaData metaData, PropertySet properties, Object value)
throws Exception {
super();
this.name = name;
this.alias = alias;
this.timestamp = timestamp;
this.dataType = dataType;
this.isHistorical = isHistorical;
this.isTransient = isTransient;
isNull = (value == null) ? true : false;
this.metaData = metaData;
this.properties = properties;
this.value = value;
this.dataType.checkType(value);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean hasName() {
return !(name == null);
}
public boolean hasAlias() {
return !(alias == null);
}
public Long getAlias() {
return alias;
}
public void setAlias(long alias) {
this.alias = alias;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public MetricDataType getDataType() {
return dataType;
}
public void setDataType(MetricDataType dataType) {
this.dataType = dataType;
}
@JsonGetter("metaData")
public MetaData getMetaData() {
return metaData;
}
@JsonSetter("metaData")
public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
isNull = (value == null);
}
public PropertySet getProperties() {
return this.properties;
}
public void setProperties(PropertySet properties) {
this.properties = properties;
}
@JsonIgnore
public Boolean isHistorical() {
return isHistorical == null ? false : isHistorical;
}
@JsonGetter("isHistorical")
public Boolean getIsHistorical() {
return isHistorical;
}
@JsonSetter("isHistorical")
public void setHistorical(Boolean isHistorical) {
this.isHistorical = isHistorical;
}
@JsonIgnore
public Boolean isTransient() {
return isTransient == null ? false : isTransient;
}
@JsonGetter("isTransient")
public Boolean getIsTransient() {
return isTransient;
}
@JsonSetter("isTransient")
public void setTransient(Boolean isTransient) {
this.isTransient = isTransient;
}
@JsonIgnore
public Boolean isNull() {
return isNull == null ? false : isNull;
}
@JsonIgnore
public Boolean getIsNull() {
return isNull;
}
@Override
public String toString() {
return "Metric [name=" + name + ", alias=" + alias + ", timestamp=" + timestamp + ", dataType=" + dataType
+ ", isHistorical=" + isHistorical + ", isTransient=" + isTransient + ", isNull=" + isNull
+ ", metaData=" + metaData + ", propertySet=" + properties + ", value=" + value + "]";
}
/**
* A builder for creating a {@link Metric} instance.
*/
public static class MetricBuilder {
private String name;
private Long alias;
private Date timestamp;
private MetricDataType dataType;
private Boolean isHistorical;
private Boolean isTransient;
private MetaData metaData = null;
private PropertySet properties = null;
private Object value;
public MetricBuilder(String name, MetricDataType dataType, Object value) {
this.name = name;
this.timestamp = new Date();
this.dataType = dataType;
this.value = value;
}
public MetricBuilder(Long alias, MetricDataType dataType, Object value) {
this.alias = alias;
this.timestamp = new Date();
this.dataType = dataType;
this.value = value;
}
public MetricBuilder(Metric metric) throws Exception {
this.name = metric.getName();
this.alias = metric.getAlias();
this.timestamp = metric.getTimestamp();
this.dataType = metric.getDataType();
this.isHistorical = metric.isHistorical();
this.isTransient = metric.isTransient();
this.metaData = metric.getMetaData() != null
? new MetaDataBuilder(metric.getMetaData()).createMetaData() : null;
this.properties = metric.getMetaData() != null
? new PropertySet.PropertySetBuilder(metric.getProperties()).createPropertySet() : null;
switch (dataType) {
case DataSet:
this.value = metric.getValue() != null
? new DataSetBuilder((DataSet) metric.getValue()).createDataSet()
: null;
break;
case Template:
this.value = metric.getValue() != null
? new Template.TemplateBuilder((Template) metric.getValue()).createTemplate()
: null;
break;
default:
this.value = metric.getValue();
}
}
public MetricBuilder name(String name) {
this.name = name;
return this;
}
public MetricBuilder alias(Long alias) {
this.alias = alias;
return this;
}
public MetricBuilder timestamp(Date timestamp) {
this.timestamp = timestamp;
return this;
}
public MetricBuilder dataType(MetricDataType dataType) {
this.dataType = dataType;
return this;
}
public MetricBuilder isHistorical(Boolean isHistorical) {
this.isHistorical = isHistorical;
return this;
}
public MetricBuilder isTransient(Boolean isTransient) {
this.isTransient = isTransient;
return this;
}
public MetricBuilder metaData(MetaData metaData) {
this.metaData = metaData;
return this;
}
public MetricBuilder properties(PropertySet properties) {
this.properties = properties;
return this;
}
public MetricBuilder value(Object value) {
this.value = value;
return this;
}
public Metric createMetric() throws Exception {
return new Metric(name, alias, timestamp, dataType, isHistorical, isTransient, metaData,
properties, value);
}
}
}

138
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetricDataType.java

@ -1,138 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import java.math.BigInteger;
import java.util.Date;
/**
* An enumeration of data types associated with the value of a {@link Metric}
*/
@Slf4j
public enum MetricDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Custom Types for Metrics
UUID(15, String.class),
DataSet(16, DataSet.class),
Bytes(17, byte[].class),
File(18, File.class),
Template(19, Template.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private MetricDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
log.warn("Failed type check - " + clazz + " != " + ((value != null) ? value.getClass().toString() : "null"));
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link MetricDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link MetricDataType} instance.
*/
public static MetricDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
case 15:
return UUID;
case 16:
return DataSet;
case 17:
return Bytes;
case 18:
return File;
case 19:
return Template;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

107
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Parameter.java

@ -1,107 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.Objects;
/**
* A class to represent a parameter associated with a template.
*/
public class Parameter {
/**
* The name of the parameter
*/
@JsonProperty("name")
private String name;
/**
* The data type of the parameter
*/
@JsonProperty("type")
private ParameterDataType type;
/**
* The value of the parameter
*/
@JsonProperty("value")
private Object value;
/**
* Constructs a Parameter instance.
*
* @param name The name of the parameter.
* @param type The type of the parameter.
* @param value The value of the parameter.
* @throws Exception
*/
public Parameter(String name, ParameterDataType type, Object value) throws Exception {
this.name = name;
this.type = type;
this.value = value;
this.type.checkType(value);
}
@JsonGetter("name")
public String getName() {
return name;
}
@JsonSetter("name")
public void setName(String name) {
this.name = name;
}
public ParameterDataType getType() {
return type;
}
public void setType(ParameterDataType type) {
this.type = type;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
Parameter param = (Parameter) object;
return Objects.equals(name, param.getName())
&& Objects.equals(type, param.getType())
&& Objects.equals(value, param.getValue());
}
@Override
public String toString() {
return "Parameter [name=" + name + ", type=" + type + ", value=" + value + "]";
}
}

125
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/ParameterDataType.java

@ -1,125 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.math.BigInteger;
import java.util.Date;
/**
* An enumeration of data types for the value of a {@link Parameter} for a {@link Template}
*/
@Slf4j
public enum ParameterDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Unknown
Unknown(0, Object.class);
private static Logger logger = LogManager.getLogger(ParameterDataType.class.getName());
private Class<?> clazz = null;
private int intValue = 0;
private ParameterDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
log.warn("Failed type check - " + clazz + " != " + value.getClass().toString());
throw new Exception(value.getClass().getName());
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link ParameterDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link ParameterDataType} instance.
*/
public static ParameterDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

134
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyDataType.java

@ -1,134 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import lombok.extern.slf4j.Slf4j;
import java.math.BigInteger;
import java.util.Date;
import java.util.List;
/**
* An enumeration of data types for values of a {@link PropertySet}
*/
@Slf4j
public enum PropertyDataType {
// Basic Types
Int8(1, Byte.class),
Int16(2, Short.class),
Int32(3, Integer.class),
Int64(4, Long.class),
UInt8(5, Short.class),
UInt16(6, Integer.class),
UInt32(7, Long.class),
UInt64(8, BigInteger.class),
Float(9, Float.class),
Double(10, Double.class),
Boolean(11, Boolean.class),
String(12, String.class),
DateTime(13, Date.class),
Text(14, String.class),
// Custom Types for PropertySets
PropertySet(20, PropertySet.class),
PropertySetList(21, List.class),
// Unknown
Unknown(0, Object.class);
private Class<?> clazz = null;
private int intValue = 0;
private PropertyDataType(int intValue, Class<?> clazz) {
this.intValue = intValue;
this.clazz = clazz;
}
public void checkType(Object value) throws Exception {
if (value != null && !value.getClass().equals(clazz)) {
if(clazz == List.class && value instanceof List) {
// Allow List subclasses
} else {
log.warn("Failed type check - " + clazz + " != " + value.getClass().toString());
throw new Exception(value.getClass().getName());
}
}
}
/**
* Returns an integer representation of the data type.
*
* @return an integer representation of the data type.
*/
public int toIntValue() {
return this.intValue;
}
/**
* Converts the integer representation of the data type into a {@link PropertyDataType} instance.
*
* @param i the integer representation of the data type.
* @return a {@link PropertyDataType} instance.
*/
public static PropertyDataType fromInteger(int i) {
switch(i) {
case 1:
return Int8;
case 2:
return Int16;
case 3:
return Int32;
case 4:
return Int64;
case 5:
return UInt8;
case 6:
return UInt16;
case 7:
return UInt32;
case 8:
return UInt64;
case 9:
return Float;
case 10:
return Double;
case 11:
return Boolean;
case 12:
return String;
case 13:
return DateTime;
case 14:
return Text;
case 20:
return PropertySet;
case 21:
return PropertySetList;
default:
return Unknown;
}
}
/**
* Returns the class type for this DataType
*
* @return the class type for this DataType
*/
public Class<?> getClazz() {
return clazz;
}
}

169
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertySet.java

@ -1,169 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* A class that maintains a set of properties associated with a {@link Metric}.
*/
public class PropertySet implements Map<String, PropertyValue> {
@JsonIgnore
private Map<String, PropertyValue> map;
public PropertySet() {
this.map = new HashMap<String, PropertyValue>();
}
private PropertySet(Map<String, PropertyValue> propertyMap) {
this.map = propertyMap;
}
@JsonIgnore
public PropertyValue getPropertyValue(String name) {
return this.map.get(name);
}
@JsonIgnore
public void setProperty(String name, PropertyValue value) {
this.map.put(name, value);
}
@JsonIgnore
public void removeProperty(String name) {
this.map.remove(name);
}
@JsonIgnore
public void clear() {
this.map.clear();
}
@JsonIgnore
public Set<String> getNames() {
return map.keySet();
}
@JsonIgnore
public Collection<PropertyValue> getValues() {
return map.values();
}
@JsonIgnore
public Map<String, PropertyValue> getPropertyMap() {
return map;
}
@Override
public String toString() {
return "PropertySet [propertyMap=" + map + "]";
}
@Override
public int size() {
return map.size();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return map.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return map.containsValue(value);
}
@Override
public PropertyValue get(Object key) {
return map.get(key);
}
@Override
public PropertyValue put(String key, PropertyValue value) {
return map.put(key, value);
}
@Override
public PropertyValue remove(Object key) {
return map.remove(key);
}
@Override
public void putAll(Map<? extends String, ? extends PropertyValue> m) {
map.putAll(m);
}
@Override
public Set<String> keySet() {
return map.keySet();
}
@Override
public Collection<PropertyValue> values() {
return map.values();
}
@Override
public Set<Entry<String, PropertyValue>> entrySet() {
return map.entrySet();
}
/**
* A builder for a PropertySet instance
*/
public static class PropertySetBuilder {
private Map<String, PropertyValue> propertyMap;
public PropertySetBuilder() {
this.propertyMap = new HashMap<String, PropertyValue>();
}
public PropertySetBuilder(PropertySet propertySet) throws Exception {
this.propertyMap = new HashMap<String, PropertyValue>();
for (String name : propertySet.getNames()) {
PropertyValue value = propertySet.getPropertyValue(name);
propertyMap.put(name, new PropertyValue(value.getType(), value.getValue()));
}
}
public PropertySetBuilder addProperty(String name, PropertyValue value) {
this.propertyMap.put(name, value);
return this;
}
public PropertySetBuilder addProperties(Map<String, PropertyValue> properties) {
this.propertyMap.putAll(properties);
return this;
}
public PropertySet createPropertySet() {
return new PropertySet(propertyMap);
}
}
}

86
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyValue.java

@ -1,86 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Objects;
/**
* The value of a property in a {@link PropertySet}.
*/
public class PropertyValue {
private PropertyDataType type;
private Object value;
private Boolean isNull = null;
public PropertyValue() {}
/**
* A constructor.
*
* @param type the property type
* @param value the property value
* @throws Exception
*/
public PropertyValue(PropertyDataType type, Object value) throws Exception {
this.type = type;
this.value = value;
isNull = (value == null) ? true : false;
type.checkType(value);
}
public PropertyDataType getType() {
return type;
}
public void setType(PropertyDataType type) {
this.type = type;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
isNull = (value == null) ? true : false;
}
@JsonIgnore
public Boolean isNull() {
return isNull;
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || this.getClass() != object.getClass()) {
return false;
}
PropertyValue propValue = (PropertyValue) object;
return Objects.equals(type, propValue.getType())
&& Objects.equals(value, propValue.getValue());
}
@Override
public String toString() {
return "PropertyValue [type=" + type + ", value=" + value + ", isNull=" + isNull + "]";
}
}

93
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Row.java

@ -1,93 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A class for representing a row of a data set.
*/
public class Row {
private List<SparkplugValue<?>> values;
public Row(List<SparkplugValue<?>> values) {
this.values = values;
}
public List<SparkplugValue<?>> getValues() {
return values;
}
public void setValues(List<SparkplugValue<?>> values) {
this.values = values;
}
public void addValue(SparkplugValue<?> value) {
this.values.add(value);
}
@Override
public String toString() {
return "Row [values=" + values + "]";
}
/**
* Converts a {@link Row} instance to a {@link List} of Objects representing the values.
*
* @param row a {@link Row} instance.
* @return a {@link List} of Objects.
*/
public static List<Object> toValues(Row row) {
List<Object> list = new ArrayList<Object>(row.getValues().size());
for (SparkplugValue<?> value : row.getValues()) {
list.add(value.getValue());
}
return list;
}
/**
* A builder for creating a {@link Row} instance.
*/
public static class RowBuilder {
private List<SparkplugValue<?>> values;
public RowBuilder() {
this.values = new ArrayList<SparkplugValue<?>>();
}
public RowBuilder(Row row) {
this.values = new ArrayList<SparkplugValue<?>>(row.getValues());
}
public RowBuilder addValue(SparkplugValue<?> value) {
this.values.add(value);
return this;
}
public RowBuilder addValues(Collection<SparkplugValue<?>> values) {
this.values.addAll(values);
return this;
}
public Row createRow() {
return new Row(values);
}
}
}

53
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugValue.java

@ -1,53 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
public class SparkplugValue<V> {
private DataSetDataType type;
private V value;
public SparkplugValue() {
super();
}
public SparkplugValue(DataSetDataType type, V value) {
super();
this.type = type;
this.value = value;
}
public DataSetDataType getType() {
return type;
}
public void setType(DataSetDataType type) {
this.type = type;
}
public V getValue() {
return value;
}
public void setValue(V value) {
this.value = value;
}
@Override
public String toString() {
return "Value [type=" + type + ", value=" + value + "]";
}
}

202
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Template.java

@ -1,202 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug.message;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A class representing a template associated with a metric
*/
@JsonInclude(Include.NON_NULL)
public class Template {
/**
* The Template version.
*/
@JsonProperty("version")
private String version;
/**
* The template reference
*/
@JsonProperty("reference")
private String templateRef;
/**
* True if the template is a definition, false otherwise.
*/
@JsonProperty("isDefinition")
private boolean isDefinition;
/**
* List of metrics.
*/
@JsonProperty("metrics")
private List<Metric> metrics;
/**
* List of parameters.
*/
@JsonProperty("parameters")
@JsonInclude(Include.NON_EMPTY)
private List<Parameter> parameters;
public Template() {}
public Template(String version, String templateRef, boolean isDefinition, List<Metric> metrics,
List<Parameter> parameters) {
this.version = version;
this.templateRef = templateRef;
this.isDefinition = isDefinition;
this.metrics = metrics;
this.parameters = parameters;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getTemplateRef() {
return templateRef;
}
public void setTemplateRef(String templateRef) {
this.templateRef = templateRef;
}
@JsonGetter("isDefinition")
public boolean isDefinition() {
return isDefinition;
}
@JsonSetter("isDefinition")
public void setDefinition(boolean isDefinition) {
this.isDefinition = isDefinition;
}
public List<Metric> getMetrics() {
return metrics;
}
public void setMetrics(List<Metric> metrics) {
this.metrics = metrics;
}
public void addMetric(Metric metric) {
this.metrics.add(metric);
}
public List<Parameter> getParameters() {
return parameters;
}
public void setParameters(List<Parameter> parameters) {
this.parameters = parameters;
}
public void addParameter(Parameter parameter) {
this.parameters.add(parameter);
}
@Override
public String toString() {
return "Template [version=" + version + ", templateRef=" + templateRef + ", isDefinition=" + isDefinition
+ ", metrics=" + metrics + ", parameters=" + parameters + "]";
}
/**
* A builder for creating a {@link Template} instance.
*/
public static class TemplateBuilder {
private String version;
private String templateRef;
private boolean isDefinition;
private List<Metric> metrics;
private List<Parameter> parameters;
public TemplateBuilder() {
super();
this.metrics = new ArrayList<Metric>();
this.parameters = new ArrayList<Parameter>();
}
public TemplateBuilder(Template template) throws Exception {
this.version = template.getVersion();
this.templateRef = template.getTemplateRef();
this.isDefinition = template.isDefinition();
this.metrics = new ArrayList<Metric>(template.getMetrics().size());
for (Metric metric : template.getMetrics()) {
this.metrics.add(new Metric.MetricBuilder(metric).createMetric());
}
this.parameters = new ArrayList<Parameter>(template.getParameters().size());
for (Parameter parameter : template.getParameters()) {
this.parameters.add(new Parameter(parameter.getName(), parameter.getType(), parameter.getValue()));
}
}
public TemplateBuilder version(String version) {
this.version = version;
return this;
}
public TemplateBuilder templateRef(String templateRef) {
this.templateRef = templateRef;
return this;
}
public TemplateBuilder definition(boolean isDefinition) {
this.isDefinition = isDefinition;
return this;
}
public TemplateBuilder addMetric(Metric metric) {
this.metrics.add(metric);
return this;
}
public TemplateBuilder addMetrics(Collection<Metric> metrics) {
this.metrics.addAll(metrics);
return this;
}
public TemplateBuilder addParameter(Parameter parameter) {
this.parameters.add(parameter);
return this;
}
public TemplateBuilder addParameters(Collection<Parameter> parameters) {
this.parameters.addAll(parameters);
return this;
}
public Template createTemplate() {
return new Template(version, templateRef, isDefinition, metrics, parameters);
}
}
}

210
common/transport/mqtt/src/main/proto/sparkplug.proto

@ -0,0 +1,210 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
//
// To compile:
// cd client_libraries/java
// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
//
//package com.cirruslink.sparkplug.protobuf;
import "google/protobuf/any.proto";
option java_package = "org.thingsboard.server.gen.transport.mqtt";
option java_outer_classname = "SparkplugBProto";
message Payload {
/*
// Indexes of Data Types
// Unknown placeholder for future expansion.
Unknown = 0;
// Basic Types
Int8 = 1;
Int16 = 2;
Int32 = 3;
Int64 = 4;
UInt8 = 5;
UInt16 = 6;
UInt32 = 7;
UInt64 = 8;
Float = 9;
Double = 10;
Boolean = 11;
String = 12;
DateTime = 13;
Text = 14;
// Additional Metric Types
UUID = 15;
DataSet = 16;
Bytes = 17;
File = 18;
Template = 19;
// Additional PropertyValue Types
PropertySet = 20;
PropertySetList = 21;
*/
message Template {
message Parameter {
optional string name = 1;
optional uint32 type = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
ParameterValueExtension extension_value = 9;
}
message ParameterValueExtension {
google.protobuf.Any extensions = 1;
}
}
optional string version = 1; // The version of the Template to prevent mismatches
repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
repeated Parameter parameters = 3;
optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
optional bool is_definition = 5;
google.protobuf.Any extensions = 6;
}
message DataSet {
message DataSetValue {
oneof value {
uint32 int_value = 1;
uint64 long_value = 2;
float float_value = 3;
double double_value = 4;
bool boolean_value = 5;
string string_value = 6;
DataSetValueExtension extension_value = 7;
}
message DataSetValueExtension {
google.protobuf.Any extensions = 1;
}
}
message Row {
repeated DataSetValue elements = 1;
google.protobuf.Any extensions = 2; // For third party extensions
}
optional uint64 num_of_columns = 1;
repeated string columns = 2;
repeated uint32 types = 3;
repeated Row rows = 4;
google.protobuf.Any extensions = 5; // For third party extensions
}
message PropertyValue {
optional uint32 type = 1;
optional bool is_null = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
PropertySet propertyset_value = 9;
PropertySetList propertysets_value = 10; // List of Property Values
PropertyValueExtension extension_value = 11;
}
message PropertyValueExtension {
google.protobuf.Any extensions = 1;
}
}
message PropertySet {
repeated string keys = 1; // Names of the properties
repeated PropertyValue values = 2;
google.protobuf.Any extensions = 3;
}
message PropertySetList {
repeated PropertySet propertyset = 1;
google.protobuf.Any extensions = 2;
}
message MetaData {
// Bytes specific metadata
optional bool is_multi_part = 1;
// General metadata
optional string content_type = 2; // Content/Media type
optional uint64 size = 3; // File size, String size, Multi-part size, etc
optional uint64 seq = 4; // Sequence number for multi-part messages
// File metadata
optional string file_name = 5; // File name
optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
optional string md5 = 7; // md5 of data
// Catchalls and future expansion
optional string description = 8; // Could be anything such as json or xml of custom properties
google.protobuf.Any extensions = 9;
}
message Metric {
optional string name = 1; // Metric name - should only be included on birth
optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
optional uint32 datatype = 4; // DataType of the metric/tag value
optional bool is_historical = 5; // If this is historical data and should not update real time tag
optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
optional MetaData metadata = 8; // Metadata for the payload
optional PropertySet properties = 9;
oneof value {
uint32 int_value = 10;
uint64 long_value = 11;
float float_value = 12;
double double_value = 13;
bool boolean_value = 14;
string string_value = 15;
bytes bytes_value = 16; // Bytes, File
DataSet dataset_value = 17;
Template template_value = 18;
MetricValueExtension extension_value = 19;
}
message MetricValueExtension {
google.protobuf.Any extensions = 1;
}
}
optional uint64 timestamp = 1; // Timestamp at message sending time
repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
optional uint64 seq = 3; // Sequence number
optional string uuid = 4; // UUID to track message type in terms of schema definitions
optional bytes body = 5; // To optionally bypass the whole definition above
google.protobuf.Any extensions = 6;
}

5
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -19,7 +19,6 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -32,7 +31,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceResponseMsg
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg;
@ -95,9 +93,6 @@ public interface TransportService {
void process(GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback);
void process(GetOrCreateDeviceFromSparkplugRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromSparkplugResponse> callback);
void process(ProvisionDeviceRequestMsg msg,
TransportServiceCallback<ProvisionDeviceResponseMsg> callback);

29
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/GetOrCreateDeviceFromSparkplugResponse.java

@ -1,29 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.DeviceProfile;
@Data
@Builder
public class GetOrCreateDeviceFromSparkplugResponse implements DeviceProfileAware {
private TransportDeviceInfo deviceInfo;
private DeviceProfile deviceProfile;
}

22
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -69,7 +69,6 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
@ -480,27 +479,6 @@ public class DefaultTransportService implements TransportService {
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromSparkplugResponse> callback) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceSparlplugRequestMsg(requestMsg).build());
log.trace("Processing msg: {}", requestMsg);
ListenableFuture<GetOrCreateDeviceFromSparkplugResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg msg = tmp.getValue().getGetOrCreateDeviceSparkResponseMsg();
GetOrCreateDeviceFromSparkplugResponse.GetOrCreateDeviceFromSparkplugResponseBuilder result = GetOrCreateDeviceFromSparkplugResponse.builder();
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (profileBody != null && !profileBody.isEmpty()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
}
}
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.LwM2MRequestMsg msg, TransportServiceCallback<TransportProtos.LwM2MResponseMsg> callback) {
log.trace("Processing msg: {}", msg);

Loading…
Cancel
Save