diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index f8b42ecc1e..9c383a4cdb 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -65,7 +65,6 @@ import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceProvisionService; import org.thingsboard.server.dao.device.DeviceService; @@ -95,6 +94,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -161,6 +161,8 @@ public class DefaultTransportApiService implements TransportApiService { result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); + } else if (transportApiRequestMsg.hasGetOrCreateDeviceSparlplugRequestMsg()) { + result = handle(transportApiRequestMsg.getGetOrCreateDeviceSparlplugRequestMsg()); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { @@ -345,6 +347,78 @@ public class DefaultTransportApiService implements TransportApiService { }, dbCallbackExecutorService); } + private ListenableFuture handle(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg requestMsg) { + DeviceId sparkplugNodeId = new DeviceId(new UUID(requestMsg.getSparkplugIdMSB(), requestMsg.getSparkplugIdLSB())); + ListenableFuture sparkplugNodeFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, sparkplugNodeId); + return Futures.transform(sparkplugNodeFuture, sparkplugNode -> { + Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); + deviceCreationLock.lock(); + try { + Device device = deviceService.findDeviceByTenantIdAndName(sparkplugNode.getTenantId(), requestMsg.getDeviceName()); + if (device == null) { + TenantId tenantId = sparkplugNode.getTenantId(); + device = new Device(); + device.setTenantId(tenantId); + device.setName(requestMsg.getDeviceName()); + device.setType(requestMsg.getDeviceType()); + device.setCustomerId(sparkplugNode.getCustomerId()); + DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(sparkplugNode.getTenantId(), requestMsg.getDeviceType()); + device.setDeviceProfileId(deviceProfile.getId()); + ObjectNode additionalInfo = JacksonUtil.newObjectNode(); + additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, sparkplugNodeId.toString()); + device.setAdditionalInfo(additionalInfo); + Device savedDevice = deviceService.saveDevice(device); + tbClusterService.onDeviceUpdated(savedDevice, null); + device = savedDevice; + + relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(sparkplugNode.getId(), device.getId(), "Created")); + + TbMsgMetaData metaData = new TbMsgMetaData(); + CustomerId customerId = sparkplugNode.getCustomerId(); + if (customerId != null && !customerId.isNullUid()) { + metaData.putValue("customerId", customerId.toString()); + } + metaData.putValue("sparkplugNodeId", sparkplugNodeId.toString()); + + DeviceId deviceId = device.getId(); + ObjectNode entityNode = mapper.valueToTree(device); + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); + } else { + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo == null) { + deviceAdditionalInfo = JacksonUtil.newObjectNode(); + } + if (deviceAdditionalInfo.isObject() && + (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_SPARKPLUG) + || !sparkplugNodeId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_SPARKPLUG).asText()))) { + ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; + newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_SPARKPLUG, sparkplugNodeId.toString()); + Device savedDevice = deviceService.saveDevice(device); + tbClusterService.onDeviceUpdated(savedDevice, device); + } + } + TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.Builder builder = + TransportProtos.GetOrCreateDeviceFromSparkplugResponseMsg.newBuilder() + .setDeviceInfo(getDeviceInfoProto(device)); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); + if (deviceProfile != null) { + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + } else { + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); + } + return TransportApiResponseMsg.newBuilder() + .setGetOrCreateDeviceSparkResponseMsg(builder.build()) + .build(); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to lookup device by sparkplug Node id and name: [{}]", sparkplugNodeId, requestMsg.getDeviceName(), e); + throw new RuntimeException(e); + } finally { + deviceCreationLock.unlock(); + } + }, dbCallbackExecutorService); + } + private ListenableFuture handle(ProvisionDeviceRequestMsg requestMsg) { ListenableFuture provisionResponseFuture = null; try { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5a28d42546..da425c7b2c 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -193,6 +193,18 @@ message GetOrCreateDeviceFromGatewayResponseMsg { bytes profileBody = 2; } +message GetOrCreateDeviceFromSparkplugRequestMsg { + string deviceName = 1; + string deviceType = 2; + int64 sparkplugIdMSB = 3; + int64 sparkplugIdLSB = 4; +} + +message GetOrCreateDeviceFromSparkplugResponseMsg { + DeviceInfoProto deviceInfo = 1; + bytes profileBody = 2; +} + message GetEntityProfileRequestMsg { string entityType = 1; int64 entityIdMSB = 2; @@ -897,6 +909,7 @@ message TransportApiRequestMsg { GetDeviceRequestMsg deviceRequestMsg = 12; GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13; GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 14; + GetOrCreateDeviceFromSparkplugRequestMsg getOrCreateDeviceSparlplugRequestMsg = 15; } /* Response from ThingsBoard Core Service to Transport Service */ @@ -912,6 +925,7 @@ message TransportApiResponseMsg { GetDeviceResponseMsg deviceResponseMsg = 9; GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10; repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11; + GetOrCreateDeviceFromSparkplugResponseMsg getOrCreateDeviceSparkResponseMsg = 12; } /* Messages that are handled by ThingsBoard Core Service */ diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 970fd6177e..3ff3c42c47 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -118,4 +118,5 @@ public class DataConstants { public static final String MSG_SOURCE_KEY = "source"; public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; + public static final String LAST_CONNECTED_SPARKPLUG = "lastConnectedSparkplug"; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2a320cc2ac..791f28c8cf 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -41,12 +41,13 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; @@ -72,6 +73,8 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; +import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugTopic; import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; @@ -103,6 +106,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic; /** * @author Andrew Shvayka @@ -128,6 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement final DeviceSessionCtx deviceSessionCtx; volatile InetSocketAddress address; volatile GatewaySessionHandler gatewaySessionHandler; + volatile SparkplugNodeSessionHandler sparkPlugSessionHandler; private final ConcurrentHashMap otaPackSessions; private final ConcurrentHashMap chunkSizes; @@ -325,6 +330,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } + } else if (sparkPlugSessionHandler != null) { + try { + SparkplugTopic sparkplugTopic = parseTopic(topicName); + log.error("Publish [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); + handleSparkplugPublishMsg(ctx, sparkplugTopic, msgId, mqttMsg); + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); + } catch (Exception e) { + e.printStackTrace(); + } } else { processDevicePublish(ctx, mqttMsg, topicName, msgId); } @@ -366,6 +380,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, SparkplugTopic sparkplugTopic, int msgId, MqttPublishMessage mqttMsg) { + String topicName = sparkplugTopic.toString(); + try { + switch(sparkplugTopic.getType()) { + case NBIRTH: + // TODO regular publish +// sparkPlugSessionHandler.onNodeConnectProto(mqttMsg); + break; + case DBIRTH: + sparkPlugSessionHandler.onDeviceConnectProto(mqttMsg); + break; + default: + + } + + } catch (RuntimeException e) { + log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); + ctx.close(); + } catch (AdaptorException e) { + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); + sendAckOrCloseSession(ctx, topicName, msgId); + } + } + private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { Matcher fwMatcher; @@ -617,6 +655,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement return; } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); + try { + SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName()); + log.error("Subscribe [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); + } catch (Exception e) { + e.printStackTrace(); + } + List grantedQoSList = new ArrayList<>(); boolean activityReported = false; for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { @@ -953,6 +998,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void checkSparkPlugConnected(SessionMetaData sessionMetaData, MqttConnectMessage connectMessage) { + if (((MqttDeviceProfileTransportConfiguration) deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration()) + .isSparkPlug()) { + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo(); + try { + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo()); + if (infoNode != null) { + SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic()); + if (sparkPlugSessionHandler == null) { + log.error("Connected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); + sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString()); + } else { + log.error("ReConnected [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device", sparkplugTopic.getType()); + } + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); + } + } + } catch (Exception e) { + log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, device.getDeviceName(), e); + } + } + } + @Override public void operationComplete(Future future) throws Exception { log.trace("[{}] Channel closed!", sessionId); @@ -988,6 +1057,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); checkGatewaySession(sessionMetaData); + checkSparkPlugConnected(sessionMetaData, connectMessage); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.debug("[{}] Client connected!", sessionId); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java new file mode 100644 index 0000000000..46c1b64f54 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -0,0 +1,288 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromSparkplugResponse; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.gen.transport.TransportApiProtos; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; +import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; +import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; + +import javax.annotation.Nullable; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic; + +/** + * Created by nickAS21 on 12.12.22 + */ +@Slf4j +public class SparkplugNodeSessionHandler { + + private static final String DEFAULT_DEVICE_TYPE = "default"; + private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; + private static final String DEVICE_PROPERTY = "device"; + + private final MqttTransportContext context; + private final TransportService transportService; + private final TransportDeviceInfo nodeSparkplugInfo; + private final UUID sessionId; + private final ConcurrentMap deviceCreationLockMap; + private final ConcurrentMap devices = new ConcurrentHashMap<>(); + private final ConcurrentMap> deviceFutures = new ConcurrentHashMap<>(); + private final ConcurrentMap mqttQoSMap; + private final ChannelHandlerContext channel; + private final DeviceSessionCtx deviceSessionCtx; + private String nodeTopic; + + public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) { + this.context = deviceSessionCtx.getContext(); + this.transportService = context.getTransportService(); + this.deviceSessionCtx = deviceSessionCtx; + this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo(); + this.sessionId = sessionId; + this.deviceCreationLockMap = createWeakMap(); + this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); + this.channel = deviceSessionCtx.getChannel(); + this.nodeTopic = nodeTopic; + } + + ConcurrentReferenceHashMap createWeakMap() { + return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + } + + public String getNodeId() { + return context.getNodeId(); + } + + public UUID getSessionId() { + return sessionId; + } + + public String getNodeTopic() { + return nodeTopic; + } + + public int nextMsgId() { + return deviceSessionCtx.nextMsgId(); + } + + public void deregisterSession(String deviceName) { + SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName); + if (deviceSessionCtx != null) { + deregisterSession(deviceName, deviceSessionCtx); + } else { + log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName); + } + } + + private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) { + transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); + log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); + } + + public void onDeviceDeleted(String deviceName) { + deregisterSession(deviceName); + } + + private int getMsgId(MqttPublishMessage mqttMsg) { + return mqttMsg.variableHeader().packetId(); + } + + public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId(); + String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType(); + processOnConnect(mqttMsg, deviceName, deviceType); + } catch (Exception e) { + throw new AdaptorException(e); + } + } + + private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + try { + TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); + String deviceName = checkDeviceName(connectProto.getDeviceName()); + processOnDisconnect(mqttMsg, deviceName); + } catch (RuntimeException | InvalidProtocolBufferException e) { + throw new AdaptorException(e); + } + } + + private void processOnDisconnect(MqttPublishMessage msg, String deviceName) { + deregisterSession(deviceName); + ack(msg); + } + + + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { + return JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload()); + } + + private byte[] getBytes(ByteBuf payload) { + return ProtoMqttAdaptor.toBytes(payload); + } + + private void ack(MqttPublishMessage msg) { + int msgId = getMsgId(msg); + if (msgId > 0) { + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId)); + } + } + + ChannelFuture writeAndFlush(MqttMessage mqttMessage) { + return channel.writeAndFlush(mqttMessage); + } + + private String checkDeviceName(String deviceName) { + if (StringUtils.isEmpty(deviceName)) { + throw new RuntimeException("Device name is empty!"); + } else { + return deviceName; + } + } + + private String getDeviceName(JsonElement json) { + return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString(); + } + + + private String getDeviceType(JsonElement json) { + JsonElement type = json.getAsJsonObject().get("type"); + return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString(); + } + + + private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { + log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName); + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable SparkplugSessionCtx result) { + ack(msg); + log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t); + + } + }, context.getExecutor()); + } + + + private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { + SparkplugSessionCtx result = devices.get(deviceName); + if (result == null) { + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); + deviceCreationLock.lock(); + try { + result = devices.get(deviceName); + if (result == null) { + return getDeviceCreationFuture(deviceName, deviceType); + } else { + return Futures.immediateFuture(result); + } + } finally { + deviceCreationLock.unlock(); + } + } else { + return Futures.immediateFuture(result); + } + } + + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + final SettableFuture futureToSet = SettableFuture.create(); + ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); + if (future != null) { + return future; + } + try { + transportService.process(TransportProtos.GetOrCreateDeviceFromSparkplugRequestMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .setSparkplugIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) + .setSparkplugIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) + .build(), + new TransportServiceCallback<>() { + @Override + public void onSuccess(GetOrCreateDeviceFromSparkplugResponse msg) { + if (msg.getDeviceInfo() == null) { + System.out.println("DeviceInfo == null"); + } + SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) { + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); + SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx); + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() + .setSessionInfo(deviceSessionInfo) + .setSessionEvent(SESSION_EVENT_MSG_OPEN) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) + .build(), null); + } + futureToSet.set(devices.get(deviceName)); + deviceFutures.remove(deviceName); + } + + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); + futureToSet.setException(e); + deviceFutures.remove(deviceName); + } + }); + return futureToSet; + } catch (Throwable e) { + deviceFutures.remove(deviceName); + throw e; + } + } + + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java new file mode 100644 index 0000000000..467b4d8d7a --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayload.java @@ -0,0 +1,175 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +/** + * Created by nickAS21 on 13.12.22 + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SparkplugBPayload { + + private Date timestamp; + private List metrics; + private long seq = -1; + private String uuid; + private byte[] body; + + public SparkplugBPayload() {}; + + public SparkplugBPayload(Date timestamp, List metrics, long seq, String uuid, byte[] body) { + this.timestamp = timestamp; + this.metrics = metrics; + this.seq = seq; + this.uuid = uuid; + this.body = body; + } + + public Date getTimestamp() { + return timestamp; + } + + public void setTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + + public void addMetric(Metric metric) { + metrics.add(metric); + } + + public void addMetric(int index, Metric metric) { + metrics.add(index, metric); + } + + public void addMetrics(List metrics) { + this.metrics.addAll(metrics); + } + + public Metric removeMetric(int index) { + return metrics.remove(index); + } + + public boolean removeMetric(Metric metric) { + return metrics.remove(metric); + } + + public List getMetrics() { + return metrics; + } + + @JsonIgnore + public Integer getMetricCount() { + return metrics.size(); + } + + public void setMetrics(List metrics) { + this.metrics = metrics; + } + + public long getSeq() { + return seq; + } + + public void setSeq(long seq) { + this.seq = seq; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + public byte[] getBody() { + return body; + } + + public void setBody(byte[] body) { + this.body = body; + } + + @Override + public String toString() { + return "SparkplugBPayload [timestamp=" + timestamp + ", metrics=" + metrics + ", seq=" + seq + ", uuid=" + uuid + + ", body=" + Arrays.toString(body) + "]"; + } + + /** + * A builder for creating a {@link SparkplugBPayload} instance. + */ + public static class SparkplugBPayloadBuilder { + + private Date timestamp; + private List metrics; + private long seq = -1; + private String uuid; + private byte[] body; + + public SparkplugBPayloadBuilder(long sequenceNumber) { + this.seq = sequenceNumber; + metrics = new ArrayList(); + } + + public SparkplugBPayloadBuilder() { + metrics = new ArrayList(); + } + + public SparkplugBPayloadBuilder addMetric(Metric metric) { + this.metrics.add(metric); + return this; + } + + public SparkplugBPayloadBuilder addMetrics(Collection metrics) { + this.metrics.addAll(metrics); + return this; + } + + public SparkplugBPayloadBuilder setTimestamp(Date timestamp) { + this.timestamp = timestamp; + return this; + } + + public SparkplugBPayloadBuilder setSeq(long seq) { + this.seq = seq; + return this; + } + + public SparkplugBPayloadBuilder setUuid(String uuid) { + this.uuid = uuid; + return this; + } + + public SparkplugBPayloadBuilder setBody(byte[] body) { + this.body = body; + return this; + } + + public SparkplugBPayload createPayload() { + return new SparkplugBPayload(timestamp, metrics, seq, uuid, body); + } + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java new file mode 100644 index 0000000000..910a47286d --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadDecoder.java @@ -0,0 +1,371 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +/** + * Created by nickAS21 on 13.12.22 + */ + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetricDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link SparkplugPayloadDecoder} implementation for decoding Sparkplug B payloads. + */ +@Slf4j +public class SparkplugBPayloadDecoder implements SparkplugPayloadDecoder { + + + public SparkplugBPayloadDecoder() { + super(); + } + + public SparkplugBPayload buildFromByteArray(byte[] bytes) throws Exception { + SparkplugBProto.Payload protoPayload = SparkplugBProto.Payload.parseFrom(bytes); + SparkplugBPayload.SparkplugBPayloadBuilder builder = new SparkplugBPayload.SparkplugBPayloadBuilder(protoPayload.getSeq()); + + // Set the timestamp + if (protoPayload.hasTimestamp()) { + builder.setTimestamp(new Date(protoPayload.getTimestamp())); + } + + // Set the sequence number + if (protoPayload.hasSeq()) { + builder.setSeq(protoPayload.getSeq()); + } + + // Set the Metrics + for (SparkplugBProto.Payload.Metric protoMetric : protoPayload.getMetricsList()) { + builder.addMetric(convertMetric(protoMetric)); + } + + // Set the body + if (protoPayload.hasBody()) { + builder.setBody(protoPayload.getBody().toByteArray()); + } + + // Set the body + if (protoPayload.hasUuid()) { + builder.setUuid(protoPayload.getUuid()); + } + + return builder.createPayload(); + } + + private Metric convertMetric(SparkplugBProto.Payload.Metric protoMetric) throws Exception { + // Convert the dataType + MetricDataType dataType = MetricDataType.fromInteger((protoMetric.getDatatype())); + + // Build and return the Metric + return new Metric.MetricBuilder(protoMetric.getName(), dataType, getMetricValue(protoMetric)) + .isHistorical( + protoMetric.hasIsHistorical() ? protoMetric.getIsHistorical() : null) + .isTransient(protoMetric + .hasIsTransient() ? protoMetric.getIsTransient() : null) + .timestamp(protoMetric.hasTimestamp() ? new Date(protoMetric.getTimestamp()) : null) + .alias(protoMetric.hasAlias() ? protoMetric.getAlias() : null) + .metaData(protoMetric.hasMetadata() + ? new MetaData.MetaDataBuilder().contentType(protoMetric.getMetadata().getContentType()) + .size(protoMetric.getMetadata().getSize()).seq(protoMetric.getMetadata().getSeq()) + .fileName(protoMetric.getMetadata().getFileName()) + .fileType(protoMetric.getMetadata().getFileType()) + .md5(protoMetric.getMetadata().getMd5()) + .description(protoMetric.getMetadata().getDescription()).createMetaData() + : null) + .properties(protoMetric.hasProperties() + ? new PropertySet.PropertySetBuilder().addProperties(convertProperties(protoMetric.getProperties())) + .createPropertySet() + : null) + .createMetric(); + } + + private Map convertProperties(SparkplugBProto.Payload.PropertySet decodedPropSet) + throws Exception { + Map map = new HashMap(); + List keys = decodedPropSet.getKeysList(); + List values = decodedPropSet.getValuesList(); + for (int i = 0; i < keys.size(); i++) { + SparkplugBProto.Payload.PropertyValue value = values.get(i); + map.put(keys.get(i), + new PropertyValue(PropertyDataType.fromInteger(value.getType()), getPropertyValue(value))); + } + return map; + } + + private Object getPropertyValue(SparkplugBProto.Payload.PropertyValue value) throws Exception { + PropertyDataType type = PropertyDataType.fromInteger(value.getType()); + if (value.getIsNull()) { + return null; + } + switch (type) { + case Boolean: + return value.getBooleanValue(); + case DateTime: + return new Date(value.getLongValue()); + case Float: + return value.getFloatValue(); + case Double: + return value.getDoubleValue(); + case Int8: + return (byte) value.getIntValue(); + case Int16: + case UInt8: + return (short) value.getIntValue(); + case Int32: + case UInt16: + return value.getIntValue(); + case UInt32: + case Int64: + return value.getLongValue(); + case UInt64: + return BigInteger.valueOf(value.getLongValue()); + case String: + case Text: + return value.getStringValue(); + case PropertySet: + return new PropertySet.PropertySetBuilder().addProperties(convertProperties(value.getPropertysetValue())) + .createPropertySet(); + case PropertySetList: + List propertySetList = new ArrayList(); + List list = value.getPropertysetsValue().getPropertysetList(); + for (SparkplugBProto.Payload.PropertySet decodedPropSet : list) { + propertySetList.add(new PropertySet.PropertySetBuilder().addProperties(convertProperties(decodedPropSet)) + .createPropertySet()); + } + return propertySetList; + case Unknown: + default: + throw new Exception("Failed to decode: Unknown Property Data Type " + type); + } + } + + private Object getMetricValue(SparkplugBProto.Payload.Metric protoMetric) throws Exception { + // Check if the null flag has been set indicating that the value is null + if (protoMetric.getIsNull()) { + return null; + } + // Otherwise convert the value based on the type + int metricType = protoMetric.getDatatype(); + switch (MetricDataType.fromInteger(metricType)) { + case Boolean: + return protoMetric.getBooleanValue(); + case DateTime: + return new Date(protoMetric.getLongValue()); + case File: + String filename = protoMetric.getMetadata().getFileName(); + byte[] fileBytes = protoMetric.getBytesValue().toByteArray(); + return new File(filename, fileBytes); + case Float: + return protoMetric.getFloatValue(); + case Double: + return protoMetric.getDoubleValue(); + case Int8: + return (byte) protoMetric.getIntValue(); + case Int16: + case UInt8: + return (short) protoMetric.getIntValue(); + case Int32: + case UInt16: + return protoMetric.getIntValue(); + case UInt32: + case Int64: + return protoMetric.getLongValue(); + case UInt64: + return BigInteger.valueOf(protoMetric.getLongValue()); + case String: + case Text: + case UUID: + return protoMetric.getStringValue(); + case Bytes: + return protoMetric.getBytesValue().toByteArray(); + case DataSet: + SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); + // Build the and create the DataSet + return new DataSet.DataSetBuilder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) + .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) + .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) + .createDataSet(); + case Template: + SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); + List metrics = new ArrayList(); + List parameters = new ArrayList(); + + for (SparkplugBProto.Payload.Template.Parameter protoParameter : protoTemplate.getParametersList()) { + String name = protoParameter.getName(); + ParameterDataType type = ParameterDataType.fromInteger(protoParameter.getType()); + Object value = getParameterValue(protoParameter); + if (log.isTraceEnabled()) { + log.trace("Setting template parameter name: " + name + ", type: " + type + ", value: " + + value + ", valueType" + value.getClass()); + } + + parameters.add(new Parameter(name, type, value)); + } + + for (SparkplugBProto.Payload.Metric protoTemplateMetric : protoTemplate.getMetricsList()) { + Metric templateMetric = convertMetric(protoTemplateMetric); + if (log.isTraceEnabled()) { + log.trace("Setting template parameter name: " + templateMetric.getName() + ", type: " + + templateMetric.getDataType() + ", value: " + templateMetric.getValue()); + } + metrics.add(templateMetric); + } + + Template template = new Template.TemplateBuilder().version(protoTemplate.getVersion()) + .templateRef(protoTemplate.getTemplateRef()).definition(protoTemplate.getIsDefinition()) + .addMetrics(metrics).addParameters(parameters).createTemplate(); + + if (log.isTraceEnabled()) { + log.trace( + "Setting template - name: " + protoMetric.getName() + ", version: " + template.getVersion() + + ", ref: " + template.getTemplateRef() + ", isDef: " + template.isDefinition() + + ", metrics: " + metrics.size() + ", params: " + parameters.size()); + } + + return template; + case Unknown: + default: + throw new Exception("Failed to decode: Unknown Metric DataType " + metricType); + + } + } + + private Collection convertDataSetRows(List protoRows, + List protoTypes) throws Exception { + Collection rows = new ArrayList(); + if (protoRows != null) { + for (SparkplugBProto.Payload.DataSet.Row protoRow : protoRows) { + List protoValues = protoRow.getElementsList(); + List> values = new ArrayList>(); + for (int index = 0; index < protoRow.getElementsCount(); index++) { + values.add(convertDataSetValue(protoTypes.get(index), protoValues.get(index))); + } + // Add the values to the row and the row to the rows + rows.add(new Row.RowBuilder().addValues(values).createRow()); + } + } + return rows; + } + + private Collection convertDataSetDataTypes(List protoTypes) { + List types = new ArrayList(); + // Build up a List of column types + for (int type : protoTypes) { + types.add(DataSetDataType.fromInteger(type)); + } + return types; + } + + private Object getParameterValue(SparkplugBProto.Payload.Template.Parameter protoParameter) throws Exception { + // Otherwise convert the value based on the type + int type = protoParameter.getType(); + switch (MetricDataType.fromInteger(type)) { + case Boolean: + return protoParameter.getBooleanValue(); + case DateTime: + return new Date(protoParameter.getLongValue()); + case Float: + return protoParameter.getFloatValue(); + case Double: + return protoParameter.getDoubleValue(); + case Int8: + return (byte) protoParameter.getIntValue(); + case Int16: + case UInt8: + return (short) protoParameter.getIntValue(); + case Int32: + case UInt16: + return protoParameter.getIntValue(); + case UInt32: + case Int64: + return protoParameter.getLongValue(); + case UInt64: + return BigInteger.valueOf(protoParameter.getLongValue()); + case String: + case Text: + return protoParameter.getStringValue(); + case Unknown: + default: + throw new Exception("Failed to decode: Unknown Parameter Type " + type); + } + } + + private SparkplugValue convertDataSetValue(int protoType, SparkplugBProto.Payload.DataSet.DataSetValue protoValue) + throws Exception { + + DataSetDataType type = DataSetDataType.fromInteger(protoType); + switch (type) { + case Boolean: + return new SparkplugValue(type, protoValue.getBooleanValue()); + case DateTime: + // FIXME - remove after is_null is supported for dataset values + if (protoValue.getLongValue() == -9223372036854775808L) { + return new SparkplugValue(type, null); + } else { + return new SparkplugValue(type, new Date(protoValue.getLongValue())); + } + case Float: + return new SparkplugValue(type, protoValue.getFloatValue()); + case Double: + return new SparkplugValue(type, protoValue.getDoubleValue()); + case Int8: + return new SparkplugValue(type, (byte) protoValue.getIntValue()); + case UInt8: + case Int16: + return new SparkplugValue(type, (short) protoValue.getIntValue()); + case UInt16: + case Int32: + return new SparkplugValue(type, protoValue.getIntValue()); + case UInt32: + case Int64: + return new SparkplugValue(type, protoValue.getLongValue()); + case UInt64: + return new SparkplugValue(type, BigInteger.valueOf(protoValue.getLongValue())); + case String: + case Text: + if (protoValue.getStringValue().equals("null")) { + return new SparkplugValue(type, null); + } else { + return new SparkplugValue(type, protoValue.getStringValue()); + } + case Unknown: + default: + log.error("Unknown DataType: " + protoType); + throw new Exception("Failed to decode"); + } + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java new file mode 100644 index 0000000000..03ab231f42 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBPayloadEncoder.java @@ -0,0 +1,545 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.google.protobuf.ByteString; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSet; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.DataSetDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.File; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.MetaData; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Metric; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Parameter; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.ParameterDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertySet; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.PropertyValue; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Row; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.Template; +import org.thingsboard.server.transport.mqtt.util.sparkplug.message.SparkplugValue; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Created by nickAS21 on 13.12.22 + */ +public class SparkplugBPayloadEncoder implements SparkplugPayloadEncoder { + + private static Logger logger = LogManager.getLogger(SparkplugBPayloadEncoder.class.getName()); + + public SparkplugBPayloadEncoder() { + super(); + } + + public byte[] getBytes(SparkplugBPayload payload) throws IOException { + + SparkplugBProto.Payload.Builder protoMsg = SparkplugBProto.Payload.newBuilder(); + + // Set the timestamp + if (payload.getTimestamp() != null) { + protoMsg.setTimestamp(payload.getTimestamp().getTime()); + } + + // Set the sequence number + protoMsg.setSeq(payload.getSeq()); + + // Set the UUID if defined + if (payload.getUuid() != null) { + protoMsg.setUuid(payload.getUuid()); + } + + // Set the metrics + for (Metric metric : payload.getMetrics()) { + try { + protoMsg.addMetrics(convertMetric(metric)); + } catch(Exception e) { + logger.error("Failed to add metric: " + metric.getName()); + throw new RuntimeException(e); + } + } + + // Set the body + if (payload.getBody() != null) { + protoMsg.setBody(ByteString.copyFrom(payload.getBody())); + } + + return protoMsg.build().toByteArray(); + } + + private SparkplugBProto.Payload.Metric.Builder convertMetric(Metric metric) throws Exception { + + // build a metric + SparkplugBProto.Payload.Metric.Builder builder = SparkplugBProto.Payload.Metric.newBuilder(); + + // set the basic parameters + builder.setDatatype(metric.getDataType().toIntValue()); + builder = setMetricValue(builder, metric); + + // Set the name, data type, and value + if (metric.hasName()) { + builder.setName(metric.getName()); + } + + // Set the alias + if (metric.hasAlias()) { + builder.setAlias(metric.getAlias()); + } + + // Set the timestamp + if (metric.getTimestamp() != null) { + builder.setTimestamp(metric.getTimestamp().getTime()); + } + + // Set isHistorical + if (metric.getIsHistorical() != null) { + builder.setIsHistorical(metric.isHistorical()); + } + + // Set isTransient + if (metric.getIsTransient() != null) { + builder.setIsTransient(metric.isTransient()); + } + + // Set isNull + if (metric.getIsNull() != null) { + builder.setIsNull(metric.isNull()); + } + + // Set the metadata + if (metric.getMetaData() != null) { + builder = setMetaData(builder, metric); + } + + // Set the property set + if (metric.getProperties() != null) { + builder.setProperties(convertPropertySet(metric.getProperties())); + } + + return builder; + } + + private SparkplugBProto.Payload.Template.Parameter.Builder convertParameter(Parameter parameter) throws Exception { + + // build a metric + SparkplugBProto.Payload.Template.Parameter.Builder builder = + SparkplugBProto.Payload.Template.Parameter.newBuilder(); + + if (logger.isTraceEnabled()) { + logger.trace("Adding parameter: " + parameter.getName()); + logger.trace(" type: " + parameter.getType()); + } + + // Set the name + builder.setName(parameter.getName()); + + // Set the type and value + builder = setParameterValue(builder, parameter); + + return builder; + } + + private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertySet propertySet) throws Exception { + SparkplugBProto.Payload.PropertySet.Builder setBuilder = SparkplugBProto.Payload.PropertySet.newBuilder(); + + Map map = propertySet.getPropertyMap(); + for (String key : map.keySet()) { + SparkplugBProto.Payload.PropertyValue.Builder builder = SparkplugBProto.Payload.PropertyValue.newBuilder(); + PropertyValue value = map.get(key); + PropertyDataType type = value.getType(); + builder.setType(type.toIntValue()); + if (value.getValue() == null) { + builder.setIsNull(true); + } else { + switch (type) { + case Boolean: + builder.setBooleanValue((Boolean) value.getValue()); + break; + case DateTime: + builder.setLongValue(((Date) value.getValue()).getTime()); + break; + case Double: + builder.setDoubleValue((Double) value.getValue()); + break; + case Float: + builder.setFloatValue((Float) value.getValue()); + break; + case Int8: + builder.setIntValue((Byte) value.getValue()); + break; + case Int16: + case UInt8: + builder.setIntValue((Short) value.getValue()); + break; + case Int32: + case UInt16: + builder.setIntValue((Integer) value.getValue()); + break; + case Int64: + case UInt32: + builder.setLongValue((Long) value.getValue()); + break; + case UInt64: + builder.setLongValue(((BigInteger) value.getValue()).longValue()); + break; + case String: + case Text: + builder.setStringValue((String) value.getValue()); + break; + case PropertySet: + builder.setPropertysetValue(convertPropertySet((PropertySet) value.getValue())); + break; + case PropertySetList: + List setList = (List) value.getValue(); + SparkplugBProto.Payload.PropertySetList.Builder listBuilder = + SparkplugBProto.Payload.PropertySetList.newBuilder(); + for (Object obj : setList) { + listBuilder.addPropertyset(convertPropertySet((PropertySet) obj)); + } + builder.setPropertysetsValue(listBuilder); + break; + case Unknown: + default: + logger.error("Unknown DataType: " + value.getType()); + throw new Exception("Failed to convert value " + value.getType()); + } + } + setBuilder.addKeys(key); + setBuilder.addValues(builder); + } + return setBuilder; + } + + private SparkplugBProto.Payload.Template.Parameter.Builder setParameterValue( + SparkplugBProto.Payload.Template.Parameter.Builder builder, Parameter parameter) throws Exception { + ParameterDataType type = parameter.getType(); + builder.setType(type.toIntValue()); + Object value = parameter.getValue(); + switch (type) { + case Boolean: + builder.setBooleanValue(toBoolean(value)); + break; + case DateTime: + builder.setLongValue(((Date) value).getTime()); + break; + case Double: + builder.setDoubleValue((Double) value); + break; + case Float: + builder.setFloatValue((Float) value); + break; + case Int8: + builder.setIntValue((Byte) value); + break; + case Int16: + case UInt8: + builder.setIntValue((Short) value); + break; + case Int32: + case UInt16: + builder.setIntValue((Integer) value); + break; + case Int64: + case UInt32: + builder.setLongValue((Long) value); + break; + case UInt64: + builder.setLongValue(((BigInteger) value).longValue()); + break; + case Text: + case String: + if (value == null) { + builder.setStringValue(""); + } else { + builder.setStringValue((String) value); + } + break; + case Unknown: + default: + logger.error("Unknown Type: " + type); + throw new Exception("Failed to encode"); + + } + return builder; + } + + private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Payload.Metric.Builder metricBuilder, + Metric metric) throws Exception { + + // Set the data type + metricBuilder.setDatatype(metric.getDataType().toIntValue()); + + if (metric.getValue() == null) { + metricBuilder.setIsNull(true); + } else { + switch (metric.getDataType()) { + case Boolean: + metricBuilder.setBooleanValue(toBoolean(metric.getValue())); + break; + case DateTime: + metricBuilder.setLongValue(((Date) metric.getValue()).getTime()); + break; + case File: + metricBuilder.setBytesValue(ByteString.copyFrom(((File) metric.getValue()).getBytes())); + SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = + SparkplugBProto.Payload.MetaData.newBuilder(); + metaDataBuilder.setFileName(((File) metric.getValue()).getFileName()); + metricBuilder.setMetadata(metaDataBuilder); + break; + case Float: + metricBuilder.setFloatValue((Float) metric.getValue()); + break; + case Double: + metricBuilder.setDoubleValue((Double) metric.getValue()); + break; + case Int8: + metricBuilder.setIntValue(((Byte)metric.getValue()).intValue()); + break; + case Int16: + case UInt8: + metricBuilder.setIntValue(((Short)metric.getValue()).intValue()); + break; + case Int32: + case UInt16: + metricBuilder.setIntValue((int) metric.getValue()); + break; + case UInt32: + case Int64: + metricBuilder.setLongValue((Long) metric.getValue()); + break; + case UInt64: + metricBuilder.setLongValue(((BigInteger) metric.getValue()).longValue()); + break; + case String: + case Text: + case UUID: + metricBuilder.setStringValue((String) metric.getValue()); + break; + case Bytes: + metricBuilder.setBytesValue(ByteString.copyFrom((byte[]) metric.getValue())); + break; + case DataSet: + DataSet dataSet = (DataSet) metric.getValue(); + SparkplugBProto.Payload.DataSet.Builder dataSetBuilder = + SparkplugBProto.Payload.DataSet.newBuilder(); + + dataSetBuilder.setNumOfColumns(dataSet.getNumOfColumns()); + + // Column names + List columnNames = dataSet.getColumnNames(); + if (columnNames != null && !columnNames.isEmpty()) { + for (String name : columnNames) { + // Add the column name + dataSetBuilder.addColumns(name); + } + } + + // Column types + List columnTypes = dataSet.getTypes(); + if (columnTypes != null && !columnTypes.isEmpty()) { + for (DataSetDataType type : columnTypes) { + // Add the column type + dataSetBuilder.addTypes(type.toIntValue()); + } + } + + // Dataset rows + List rows = dataSet.getRows(); + if (rows != null && !rows.isEmpty()) { + for (Row row : rows) { + SparkplugBProto.Payload.DataSet.Row.Builder protoRowBuilder = + SparkplugBProto.Payload.DataSet.Row.newBuilder(); + List> values = row.getValues(); + if (values != null && !values.isEmpty()) { + for (SparkplugValue value : values) { + // Add the converted element + protoRowBuilder.addElements(convertDataSetValue(value)); + } + + dataSetBuilder.addRows(protoRowBuilder); + } + } + } + + // Finally add the dataset + metricBuilder.setDatasetValue(dataSetBuilder); + break; + case Template: + Template template = (Template) metric.getValue(); + SparkplugBProto.Payload.Template.Builder templateBuilder = + SparkplugBProto.Payload.Template.newBuilder(); + + // Set isDefinition + templateBuilder.setIsDefinition(template.isDefinition()); + + // Set Version + if (template.getVersion() != null) { + templateBuilder.setVersion(template.getVersion()); + } + + // Set Template Reference + if (template.getTemplateRef() != null) { + templateBuilder.setTemplateRef(template.getTemplateRef()); + } + + // Set the template metrics + if (template.getMetrics() != null) { + for (Metric templateMetric : template.getMetrics()) { + templateBuilder.addMetrics(convertMetric(templateMetric)); + } + } + + // Set the template parameters + if (template.getParameters() != null) { + for (Parameter parameter : template.getParameters()) { + templateBuilder.addParameters(convertParameter(parameter)); + } + } + + // Add the template to the metric + metricBuilder.setTemplateValue(templateBuilder); + break; + case Unknown: + default: + logger.error("Unknown DataType: " + metric.getDataType()); + throw new Exception("Failed to encode"); + + } + } + return metricBuilder; + } + + private SparkplugBProto.Payload.Metric.Builder setMetaData(SparkplugBProto.Payload.Metric.Builder metricBuilder, + Metric metric) throws Exception { + + // If the builder has been built already - use it + SparkplugBProto.Payload.MetaData.Builder metaDataBuilder = metricBuilder.getMetadataBuilder() != null + ? metricBuilder.getMetadataBuilder() + : SparkplugBProto.Payload.MetaData.newBuilder(); + + MetaData metaData = metric.getMetaData(); + if (metaData.getContentType() != null) { + metaDataBuilder.setContentType(metaData.getContentType()); + } + if (metaData.getSize() != null) { + metaDataBuilder.setSize(metaData.getSize()); + } + if (metaData.getSeq() != null) { + metaDataBuilder.setSeq(metaData.getSeq()); + } + if (metaData.getFileName() != null) { + metaDataBuilder.setFileName(metaData.getFileName()); + } + if (metaData.getFileType() != null) { + metaDataBuilder.setFileType(metaData.getFileType()); + } + if (metaData.getMd5() != null) { + metaDataBuilder.setMd5(metaData.getMd5()); + } + if (metaData.getDescription() != null) { + metaDataBuilder.setDescription(metaData.getDescription()); + } + metricBuilder.setMetadata(metaDataBuilder); + + return metricBuilder; + } + + private SparkplugBProto.Payload.DataSet.DataSetValue.Builder convertDataSetValue(SparkplugValue value) throws Exception { + SparkplugBProto.Payload.DataSet.DataSetValue.Builder protoValueBuilder = + SparkplugBProto.Payload.DataSet.DataSetValue.newBuilder(); + + // Set the value + DataSetDataType type = value.getType(); + switch (type) { + case Int8: + protoValueBuilder.setIntValue((Byte) value.getValue()); + break; + case Int16: + case UInt8: + protoValueBuilder.setIntValue((Short) value.getValue()); + break; + case Int32: + case UInt16: + protoValueBuilder.setIntValue((Integer) value.getValue()); + break; + case Int64: + case UInt32: + protoValueBuilder.setLongValue((Long) value.getValue()); + break; + case UInt64: + protoValueBuilder.setLongValue(((BigInteger) value.getValue()).longValue()); + break; + case Float: + protoValueBuilder.setFloatValue((Float) value.getValue()); + break; + case Double: + protoValueBuilder.setDoubleValue((Double) value.getValue()); + break; + case String: + case Text: + if (value.getValue() != null) { + protoValueBuilder.setStringValue((String) value.getValue()); + } else { + logger.warn("String value for dataset is null"); + protoValueBuilder.setStringValue("null"); + } + break; + case Boolean: + protoValueBuilder.setBooleanValue(toBoolean(value.getValue())); + break; + case DateTime: + try { + protoValueBuilder.setLongValue(((Date) value.getValue()).getTime()); + } catch (NullPointerException npe) { + // FIXME - remove after is_null is supported for dataset values + logger.debug("Date in dataset was null - leaving it -9223372036854775808L"); + protoValueBuilder.setLongValue(-9223372036854775808L); + } + break; + default: + logger.error("Unknown DataType: " + value.getType()); + throw new Exception("Failed to convert value " + value.getType()); + } + + return protoValueBuilder; + } + + private Boolean toBoolean(Object value) { + if (value == null) { + return null; + } + if (value instanceof Integer) { + return ((Integer)value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof Long) { + return ((Long)value).longValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof Float) { + return ((Float)value).floatValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof Double) { + return ((Double)value).doubleValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof Short) { + return ((Short)value).shortValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof Byte) { + return ((Byte)value).byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE; + } else if (value instanceof String) { + return Boolean.parseBoolean(value.toString()); + } + return (Boolean)value; + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java new file mode 100644 index 0000000000..cffdb133c6 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugBProto.java @@ -0,0 +1,17414 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +/** + * Created by nickAS21 on 13.12.22 + */ +public final class SparkplugBProto { + private SparkplugBProto() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface PayloadOrBuilder extends + // @@protoc_insertion_point(interface_extends:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload) + com.google.protobuf.GeneratedMessage. + ExtendableMessageOrBuilder { + + /** + * optional uint64 timestamp = 1; + * + *
+         * Timestamp at message sending time
+         * 
+ */ + boolean hasTimestamp(); + /** + * optional uint64 timestamp = 1; + * + *
+         * Timestamp at message sending time
+         * 
+ */ + long getTimestamp(); + + /** + * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; + * + *
+         * Repeated forever - no limit in Google Protobufs
+         * 
+ */ + java.util.List + getMetricsList(); + /** + * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; + * + *
+         * Repeated forever - no limit in Google Protobufs
+         * 
+ */ + org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Metric getMetrics(int index); + /** + * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; + * + *
+         * Repeated forever - no limit in Google Protobufs
+         * 
+ */ + int getMetricsCount(); + /** + * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; + * + *
+         * Repeated forever - no limit in Google Protobufs
+         * 
+ */ + java.util.List + getMetricsOrBuilderList(); + /** + * repeated .org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Metric metrics = 2; + * + *
+         * Repeated forever - no limit in Google Protobufs
+         * 
+ */ + org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.MetricOrBuilder getMetricsOrBuilder( + int index); + + /** + * optional uint64 seq = 3; + * + *
+         * Sequence number
+         * 
+ */ + boolean hasSeq(); + /** + * optional uint64 seq = 3; + * + *
+         * Sequence number
+         * 
+ */ + long getSeq(); + + /** + * optional string uuid = 4; + * + *
+         * UUID to track message type in terms of schema definitions
+         * 
+ */ + boolean hasUuid(); + /** + * optional string uuid = 4; + * + *
+         * UUID to track message type in terms of schema definitions
+         * 
+ */ + java.lang.String getUuid(); + /** + * optional string uuid = 4; + * + *
+         * UUID to track message type in terms of schema definitions
+         * 
+ */ + com.google.protobuf.ByteString + getUuidBytes(); + + /** + * optional bytes body = 5; + * + *
+         * To optionally bypass the whole definition above
+         * 
+ */ + boolean hasBody(); + /** + * optional bytes body = 5; + * + *
+         * To optionally bypass the whole definition above
+         * 
+ */ + com.google.protobuf.ByteString getBody(); + } + /** + * Protobuf type {@code org.thingsboard.server.transport.mqtt.util.sparkplug.Payload} + * + *
+     * // Indexes of Data Types
+     * // Unknown placeholder for future expansion.
+     *Unknown         = 0;
+     * // Basic Types
+     *Int8            = 1;
+     *Int16           = 2;
+     *Int32           = 3;
+     *Int64           = 4;
+     *UInt8           = 5;
+     *UInt16          = 6;
+     *UInt32          = 7;
+     *UInt64          = 8;
+     *Float           = 9;
+     *Double          = 10;
+     *Boolean         = 11;
+     *String          = 12;
+     *DateTime        = 13;
+     *Text            = 14;
+     * // Additional Metric Types
+     *UUID            = 15;
+     *DataSet         = 16;
+     *Bytes           = 17;
+     *File            = 18;
+     *Template        = 19;
+     * // Additional PropertyValue Types
+     *PropertySet     = 20;
+     *PropertySetList = 21;
+     * 
+ */ + public static final class Payload extends + com.google.protobuf.GeneratedMessage.ExtendableMessage< + Payload> implements + // @@protoc_insertion_point(message_implements:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload) + PayloadOrBuilder { + // Use Payload.newBuilder() to construct. + private Payload(com.google.protobuf.GeneratedMessage.ExtendableBuilder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Payload(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Payload defaultInstance; + public static Payload getDefaultInstance() { + return defaultInstance; + } + + public Payload getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Payload( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + timestamp_ = input.readUInt64(); + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + metrics_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000002; + } + metrics_.add(input.readMessage(org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Metric.PARSER, extensionRegistry)); + break; + } + case 24: { + bitField0_ |= 0x00000002; + seq_ = input.readUInt64(); + break; + } + case 34: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000004; + uuid_ = bs; + break; + } + case 42: { + bitField0_ |= 0x00000008; + body_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + metrics_ = java.util.Collections.unmodifiableList(metrics_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.internal_static_com_cirruslink_sparkplug_protobuf_Payload_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.internal_static_com_cirruslink_sparkplug_protobuf_Payload_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.class, org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugBProto.Payload.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public Payload parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new Payload(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + public interface TemplateOrBuilder extends + // @@protoc_insertion_point(interface_extends:org.thingsboard.server.transport.mqtt.util.sparkplug.Payload.Template) + com.google.protobuf.GeneratedMessage. + ExtendableMessageOrBuilder