From f6aa9e61252d50bc3abaa892c4d628ec21cda9cf Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 15 Dec 2022 18:44:25 +0200 Subject: [PATCH] sparklug: comments4 --- .../transport/DefaultTransportApiService.java | 100 +- common/cluster-api/src/main/proto/queue.proto | 14 - common/transport/mqtt/pom.xml | 13 + .../transport/mqtt/MqttTransportHandler.java | 183 +- .../mqtt/session/DeviceSessionCtx.java | 4 + .../session/SparkplugNodeSessionHandler.java | 137 +- .../util/sparkplug/SparkplugBPayload.java | 175 - .../sparkplug/SparkplugBPayloadDecoder.java | 371 - .../sparkplug/SparkplugBPayloadEncoder.java | 545 - .../mqtt/util/sparkplug/SparkplugBProto.java | 17414 ---------------- .../util/sparkplug/SparkplugMessageType.java | 7 +- .../sparkplug/SparkplugPayloadDecoder.java | 37 - .../sparkplug/SparkplugPayloadEncoder.java | 39 - .../{message => }/SparkplugTopic.java | 3 +- .../util/sparkplug/SparkplugTopicUtil.java | 13 +- .../sparkplug/json/DataSetDeserializer.java | 129 - .../sparkplug/json/DeserializerModifier.java | 38 - .../sparkplug/json/DeserializerModule.java | 39 - .../util/sparkplug/json/FileSerializer.java | 52 - .../sparkplug/json/MetricDeserializer.java | 70 - .../mqtt/util/sparkplug/message/DataSet.java | 234 - .../sparkplug/message/DataSetDataType.java | 117 - .../mqtt/util/sparkplug/message/File.java | 64 - .../mqtt/util/sparkplug/message/MetaData.java | 267 - .../mqtt/util/sparkplug/message/Metric.java | 320 - .../sparkplug/message/MetricDataType.java | 138 - .../util/sparkplug/message/Parameter.java | 107 - .../sparkplug/message/ParameterDataType.java | 125 - .../sparkplug/message/PropertyDataType.java | 134 - .../util/sparkplug/message/PropertySet.java | 169 - .../util/sparkplug/message/PropertyValue.java | 86 - .../mqtt/util/sparkplug/message/Row.java | 93 - .../sparkplug/message/SparkplugValue.java | 53 - .../mqtt/util/sparkplug/message/Template.java | 202 - .../mqtt/src/main/proto/sparkplug.proto | 210 + .../common/transport/TransportService.java | 5 - ...etOrCreateDeviceFromSparkplugResponse.java | 29 - .../service/DefaultTransportService.java | 22 - 38 files changed, 456 insertions(+), 21302 deletions(-) delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadDecoder.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugPayloadEncoder.java rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/{message => }/SparkplugTopic.java (96%) delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DataSetDeserializer.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModifier.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/DeserializerModule.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/FileSerializer.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/json/MetricDeserializer.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSet.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/DataSetDataType.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/File.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetaData.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Metric.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/MetricDataType.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Parameter.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/ParameterDataType.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyDataType.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertySet.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/PropertyValue.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Row.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/SparkplugValue.java delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/message/Template.java create mode 100644 common/transport/mqtt/src/main/proto/sparkplug.proto delete mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/GetOrCreateDeviceFromSparkplugResponse.java diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 11759dc14f..a1f9800b23 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfigu import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.PowerMode; import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -161,8 +162,6 @@ public class DefaultTransportApiService implements TransportApiService { result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); - } else if (transportApiRequestMsg.hasGetOrCreateDeviceSparlplugRequestMsg()) { - result = handle(transportApiRequestMsg.getGetOrCreateDeviceSparlplugRequestMsg()); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { @@ -283,6 +282,8 @@ public class DefaultTransportApiService implements TransportApiService { Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); deviceCreationLock.lock(); try { + DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); + boolean isSparkplug = ((MqttDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration()).isSparkPlug(); Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); if (device == null) { TenantId tenantId = gateway.getTenantId(); @@ -291,7 +292,7 @@ public class DefaultTransportApiService implements TransportApiService { device.setName(requestMsg.getDeviceName()); device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId()); - DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); + device.setDeviceProfileId(deviceProfile.getId()); ObjectNode additionalInfo = JacksonUtil.newObjectNode(); additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); @@ -307,7 +308,8 @@ public class DefaultTransportApiService implements TransportApiService { if (customerId != null && !customerId.isNullUid()) { metaData.putValue("customerId", customerId.toString()); } - metaData.putValue("gatewayId", gatewayId.toString()); + String deviceIdStr = isSparkplug ? "sparkplugId" : "gatewayId"; + metaData.putValue(deviceIdStr, gatewayId.toString()); DeviceId deviceId = device.getId(); ObjectNode entityNode = mapper.valueToTree(device); @@ -318,23 +320,19 @@ public class DefaultTransportApiService implements TransportApiService { if (deviceAdditionalInfo == null) { deviceAdditionalInfo = JacksonUtil.newObjectNode(); } + String lastConnectedStr = isSparkplug ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY; if (deviceAdditionalInfo.isObject() && - (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY) - || !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) { + (!deviceAdditionalInfo.has(lastConnectedStr) + || !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) { ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; - newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); + newDeviceAdditionalInfo.put(lastConnectedStr, gatewayId.toString()); Device savedDevice = deviceService.saveDevice(device); tbClusterService.onDeviceUpdated(savedDevice, device); } } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() .setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); - if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); - } else { - log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); - } + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); return TransportApiResponseMsg.newBuilder() .setGetOrCreateDeviceResponseMsg(builder.build()) .build(); @@ -347,82 +345,6 @@ public class DefaultTransportApiService implements TransportApiService { }, dbCallbackExecutorService); } - private ListenableFuture handle(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg) { - DeviceId sparkplugNodeId = new DeviceId(new UUID(requestMsg.getSparkplugIdMSB(), requestMsg.getSparkplugIdLSB())); - ListenableFuture sparkplugNodeFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, sparkplugNodeId); - return Futures.transform(sparkplugNodeFuture, sparkplugNode -> { - Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); - deviceCreationLock.lock(); - try { - Device device = deviceService.findDeviceByTenantIdAndName(sparkplugNode.getTenantId(), requestMsg.getDeviceName()); - if (device == null) { - TenantId tenantId = sparkplugNode.getTenantId(); - device = new Device(); - device.setTenantId(tenantId); - device.setName(requestMsg.getDeviceName()); - device.setType(requestMsg.getDeviceType()); - device.setCustomerId(sparkplugNode.getCustomerId()); - DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(sparkplugNode.getTenantId(), requestMsg.getDeviceType()); - device.setDeviceProfileId(deviceProfile.getId()); - ObjectNode additionalInfo = JacksonUtil.newObjectNode(); - additionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString()); - device.setAdditionalInfo(additionalInfo); - Device savedDevice = deviceService.saveDevice(device); - tbClusterService.onDeviceUpdated(savedDevice, null); - device = savedDevice; - - relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(device.getId(), sparkplugNode.getId(), "Created")); - - TbMsgMetaData metaData = new TbMsgMetaData(); - CustomerId customerId = sparkplugNode.getCustomerId(); - if (customerId != null && !customerId.isNullUid()) { - metaData.putValue("customerId", customerId.toString()); - } - metaData.putValue("sparkplugNodeId", sparkplugNodeId.toString()); - - DeviceId deviceId = device.getId(); - ObjectNode entityNode = mapper.valueToTree(device); - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); - } else { - JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); - if (deviceAdditionalInfo == null) { - deviceAdditionalInfo = JacksonUtil.newObjectNode(); - } - if (deviceAdditionalInfo.isObject() && - (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_SPARKPLUG) - || !sparkplugNodeId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_SPARKPLUG).asText()))) { - ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; - newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString()); - Device savedDevice = deviceService.saveDevice(device); - relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(device.getId(), sparkplugNode.getId(), "Created")); - - tbClusterService.onDeviceUpdated(savedDevice, device); - } - - - } - TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.Builder builder = - TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.newBuilder() - .setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); - if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); - } else { - log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); - } - return TransportApiResponseMsg.newBuilder() - .setGetOrCreateDeviceSparkResponseMsg(builder.build()) - .build(); - } catch (JsonProcessingException e) { - log.warn("[{}] Failed to lookup device by sparkplug Node id and name: [{}]", sparkplugNodeId, requestMsg.getDeviceName(), e); - throw new RuntimeException(e); - } finally { - deviceCreationLock.unlock(); - } - }, dbCallbackExecutorService); - } - private ListenableFuture handle(ProvisionDeviceRequestMsg requestMsg) { ListenableFuture provisionResponseFuture = null; try { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index da425c7b2c..5a28d42546 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -193,18 +193,6 @@ message GetOrCreateDeviceFromGatewayResponseMsg { bytes profileBody = 2; } -message GetOrCreateDeviceFromSparkplugRequestMsg { - string deviceName = 1; - string deviceType = 2; - int64 sparkplugIdMSB = 3; - int64 sparkplugIdLSB = 4; -} - -message GetOrCreateDeviceFromSparkplugResponseMsg { - DeviceInfoProto deviceInfo = 1; - bytes profileBody = 2; -} - message GetEntityProfileRequestMsg { string entityType = 1; int64 entityIdMSB = 2; @@ -909,7 +897,6 @@ message TransportApiRequestMsg { GetDeviceRequestMsg deviceRequestMsg = 12; GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13; GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 14; - GetOrCreateDeviceFromSparkplugRequestMsg getOrCreateDeviceSparlplugRequestMsg = 15; } /* Response from ThingsBoard Core Service to Transport Service */ @@ -925,7 +912,6 @@ message TransportApiResponseMsg { GetDeviceResponseMsg deviceResponseMsg = 9; GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10; repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11; - GetOrCreateDeviceFromSparkplugResponseMsg getOrCreateDeviceSparkResponseMsg = 12; } /* Messages that are handled by ThingsBoard Core Service */ diff --git a/common/transport/mqtt/pom.xml b/common/transport/mqtt/pom.xml index 9442a66646..d8fd475ee7 100644 --- a/common/transport/mqtt/pom.xml +++ b/common/transport/mqtt/pom.xml @@ -97,6 +97,19 @@ awaitility test + + com.google.protobuf + protobuf-java + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + + diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 791f28c8cf..d28669895a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -47,7 +47,6 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; @@ -68,13 +67,14 @@ import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; @@ -331,14 +331,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } } else if (sparkPlugSessionHandler != null) { - try { - SparkplugTopic sparkplugTopic = parseTopic(topicName); - log.error("Publish [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); - handleSparkplugPublishMsg(ctx, sparkplugTopic, msgId, mqttMsg); - transportService.reportActivity(deviceSessionCtx.getSessionInfo()); - } catch (Exception e) { - e.printStackTrace(); - } + handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg); + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } else { processDevicePublish(ctx, mqttMsg, topicName, msgId); } @@ -380,25 +374,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, SparkplugTopic sparkplugTopic, int msgId, MqttPublishMessage mqttMsg) { - String topicName = sparkplugTopic.toString(); + private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) { try { - switch(sparkplugTopic.getType()) { - case NBIRTH: - // TODO regular publish -// sparkPlugSessionHandler.onNodeConnectProto(mqttMsg); - break; - case DBIRTH: - sparkPlugSessionHandler.onDeviceConnectProto(mqttMsg); - break; - default: - - } - + sparkPlugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg); } catch (RuntimeException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); ctx.close(); - } catch (AdaptorException e) { + } catch (Exception e) { log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); sendAckOrCloseSession(ctx, topicName, msgId); } @@ -655,82 +637,80 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement return; } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); - try { - SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName()); - log.error("Subscribe [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); - } catch (Exception e) { - e.printStackTrace(); - } - List grantedQoSList = new ArrayList<>(); boolean activityReported = false; for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); try { - switch (topic) { - case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); - activityReported = true; - break; - } - case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: { - processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); - activityReported = true; - break; - } - case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { - processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); - activityReported = true; - break; + if (sparkPlugSessionHandler != null) { + SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName()); + sparkPlugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS); + } else { + switch (topic) { + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: { + processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: { + processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO); + activityReported = true; + break; + } + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: + case MqttTopics.GATEWAY_RPC_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: + case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: + case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + default: + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(FAILURE.value()); + break; } - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC: - case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC: - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC: - case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: - case MqttTopics.GATEWAY_RPC_TOPIC: - case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: - case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: - case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: - case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: - case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - default: - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(FAILURE.value()); - break; } } catch (Exception e) { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); @@ -998,22 +978,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void checkSparkPlugConnected(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) { - if (((MqttDeviceProfileTransportConfiguration) deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration()) - .isSparkPlug()) { + private void checkSparkPlugSession(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) { + if (deviceSessionCtx.isSparkplug()) { TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo(); try { JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo()); if (infoNode != null) { SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic()); + SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); if (sparkPlugSessionHandler == null) { - log.error("Connected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); + log.error("SparkPlugConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString()); } else { - log.error("ReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); - } - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); + log.error("SparkPlugReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); } } } catch (Exception e) { @@ -1057,7 +1034,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); checkGatewaySession(sessionMetaData); - checkSparkPlugConnected(sessionMetaData, connectMessage); + checkSparkPlugSession(sessionMetaData, connectMessage); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.debug("[{}] Client connected!", sessionId); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 2163610b38..8499836b26 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -255,4 +255,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { return Collections.unmodifiableCollection(msgQueue); } + public boolean isSparkplug () { + return ((MqttDeviceProfileTransportConfiguration) this.getDeviceProfile().getProfileData().getTransportConfiguration()).isSparkPlug(); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 46c1b64f54..ae93d50364 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -27,23 +27,25 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse; +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.transport.mqtt.MqttTransportContext; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; -import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.annotation.Nullable; +import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -102,7 +104,7 @@ public class SparkplugNodeSessionHandler { return sessionId; } - public String getNodeTopic() { + public String getNodeTopic() { return nodeTopic; } @@ -138,12 +140,64 @@ public class SparkplugNodeSessionHandler { String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId(); String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType(); processOnConnect(mqttMsg, deviceName, deviceType); - } catch (Exception e) { + } catch (Exception e) { throw new AdaptorException(e); } } - private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception { + SparkplugTopic sparkplugTopic = parseTopic(topicName); + log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + if (sparkplugTopic.isNode()) { + // A node topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case NBIRTH: + // TODO + break; + case NCMD: + // TODO + break; + case NDATA: + // TODO + break; + case NDEATH: + onNodeDisconnectProto(mqttMsg); + break; + case NRECORD: + // TODO + break; + default: + } + } else { + // A device topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case DBIRTH: + onDeviceConnectProto(mqttMsg); + break; + case DCMD: + // TODO + break; + case DDATA: + // TODO + break; + case DDEATH: + onDeviceDisconnectProto(mqttMsg); + break; + case DRECORD: + // TODO + break; + default: + } + } + } + + private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { try { TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); String deviceName = checkDeviceName(connectProto.getDeviceName()); @@ -153,16 +207,75 @@ public class SparkplugNodeSessionHandler { } } + private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); + String deviceName = checkDeviceName(connectProto.getDeviceName()); + // TODO disconnect device without disconnect Node + } catch (RuntimeException | InvalidProtocolBufferException e) { + throw new AdaptorException(e); + } + } + private void processOnDisconnect(MqttPublishMessage msg, String deviceName) { deregisterSession(deviceName); ack(msg); } - - private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { - return JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload()); + public void handleSparkplugSubscribeMsg(List grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) { + String topicName = sparkplugTopic.toString(); + log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + + if (sparkplugTopic.isNode()) { + // A node topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case NBIRTH: + // TODO + break; + case NCMD: + // TODO + break; + case NDATA: + // TODO + break; + case NDEATH: + // TODO + break; + case NRECORD: + // TODO + break; + default: + } + } else { + // A device topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case DBIRTH: + // TODO + break; + case DCMD: + // TODO + break; + case DDATA: + // TODO + break; + case DDEATH: + // TODO + break; + case DRECORD: + // TODO + break; + default: + } + } } + private byte[] getBytes(ByteBuf payload) { return ProtoMqttAdaptor.toBytes(payload); } @@ -242,15 +355,15 @@ public class SparkplugNodeSessionHandler { return future; } try { - transportService.process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg.newBuilder() + transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() .setDeviceName(deviceName) .setDeviceType(deviceType) - .setSparkplugIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) - .setSparkplugIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) + .setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) + .setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) .build(), new TransportServiceCallback<>() { @Override - public void onSuccess(GetOrCreateDeviceFromSparkplugResponse msg) { + public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { if (msg.getDeviceInfo() == null) { System.out.println("DeviceInfo == null"); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java deleted file mode 100644 index 467b4d8d7a..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util.sparkplug; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.List; - -/** - * Created by nickAS21 on 13.12.22 - */ -@JsonInclude(JsonInclude.Include.NON_NULL) -public class SparkplugBPayload { - - private Date timestamp; - private List metrics; - private long seq = -1; - private String uuid; - private byte[] body; - - public SparkplugBPayload() {}; - - public SparkplugBPayload(Date timestamp, List metrics, long seq, String uuid, byte[] body) { - this.timestamp = timestamp; - this.metrics = metrics; - this.seq = seq; - this.uuid = uuid; - this.body = body; - } - - public Date getTimestamp() { - return timestamp; - } - - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; - } - - public void addMetric(Metric metric) { - metrics.add(metric); - } - - public void addMetric(int index, Metric metric) { - metrics.add(index, metric); - } - - public void addMetrics(List metrics) { - this.metrics.addAll(metrics); - } - - public Metric removeMetric(int index) { - return metrics.remove(index); - } - - public boolean removeMetric(Metric metric) { - return metrics.remove(metric); - } - - public List getMetrics() { - return metrics; - } - - @JsonIgnore - public Integer getMetricCount() { - return metrics.size(); - } - - public void setMetrics(List metrics) { - this.metrics = metrics; - } - - public long getSeq() { - return seq; - } - - public void setSeq(long seq) { - this.seq = seq; - } - - public String getUuid() { - return uuid; - } - - public void setUuid(String uuid) { - this.uuid = uuid; - } - - public byte[] getBody() { - return body; - } - - public void setBody(byte[] body) { - this.body = body; - } - - @Override - public String toString() { - return "SparkplugBPayload [timestamp=" + timestamp + ", metrics=" + metrics + ", seq=" + seq + ", uuid=" + uuid - + ", body=" + Arrays.toString(body) + "]"; - } - - /** - * A builder for creating a {@link SparkplugBPayload} instance. - */ - public static class SparkplugBPayloadBuilder { - - private Date timestamp; - private List metrics; - private long seq = -1; - private String uuid; - private byte[] body; - - public SparkplugBPayloadBuilder(long sequenceNumber) { - this.seq = sequenceNumber; - metrics = new ArrayList(); - } - - public SparkplugBPayloadBuilder() { - metrics = new ArrayList(); - } - - public SparkplugBPayloadBuilder addMetric(Metric metric) { - this.metrics.add(metric); - return this; - } - - public SparkplugBPayloadBuilder addMetrics(Collection metrics) { - this.metrics.addAll(metrics); - return this; - } - - public SparkplugBPayloadBuilder setTimestamp(Date timestamp) { - this.timestamp = timestamp; - return this; - } - - public SparkplugBPayloadBuilder setSeq(long seq) { - this.seq = seq; - return this; - } - - public SparkplugBPayloadBuilder setUuid(String uuid) { - this.uuid = uuid; - return this; - } - - public SparkplugBPayloadBuilder setBody(byte[] body) { - this.body = body; - return this; - } - - public SparkplugBPayload createPayload() { - return new SparkplugBPayload(timestamp, metrics, seq, uuid, body); - } - } -} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java deleted file mode 100644 index 910a47286d..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java +++ /dev/null @@ -1,371 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util.sparkplug; - -/** - * Created by nickAS21 on 13.12.22 - */ - -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A {@link SparkplugPayloadDecoder} implementation for decoding Sparkplug B payloads. - */ -@Slf4j -public class SparkplugBPayloadDecoder implements SparkplugPayloadDecoder { - - - public SparkplugBPayloadDecoder() { - super(); - } - - public SparkplugBPayload buildFromByteArray(byte[] bytes) throws Exception { - SparkplugBProto.Payload protoPayload = SparkplugBProto.Payload.parseFrom(bytes); - SparkplugBPayload.SparkplugBPayloadBuilder builder = new SparkplugBPayload.SparkplugBPayloadBuilder(protoPayload.getSeq()); - - // Set the timestamp - if (protoPayload.hasTimestamp()) { - builder.setTimestamp(new Date(protoPayload.getTimestamp())); - } - - // Set the sequence number - if (protoPayload.hasSeq()) { - builder.setSeq(protoPayload.getSeq()); - } - - // Set the Metrics - for (SparkplugBProto.Payload.Metric protoMetric : protoPayload.getMetricsList()) { - builder.addMetric(convertMetric(protoMetric)); - } - - // Set the body - if (protoPayload.hasBody()) { - builder.setBody(protoPayload.getBody().toByteArray()); - } - - // Set the body - if (protoPayload.hasUuid()) { - builder.setUuid(protoPayload.getUuid()); - } - - return builder.createPayload(); - } - - private Metric convertMetric(SparkplugBProto.Payload.Metric protoMetric) throws Exception { - // Convert the dataType - MetricDataType dataType = MetricDataType.fromInteger((protoMetric.getDatatype())); - - // Build and return the Metric - return new Metric.MetricBuilder(protoMetric.getName(), dataType, getMetricValue(protoMetric)) - .isHistorical( - protoMetric.hasIsHistorical() ? protoMetric.getIsHistorical() : null) - .isTransient(protoMetric - .hasIsTransient() ? protoMetric.getIsTransient() : null) - .timestamp(protoMetric.hasTimestamp() ? new Date(protoMetric.getTimestamp()) : null) - .alias(protoMetric.hasAlias() ? protoMetric.getAlias() : null) - .metaData(protoMetric.hasMetadata() - ? new MetaData.MetaDataBuilder().contentType(protoMetric.getMetadata().getContentType()) - .size(protoMetric.getMetadata().getSize()).seq(protoMetric.getMetadata().getSeq()) - .fileName(protoMetric.getMetadata().getFileName()) - .fileType(protoMetric.getMetadata().getFileType()) - .md5(protoMetric.getMetadata().getMd5()) - .description(protoMetric.getMetadata().getDescription()).createMetaData() - : null) - .properties(protoMetric.hasProperties() - ? new PropertySet.PropertySetBuilder().addProperties(convertProperties(protoMetric.getProperties())) - .createPropertySet() - : null) - .createMetric(); - } - - private Map convertProperties(SparkplugBProto.Payload.PropertySet decodedPropSet) - throws Exception { - Map map = new HashMap(); - List keys = decodedPropSet.getKeysList(); - List values = decodedPropSet.getValuesList(); - for (int i = 0; i < keys.size(); i++) { - SparkplugBProto.Payload.PropertyValue value = values.get(i); - map.put(keys.get(i), - new PropertyValue(PropertyDataType.fromInteger(value.getType()), getPropertyValue(value))); - } - return map; - } - - private Object getPropertyValue(SparkplugBProto.Payload.PropertyValue value) throws Exception { - PropertyDataType type = PropertyDataType.fromInteger(value.getType()); - if (value.getIsNull()) { - return null; - } - switch (type) { - case Boolean: - return value.getBooleanValue(); - case DateTime: - return new Date(value.getLongValue()); - case Float: - return value.getFloatValue(); - case Double: - return value.getDoubleValue(); - case Int8: - return (byte) value.getIntValue(); - case Int16: - case UInt8: - return (short) value.getIntValue(); - case Int32: - case UInt16: - return value.getIntValue(); - case UInt32: - case Int64: - return value.getLongValue(); - case UInt64: - return BigInteger.valueOf(value.getLongValue()); - case String: - case Text: - return value.getStringValue(); - case PropertySet: - return new PropertySet.PropertySetBuilder().addProperties(convertProperties(value.getPropertysetValue())) - .createPropertySet(); - case PropertySetList: - List propertySetList = new ArrayList(); - List list = value.getPropertysetsValue().getPropertysetList(); - for (SparkplugBProto.Payload.PropertySet decodedPropSet : list) { - propertySetList.add(new PropertySet.PropertySetBuilder().addProperties(convertProperties(decodedPropSet)) - .createPropertySet()); - } - return propertySetList; - case Unknown: - default: - throw new Exception("Failed to decode: Unknown Property Data Type " + type); - } - } - - private Object getMetricValue(SparkplugBProto.Payload.Metric protoMetric) throws Exception { - // Check if the null flag has been set indicating that the value is null - if (protoMetric.getIsNull()) { - return null; - } - // Otherwise convert the value based on the type - int metricType = protoMetric.getDatatype(); - switch (MetricDataType.fromInteger(metricType)) { - case Boolean: - return protoMetric.getBooleanValue(); - case DateTime: - return new Date(protoMetric.getLongValue()); - case File: - String filename = protoMetric.getMetadata().getFileName(); - byte[] fileBytes = protoMetric.getBytesValue().toByteArray(); - return new File(filename, fileBytes); - case Float: - return protoMetric.getFloatValue(); - case Double: - return protoMetric.getDoubleValue(); - case Int8: - return (byte) protoMetric.getIntValue(); - case Int16: - case UInt8: - return (short) protoMetric.getIntValue(); - case Int32: - case UInt16: - return protoMetric.getIntValue(); - case UInt32: - case Int64: - return protoMetric.getLongValue(); - case UInt64: - return BigInteger.valueOf(protoMetric.getLongValue()); - case String: - case Text: - case UUID: - return protoMetric.getStringValue(); - case Bytes: - return protoMetric.getBytesValue().toByteArray(); - case DataSet: - SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); - // Build the and create the DataSet - return new DataSet.DataSetBuilder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) - .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) - .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) - .createDataSet(); - case Template: - SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); - List metrics = new ArrayList(); - List parameters = new ArrayList(); - - for (SparkplugBProto.Payload.Template.Parameter protoParameter : protoTemplate.getParametersList()) { - String name = protoParameter.getName(); - ParameterDataType type = ParameterDataType.fromInteger(protoParameter.getType()); - Object value = getParameterValue(protoParameter); - if (log.isTraceEnabled()) { - log.trace("Setting template parameter name: " + name + ", type: " + type + ", value: " - + value + ", valueType" + value.getClass()); - } - - parameters.add(new Parameter(name, type, value)); - } - - for (SparkplugBProto.Payload.Metric protoTemplateMetric : protoTemplate.getMetricsList()) { - Metric templateMetric = convertMetric(protoTemplateMetric); - if (log.isTraceEnabled()) { - log.trace("Setting template parameter name: " + templateMetric.getName() + ", type: " - + templateMetric.getDataType() + ", value: " + templateMetric.getValue()); - } - metrics.add(templateMetric); - } - - Template template = new Template.TemplateBuilder().version(protoTemplate.getVersion()) - .templateRef(protoTemplate.getTemplateRef()).definition(protoTemplate.getIsDefinition()) - .addMetrics(metrics).addParameters(parameters).createTemplate(); - - if (log.isTraceEnabled()) { - log.trace( - "Setting template - name: " + protoMetric.getName() + ", version: " + template.getVersion() - + ", ref: " + template.getTemplateRef() + ", isDef: " + template.isDefinition() - + ", metrics: " + metrics.size() + ", params: " + parameters.size()); - } - - return template; - case Unknown: - default: - throw new Exception("Failed to decode: Unknown Metric DataType " + metricType); - - } - } - - private Collection convertDataSetRows(List protoRows, - List protoTypes) throws Exception { - Collection rows = new ArrayList(); - if (protoRows != null) { - for (SparkplugBProto.Payload.DataSet.Row protoRow : protoRows) { - List protoValues = protoRow.getElementsList(); - List> values = new ArrayList>(); - for (int index = 0; index < protoRow.getElementsCount(); index++) { - values.add(convertDataSetValue(protoTypes.get(index), protoValues.get(index))); - } - // Add the values to the row and the row to the rows - rows.add(new Row.RowBuilder().addValues(values).createRow()); - } - } - return rows; - } - - private Collection convertDataSetDataTypes(List protoTypes) { - List types = new ArrayList(); - // Build up a List of column types - for (int type : protoTypes) { - types.add(DataSetDataType.fromInteger(type)); - } - return types; - } - - private Object getParameterValue(SparkplugBProto.Payload.Template.Parameter protoParameter) throws Exception { - // Otherwise convert the value based on the type - int type = protoParameter.getType(); - switch (MetricDataType.fromInteger(type)) { - case Boolean: - return protoParameter.getBooleanValue(); - case DateTime: - return new Date(protoParameter.getLongValue()); - case Float: - return protoParameter.getFloatValue(); - case Double: - return protoParameter.getDoubleValue(); - case Int8: - return (byte) protoParameter.getIntValue(); - case Int16: - case UInt8: - return (short) protoParameter.getIntValue(); - case Int32: - case UInt16: - return protoParameter.getIntValue(); - case UInt32: - case Int64: - return protoParameter.getLongValue(); - case UInt64: - return BigInteger.valueOf(protoParameter.getLongValue()); - case String: - case Text: - return protoParameter.getStringValue(); - case Unknown: - default: - throw new Exception("Failed to decode: Unknown Parameter Type " + type); - } - } - - private SparkplugValue convertDataSetValue(int protoType, SparkplugBProto.Payload.DataSet.DataSetValue protoValue) - throws Exception { - - DataSetDataType type = DataSetDataType.fromInteger(protoType); - switch (type) { - case Boolean: - return new SparkplugValue(type, protoValue.getBooleanValue()); - case DateTime: - // FIXME - remove after is_null is supported for dataset values - if (protoValue.getLongValue() == -9223372036854775808L) { - return new SparkplugValue(type, null); - } else { - return new SparkplugValue(type, new Date(protoValue.getLongValue())); - } - case Float: - return new SparkplugValue(type, protoValue.getFloatValue()); - case Double: - return new SparkplugValue(type, protoValue.getDoubleValue()); - case Int8: - return new SparkplugValue(type, (byte) protoValue.getIntValue()); - case UInt8: - case Int16: - return new SparkplugValue(type, (short) protoValue.getIntValue()); - case UInt16: - case Int32: - return new SparkplugValue(type, protoValue.getIntValue()); - case UInt32: - case Int64: - return new SparkplugValue(type, protoValue.getLongValue()); - case UInt64: - return new SparkplugValue(type, BigInteger.valueOf(protoValue.getLongValue())); - case String: - case Text: - if (protoValue.getStringValue().equals("null")) { - return new SparkplugValue(type, null); - } else { - return new SparkplugValue(type, protoValue.getStringValue()); - } - case Unknown: - default: - log.error("Unknown DataType: " + protoType); - throw new Exception("Failed to decode"); - } - } -} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java deleted file mode 100644 index 03ab231f42..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java +++ /dev/null @@ -1,545 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util.sparkplug; - -import com.google.protobuf.ByteString; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template; -import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue; - -import java.io.IOException; -import java.math.BigInteger; -import java.util.Date; -import java.util.List; -import java.util.Map; - -/** - * Created by nickAS21 on 13.12.22 - */ -public class SparkplugBPayloadEncoder implements SparkplugPayloadEncoder { - - private static Logger logger = LogManager.getLogger(SparkplugBPayloadEncoder.class.getName()); - - public SparkplugBPayloadEncoder() { - super(); - } - - public byte[] getBytes(SparkplugBPayload payload) throws IOException { - - SparkplugBProto.Payload.Builder protoMsg = SparkplugBProto.Payload.newBuilder(); - - // Set the timestamp - if (payload.getTimestamp() != null) { - protoMsg.setTimestamp(payload.getTimestamp().getTime()); - } - - // Set the sequence number - protoMsg.setSeq(payload.getSeq()); - - // Set the UUID if defined - if (payload.getUuid() != null) { - protoMsg.setUuid(payload.getUuid()); - } - - // Set the metrics - for (Metric metric : payload.getMetrics()) { - try { - protoMsg.addMetrics(convertMetric(metric)); - } catch(Exception e) { - logger.error("Failed to add metric: " + metric.getName()); - throw new RuntimeException(e); - } - } - - // Set the body - if (payload.getBody() != null) { - protoMsg.setBody(ByteString.copyFrom(payload.getBody())); - } - - return protoMsg.build().toByteArray(); - } - - private SparkplugBProto.Payload.Metric.Builder convertMetric(Metric metric) throws Exception { - - // build a metric - SparkplugBProto.Payload.Metric.Builder builder = SparkplugBProto.Payload.Metric.newBuilder(); - - // set the basic parameters - builder.setDatatype(metric.getDataType().toIntValue()); - builder = setMetricValue(builder, metric); - - // Set the name, data type, and value - if (metric.hasName()) { - builder.setName(metric.getName()); - } - - // Set the alias - if (metric.hasAlias()) { - builder.setAlias(metric.getAlias()); - } - - // Set the timestamp - if (metric.getTimestamp() != null) { - builder.setTimestamp(metric.getTimestamp().getTime()); - } - - // Set isHistorical - if (metric.getIsHistorical() != null) { - builder.setIsHistorical(metric.isHistorical()); - } - - // Set isTransient - if (metric.getIsTransient() != null) { - builder.setIsTransient(metric.isTransient()); - } - - // Set isNull - if (metric.getIsNull() != null) { - builder.setIsNull(metric.isNull()); - } - - // Set the metadata - if (metric.getMetaData() != null) { - builder = setMetaData(builder, metric); - } - - // Set the property set - if (metric.getProperties() != null) { - builder.setProperties(convertPropertySet(metric.getProperties())); - } - - return builder; - } - - private SparkplugBProto.Payload.Template.Parameter.Builder convertParameter(Parameter parameter) throws Exception { - - // build a metric - SparkplugBProto.Payload.Template.Parameter.Builder builder = - SparkplugBProto.Payload.Template.Parameter.newBuilder(); - - if (logger.isTraceEnabled()) { - logger.trace("Adding parameter: " + parameter.getName()); - logger.trace(" type: " + parameter.getType()); - } - - // Set the name - builder.setName(parameter.getName()); - - // Set the type and value - builder = setParameterValue(builder, parameter); - - return builder; - } - - private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertySet propertySet) throws Exception { - SparkplugBProto.Payload.PropertySet.Builder setBuilder = SparkplugBProto.Payload.PropertySet.newBuilder(); - - Map map = propertySet.getPropertyMap(); - for (String key : map.keySet()) { - SparkplugBProto.Payload.PropertyValue.Builder builder = SparkplugBProto.Payload.PropertyValue.newBuilder(); - PropertyValue value = map.get(key); - PropertyDataType type = value.getType(); - builder.setType(type.toIntValue()); - if (value.getValue() == null) { - builder.setIsNull(true); - } else { - switch (type) { - case Boolean: - builder.setBooleanValue((Boolean) value.getValue()); - break; - case DateTime: - builder.setLongValue(((Date) value.getValue()).getTime()); - break; - case Double: - builder.setDoubleValue((Double) value.getValue()); - break; - case Float: - builder.setFloatValue((Float) value.getValue()); - break; - case Int8: - builder.setIntValue((Byte) value.getValue()); - break; - case Int16: - case UInt8: - builder.setIntValue((Short) value.getValue()); - break; - case Int32: - case UInt16: - builder.setIntValue((Integer) value.getValue()); - break; - case Int64: - case UInt32: - builder.setLongValue((Long) value.getValue()); - break; - case UInt64: - builder.setLongValue(((BigInteger) value.getValue()).longValue()); - break; - case String: - case Text: - builder.setStringValue((String) value.getValue()); - break; - case PropertySet: - builder.setPropertysetValue(convertPropertySet((PropertySet) value.getValue())); - break; - case PropertySetList: - List setList = (List) value.getValue(); - SparkplugBProto.Payload.PropertySetList.Builder listBuilder = - SparkplugBProto.Payload.PropertySetList.newBuilder(); - for (Object obj : setList) { - listBuilder.addPropertyset(convertPropertySet((PropertySet) obj)); - } - builder.setPropertysetsValue(listBuilder); - break; - case Unknown: - default: - logger.error("Unknown DataType: " + value.getType()); - throw new Exception("Failed to convert value " + value.getType()); - } - } - setBuilder.addKeys(key); - setBuilder.addValues(builder); - } - return setBuilder; - } - - private SparkplugBProto.Payload.Template.Parameter.Builder setParameterValue( - SparkplugBProto.Payload.Template.Parameter.Builder builder, Parameter parameter) throws Exception { - ParameterDataType type = parameter.getType(); - builder.setType(type.toIntValue()); - Object value = parameter.getValue(); - switch (type) { - case Boolean: - builder.setBooleanValue(toBoolean(value)); - break; - case DateTime: - builder.setLongValue(((Date) value).getTime()); - break; - case Double: - builder.setDoubleValue((Double) value); - break; - case Float: - builder.setFloatValue((Float) value); - break; - case Int8: - builder.setIntValue((Byte) value); - break; - case Int16: - case UInt8: - builder.setIntValue((Short) value); - break; - case Int32: - case UInt16: - builder.setIntValue((Integer) value); - break; - case Int64: - case UInt32: - builder.setLongValue((Long) value); - break; - case UInt64: - builder.setLongValue(((BigInteger) value).longValue()); - break; - case Text: - case String: - if (value == null) { - builder.setStringValue(""); - } else { - builder.setStringValue((String) value); - } - break; - case Unknown: - default: - logger.error("Unknown Type: " + type); - throw new Exception("Failed to encode"); - - } - return builder; - } - - private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Payload.Metric.Builder metricBuilder, - Metric metric) throws Exception { - - // Set the data type - metricBuilder.setDatatype(metric.getDataType().toIntValue()); - - if (metric.getValue() == null) { - metricBuilder.setIsNull(true); - } else { - switch (metric.getDataType()) { - case Boolean: - metricBuilder.setBooleanValue(toBoolean(metric.getValue())); - break; - case DateTime: - metricBuilder.setLongValue(((Date) metric.getValue()).getTime()); - break; - case File: - metricBuilder.setBytesValue(ByteString.copyFrom(((File) metric.getValue()).getBytes())); - SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = - SparkplugBProto.Payload.MetaData.newBuilder(); - metaDataBuilder.setFileName(((File) metric.getValue()).getFileName()); - metricBuilder.setMetadata(metaDataBuilder); - break; - case Float: - metricBuilder.setFloatValue((Float) metric.getValue()); - break; - case Double: - metricBuilder.setDoubleValue((Double) metric.getValue()); - break; - case Int8: - metricBuilder.setIntValue(((Byte)metric.getValue()).intValue()); - break; - case Int16: - case UInt8: - metricBuilder.setIntValue(((Short)metric.getValue()).intValue()); - break; - case Int32: - case UInt16: - metricBuilder.setIntValue((int) metric.getValue()); - break; - case UInt32: - case Int64: - metricBuilder.setLongValue((Long) metric.getValue()); - break; - case UInt64: - metricBuilder.setLongValue(((BigInteger) metric.getValue()).longValue()); - break; - case String: - case Text: - case UUID: - metricBuilder.setStringValue((String) metric.getValue()); - break; - case Bytes: - metricBuilder.setBytesValue(ByteString.copyFrom((byte[]) metric.getValue())); - break; - case DataSet: - DataSet dataSet = (DataSet) metric.getValue(); - SparkplugBProto.Payload.DataSet.Builder dataSetBuilder = - SparkplugBProto.Payload.DataSet.newBuilder(); - - dataSetBuilder.setNumOfColumns(dataSet.getNumOfColumns()); - - // Column names - List columnNames = dataSet.getColumnNames(); - if (columnNames != null && !columnNames.isEmpty()) { - for (String name : columnNames) { - // Add the column name - dataSetBuilder.addColumns(name); - } - } - - // Column types - List columnTypes = dataSet.getTypes(); - if (columnTypes != null && !columnTypes.isEmpty()) { - for (DataSetDataType type : columnTypes) { - // Add the column type - dataSetBuilder.addTypes(type.toIntValue()); - } - } - - // Dataset rows - List rows = dataSet.getRows(); - if (rows != null && !rows.isEmpty()) { - for (Row row : rows) { - SparkplugBProto.Payload.DataSet.Row.Builder protoRowBuilder = - SparkplugBProto.Payload.DataSet.Row.newBuilder(); - List> values = row.getValues(); - if (values != null && !values.isEmpty()) { - for (SparkplugValue value : values) { - // Add the converted element - protoRowBuilder.addElements(convertDataSetValue(value)); - } - - dataSetBuilder.addRows(protoRowBuilder); - } - } - } - - // Finally add the dataset - metricBuilder.setDatasetValue(dataSetBuilder); - break; - case Template: - Template template = (Template) metric.getValue(); - SparkplugBProto.Payload.Template.Builder templateBuilder = - SparkplugBProto.Payload.Template.newBuilder(); - - // Set isDefinition - templateBuilder.setIsDefinition(template.isDefinition()); - - // Set Version - if (template.getVersion() != null) { - templateBuilder.setVersion(template.getVersion()); - } - - // Set Template Reference - if (template.getTemplateRef() != null) { - templateBuilder.setTemplateRef(template.getTemplateRef()); - } - - // Set the template metrics - if (template.getMetrics() != null) { - for (Metric templateMetric : template.getMetrics()) { - templateBuilder.addMetrics(convertMetric(templateMetric)); - } - } - - // Set the template parameters - if (template.getParameters() != null) { - for (Parameter parameter : template.getParameters()) { - templateBuilder.addParameters(convertParameter(parameter)); - } - } - - // Add the template to the metric - metricBuilder.setTemplateValue(templateBuilder); - break; - case Unknown: - default: - logger.error("Unknown DataType: " + metric.getDataType()); - throw new Exception("Failed to encode"); - - } - } - return metricBuilder; - } - - private SparkplugBProto.Payload.Metric.Builder setMetaData(SparkplugBProto.Payload.Metric.Builder metricBuilder, - Metric metric) throws Exception { - - // If the builder has been built already - use it - SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = metricBuilder.getMetadataBuilder() != null - ? metricBuilder.getMetadataBuilder() - : SparkplugBProto.Payload.MetaData.newBuilder(); - - MetaData metaData = metric.getMetaData(); - if (metaData.getContentType() != null) { - metaDataBuilder.setContentType(metaData.getContentType()); - } - if (metaData.getSize() != null) { - metaDataBuilder.setSize(metaData.getSize()); - } - if (metaData.getSeq() != null) { - metaDataBuilder.setSeq(metaData.getSeq()); - } - if (metaData.getFileName() != null) { - metaDataBuilder.setFileName(metaData.getFileName()); - } - if (metaData.getFileType() != null) { - metaDataBuilder.setFileType(metaData.getFileType()); - } - if (metaData.getMd5() != null) { - metaDataBuilder.setMd5(metaData.getMd5()); - } - if (metaData.getDescription() != null) { - metaDataBuilder.setDescription(metaData.getDescription()); - } - metricBuilder.setMetadata(metaDataBuilder); - - return metricBuilder; - } - - private SparkplugBProto.Payload.DataSet.DataSetValue.Builder convertDataSetValue(SparkplugValue value) throws Exception { - SparkplugBProto.Payload.DataSet.DataSetValue.Builder protoValueBuilder = - SparkplugBProto.Payload.DataSet.DataSetValue.newBuilder(); - - // Set the value - DataSetDataType type = value.getType(); - switch (type) { - case Int8: - protoValueBuilder.setIntValue((Byte) value.getValue()); - break; - case Int16: - case UInt8: - protoValueBuilder.setIntValue((Short) value.getValue()); - break; - case Int32: - case UInt16: - protoValueBuilder.setIntValue((Integer) value.getValue()); - break; - case Int64: - case UInt32: - protoValueBuilder.setLongValue((Long) value.getValue()); - break; - case UInt64: - protoValueBuilder.setLongValue(((BigInteger) value.getValue()).longValue()); - break; - case Float: - protoValueBuilder.setFloatValue((Float) value.getValue()); - break; - case Double: - protoValueBuilder.setDoubleValue((Double) value.getValue()); - break; - case String: - case Text: - if (value.getValue() != null) { - protoValueBuilder.setStringValue((String) value.getValue()); - } else { - logger.warn("String value for dataset is null"); - protoValueBuilder.setStringValue("null"); - } - break; - case Boolean: - protoValueBuilder.setBooleanValue(toBoolean(value.getValue())); - break; - case DateTime: - try { - protoValueBuilder.setLongValue(((Date) value.getValue()).getTime()); - } catch (NullPointerException npe) { - // FIXME - remove after is_null is supported for dataset values - logger.debug("Date in dataset was null - leaving it -9223372036854775808L"); - protoValueBuilder.setLongValue(-9223372036854775808L); - } - break; - default: - logger.error("Unknown DataType: " + value.getType()); - throw new Exception("Failed to convert value " + value.getType()); - } - - return protoValueBuilder; - } - - private Boolean toBoolean(Object value) { - if (value == null) { - return null; - } - if (value instanceof Integer) { - return ((Integer)value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof Long) { - return ((Long)value).longValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof Float) { - return ((Float)value).floatValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof Double) { - return ((Double)value).doubleValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof Short) { - return ((Short)value).shortValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof Byte) { - return ((Byte)value).byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE; - } else if (value instanceof String) { - return Boolean.parseBoolean(value.toString()); - } - return (Boolean)value; - } -} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java deleted file mode 100644 index cffdb133c6..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java +++ /dev/null @@ -1,17414 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util.sparkplug; - -/** - * Created by nickAS21 on 13.12.22 - */ -public final class SparkplugBProto { - private SparkplugBProto() {} - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - } - public interface PayloadOrBuilder extends - // @@protoc_insertion_point(interface_extends:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload) - com.google.protobuf.GeneratedMessage. - ExtendableMessageOrBuilder { - - /** - * optional uint64 timestamp = 1; - * - *
-         * Timestamp at message sending time
-         * 
- */ - boolean hasTimestamp(); - /** - * optional uint64 timestamp = 1; - * - *
-         * Timestamp at message sending time
-         * 
- */ - long getTimestamp(); - - /** - * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; - * - *
-         * Repeated forever - no limit in Google Protobufs
-         * 
- */ - java.util.List - getMetricsList(); - /** - * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; - * - *
-         * Repeated forever - no limit in Google Protobufs
-         * 
- */ - org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Metric getMetrics(int index); - /** - * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; - * - *
-         * Repeated forever - no limit in Google Protobufs
-         * 
- */ - int getMetricsCount(); - /** - * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; - * - *
-         * Repeated forever - no limit in Google Protobufs
-         * 
- */ - java.util.List - getMetricsOrBuilderList(); - /** - * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; - * - *
-         * Repeated forever - no limit in Google Protobufs
-         * 
- */ - org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.MetricOrBuilder getMetricsOrBuilder( - int index); - - /** - * optional uint64 seq = 3; - * - *
-         * Sequence number
-         * 
- */ - boolean hasSeq(); - /** - * optional uint64 seq = 3; - * - *
-         * Sequence number
-         * 
- */ - long getSeq(); - - /** - * optional string uuid = 4; - * - *
-         * UUID to track message type in terms of schema definitions
-         * 
- */ - boolean hasUuid(); - /** - * optional string uuid = 4; - * - *
-         * UUID to track message type in terms of schema definitions
-         * 
- */ - java.lang.String getUuid(); - /** - * optional string uuid = 4; - * - *
-         * UUID to track message type in terms of schema definitions
-         * 
- */ - com.google.protobuf.ByteString - getUuidBytes(); - - /** - * optional bytes body = 5; - * - *
-         * To optionally bypass the whole definition above
-         * 
- */ - boolean hasBody(); - /** - * optional bytes body = 5; - * - *
-         * To optionally bypass the whole definition above
-         * 
- */ - com.google.protobuf.ByteString getBody(); - } - /** - * Protobuf type {@code org.thingsboard.server.transport.mqtt.util.sparkplug.Payload} - * - *
-     * // Indexes of Data Types
-     * // Unknown placeholder for future expansion.
-     *Unknown         = 0;
-     * // Basic Types
-     *Int8            = 1;
-     *Int16           = 2;
-     *Int32           = 3;
-     *Int64           = 4;
-     *UInt8           = 5;
-     *UInt16          = 6;
-     *UInt32          = 7;
-     *UInt64          = 8;
-     *Float           = 9;
-     *Double          = 10;
-     *Boolean         = 11;
-     *String          = 12;
-     *DateTime        = 13;
-     *Text            = 14;
-     * // Additional Metric Types
-     *UUID            = 15;
-     *DataSet         = 16;
-     *Bytes           = 17;
-     *File            = 18;
-     *Template        = 19;
-     * // Additional PropertyValue Types
-     *PropertySet     = 20;
-     *PropertySetList = 21;
-     * 
- */ - public static final class Payload extends - com.google.protobuf.GeneratedMessage.ExtendableMessage< - Payload> implements - // @@protoc_insertion_point(message_implements:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload) - PayloadOrBuilder { - // Use Payload.newBuilder() to construct. - private Payload(com.google.protobuf.GeneratedMessage.ExtendableBuilder builder) { - super(builder); - this.unknownFields = builder.getUnknownFields(); - } - private Payload(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } - - private static final Payload defaultInstance; - public static Payload getDefaultInstance() { - return defaultInstance; - } - - public Payload getDefaultInstanceForType() { - return defaultInstance; - } - - private final com.google.protobuf.UnknownFieldSet unknownFields; - @java.lang.Override - public final com.google.protobuf.UnknownFieldSet - getUnknownFields() { - return this.unknownFields; - } - private Payload( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - initFields(); - int mutable_bitField0_ = 0; - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder(); - try { - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - done = true; - } - break; - } - case 8: { - bitField0_ |= 0x00000001; - timestamp_ = input.readUInt64(); - break; - } - case 18: { - if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { - metrics_ = new java.util.ArrayList(); - mutable_bitField0_ |= 0x00000002; - } - metrics_.add(input.readMessage(org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Metric.PARSER, extensionRegistry)); - break; - } - case 24: { - bitField0_ |= 0x00000002; - seq_ = input.readUInt64(); - break; - } - case 34: { - com.google.protobuf.ByteString bs = input.readBytes(); - bitField0_ |= 0x00000004; - uuid_ = bs; - break; - } - case 42: { - bitField0_ |= 0x00000008; - body_ = input.readBytes(); - break; - } - } - } - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - throw e.setUnfinishedMessage(this); - } catch (java.io.IOException e) { - throw new com.google.protobuf.InvalidProtocolBufferException( - e.getMessage()).setUnfinishedMessage(this); - } finally { - if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { - metrics_ = java.util.Collections.unmodifiableList(metrics_); - } - this.unknownFields = unknownFields.build(); - makeExtensionsImmutable(); - } - } - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.internal_static_com_cirruslink_sparkplug_protobuf_Payload_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.internal_static_com_cirruslink_sparkplug_protobuf_Payload_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.class, org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Builder.class); - } - - public static com.google.protobuf.Parser PARSER = - new com.google.protobuf.AbstractParser() { - public Payload parsePartialFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return new Payload(input, extensionRegistry); - } - }; - - @java.lang.Override - public com.google.protobuf.Parser getParserForType() { - return PARSER; - } - - public interface TemplateOrBuilder extends - // @@protoc_insertion_point(interface_extends:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Template) - com.google.protobuf.GeneratedMessage. - ExtendableMessageOrBuilder