diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index ad2d66d66a..09be48c641 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.exception.EntitiesLimitExceededException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -114,7 +115,7 @@ import java.util.stream.Collectors; import static org.thingsboard.server.service.transport.BasicCredentialsValidationResult.PASSWORD_MISMATCH; import static org.thingsboard.server.service.transport.BasicCredentialsValidationResult.VALID; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR; /** * Created by ashvayka on 05.10.18. @@ -347,7 +348,6 @@ public class DefaultTransportApiService implements TransportApiService { return buildLimitErrorResponse(e, gatewayId); } finally { lock.unlock(); - deviceCreationLocks.remove(deviceName, lock); } } @@ -367,45 +367,87 @@ public class DefaultTransportApiService implements TransportApiService { if (device != null) { return device; } - device = tryRenameSparkplugDevice(requestMsg, gateway); + String[] topicPath = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_SEPARATOR); + device = tryRenameSparkplugDevice(requestMsg, gateway, topicPath); if (device != null) { return device; } - device = createNewDevice(requestMsg, gateway, gatewayId); + device = createNewDevice(requestMsg, gateway, gatewayId, topicPath); pushCreatedEvent(device, gateway); return device; } - private Device tryRenameSparkplugDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, - Device gateway) { + private Device tryRenameSparkplugDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, Device gateway, String[] topicPath) { if (!requestMsg.getIsSparkplug()) { return null; } - String[] topicPath = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_REGEXP); + if (topicPath.length != 3) { return null; } + String deviceId = topicPath[2]; - Device existingDevice = - deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceId); + Device existingDevice = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceId); + if (existingDevice == null) { return null; } - existingDevice.setName(requestMsg.getDeviceName()); - existingDevice.setLabel(deviceId); - return deviceService.saveDevice(existingDevice); + + // Security check: verify that the device was created by this gateway + try { + boolean isRelated = relationService.checkRelation( + gateway.getTenantId(), + gateway.getId(), + existingDevice.getId(), + "Created", + RelationTypeGroup.COMMON + ); + + if (!isRelated) { + log.warn("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.", + gateway.getId(), existingDevice.getId()); + return null; + } + } catch (Exception e) { + log.error("[{}] Error checking relation for device {}", gateway.getId(), existingDevice.getId(), e); + return null; + } + + boolean changed = false; + String newName = requestMsg.getDeviceName(); + + if (!newName.equals(existingDevice.getName())) { + // Check if the new name is already taken by another device + Device conflictDevice = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), newName); + + if (conflictDevice != null) { + log.warn("[{}] Cannot rename device [{}] to [{}]: name already exists!", + gateway.getId(), existingDevice.getId(), newName); + return existingDevice; + } + + existingDevice.setName(newName); + + // Update label only if it's empty to avoid overwriting user changes + if (existingDevice.getLabel() == null || existingDevice.getLabel().isEmpty()) { + existingDevice.setLabel(deviceId); + } + + changed = true; + } + + return changed ? deviceService.saveDevice(existingDevice) : existingDevice; } private Device createNewDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, Device gateway, - DeviceId gatewayId) { + DeviceId gatewayId, String[] topicPath) { TenantId tenantId = gateway.getTenantId(); Device device = new Device(); device.setTenantId(tenantId); device.setName(requestMsg.getDeviceName()); - if (requestMsg.getIsSparkplug()){ - String [] topicDevice = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_REGEXP); - if (topicDevice.length == 3) device.setLabel(topicDevice[2]); + if (requestMsg.getIsSparkplug()) { + if (topicPath.length == 3) device.setLabel(topicPath[2]); } device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index b52f38f55e..1ba5d1ba3a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -71,9 +71,9 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR; /** * Created by nickAS21 on 12.01.23 @@ -115,7 +115,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte if (isCreateDevices) { String deviceName = deviceId + "_1"; createDevice(deviceName, deviceProfile.getName(), false); - deviceName = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceId + "_2"; + deviceName = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2"; createDevice(deviceName, deviceProfile.getName(), false); } } @@ -158,7 +158,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte options.setSessionExpiryInterval(0L); options.setUserName(gatewayAccessToken); String nameSpace = nameSpaceBad.length == 0 ? TOPIC_ROOT_SPB_V_1_0 : nameSpaceBad[0]; - String topic = nameSpace + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NDEATH.name() + TOPIC_SPLIT_REGEXP + edgeNode; + String topic = nameSpace + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NDEATH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode; // The NDEATH message MUST set the MQTT Will QoS to 1 and Retained flag to false MqttMessage msg = new MqttMessage(); msg.setId(0); @@ -181,7 +181,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte payloadBirthNode.addMetrics(metric); payloadBirthNode.setTimestamp(ts); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } @@ -192,14 +192,14 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .setTimestamp(ts) .setSeq(getSeqNum()); String deviceIdName = deviceId + "_" + i; - String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName; + String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName; payloadBirthDevice.addMetrics(metric); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName, payloadBirthDevice.build().toByteArray(), 0, false); AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceIdName + "] after created") - .atMost(200, TimeUnit.SECONDS) + .atMost(40, TimeUnit.SECONDS) .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); @@ -227,7 +227,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte payloadBirthNode.addMetrics(metric); payloadBirthNode.setTimestamp(ts); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } @@ -241,14 +241,14 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .setTimestamp(ts) .setSeq(getSeqNum()); payloadBirthDevice1.addMetrics(metric); - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdNameLabel1, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdNameLabel1, payloadBirthDevice1.build().toByteArray(), 0, false); } - String deviceName1 = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceIdNameLabel1;; + String deviceName1 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceIdNameLabel1; AtomicReference device1 = new AtomicReference<>(); await(alias + "find device [" + deviceName1 + "] before connecting") - .atMost(200, TimeUnit.SECONDS) + .atMost(40, TimeUnit.SECONDS) .until(() -> { device1.set(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class)); return device1.get() != null; @@ -262,13 +262,13 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .setTimestamp(ts) .setSeq(getSeqNum()); payloadBirthDevice2.addMetrics(metric); - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName2, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName2, payloadBirthDevice2.build().toByteArray(), 0, false); } - String deviceName2 = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceIdName2; + String deviceName2 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceIdName2; AtomicReference device2 = new AtomicReference<>(); await(alias + "find device [" + deviceName2 + "] before connecting") - .atMost(200, TimeUnit.SECONDS) + .atMost(40, TimeUnit.SECONDS) .until(() -> { device2.set(doGet("/api/tenant/devices?deviceName=" + deviceName2, Device.class)); return device2.get() != null; @@ -323,7 +323,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte payloadBirthNode.addMetrics(metric); payloadBirthNode.setTimestamp(ts); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } @@ -333,15 +333,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .setTimestamp(ts) .setSeq(getSeqNum()); String deviceIdName = deviceId + "_1"; - String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName; + String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName; payloadBirthDevice.addMetrics(metric); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName, payloadBirthDevice.build().toByteArray(), 0, false); AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") - .atMost(200, TimeUnit.SECONDS) + .atMost(40, TimeUnit.SECONDS) .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); @@ -387,7 +387,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte listKeys.add(metricKey); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } return listKeys; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index d0f3430fe7..1402e5c4c3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -40,7 +40,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR; /** * Created by nickAS21 on 12.01.23 @@ -111,7 +111,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra if (client.isConnected()) { List devicesList = new ArrayList<>(devices); Device device = devicesList.get(indexDeviceDisconnect); - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DDEATH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + device.getName(), + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DDEATH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + device.getName(), payloadDeathDevice.build().toByteArray(), 0, false); await(alias + messageName(STATE) + ", device: " + device.getName()) .atMost(40, TimeUnit.SECONDS) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java index a28b04d70e..720e829371 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java @@ -47,4 +47,5 @@ public class MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest extends A @Test public void testClientWithCorrectAccessTokenWithNDEATHTwoDevicesCreatingBeforeFirstNameDeviceIdSecondNameFull() throws Exception { connectClientWithCorrectAccessTokenWithNDEATHDevicesCreatingBefore_Test(2); - }} + } +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index dc513421e6..8f2d0659af 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR; /** * Created by nickAS21 on 12.01.23 @@ -68,7 +68,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + messageTypeName + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + messageTypeName + TOPIC_SPLIT_SEPARATOR + edgeNode, ndataPayload.build().toByteArray(), 0, false); } @@ -97,7 +97,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac createdAddMetricValueArraysPrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); if (client.isConnected()) { - client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + messageTypeName + TOPIC_SPLIT_REGEXP + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + messageTypeName + TOPIC_SPLIT_SEPARATOR + edgeNode, ndataPayload.build().toByteArray(), 0, false); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 799a0fdf21..94c24f5a1d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -298,11 +298,22 @@ public abstract class AbstractGatewaySessionHandler onDeviceConnectSparkplug(SparkplugTopic topic, String deviceType) { - T result = devices.get(topic.getNodeDeviceName()); + String fullPath = topic.getNodeDeviceNameAllPath(); + // Primary lookup: try to find the device by its full-path name (standard for new devices) + T result = devices.get(fullPath); + if (result == null) { - return onDeviceConnect(topic.getNodeDeviceNameAllPath(), deviceType, true); - } else { + // Secondary lookup (Legacy Fallback): check for the short name if full path is not found. + // This supports devices migrated from older versions. + String shortName = topic.getNodeDeviceName(); + result = devices.get(shortName); + } + + if (result != null) { return Futures.immediateFuture(result); + } else { + // If not found in cache at all, proceed with connection/creation using full path + return onDeviceConnect(fullPath, deviceType, true); } } @@ -855,8 +866,16 @@ public abstract class AbstractGatewaySessionHandler deviceCtx = this.onDeviceConnectProto(topic); - deviceName = checkDeviceName(deviceCtx.get().getDeviceInfo().getDeviceName()); String finalDeviceName = deviceName; contextListenableFuture = Futures.transform(deviceCtx, ctx -> { if (topic.isType(DBIRTH)) { sendSparkplugStateOnTelemetry(ctx.getSessionInfo(), finalDeviceName, ONLINE, sparkplugBProto.getTimestamp()); + try { ctx.setDeviceBirthMetrics(sparkplugBProto.getMetricsList()); + } catch (IllegalArgumentException | DuplicateKeyException e) { + log.error("[{}] Failed to set birth metrics", finalDeviceName, e); + throw new RuntimeException(e); + } } return ctx; }, MoreExecutors.directExecutor()); - } catch (IllegalArgumentException | DuplicateKeyException | ExecutionException | InterruptedException e) { + } catch (IllegalArgumentException | DuplicateKeyException e) { throw new RuntimeException(e); } } @@ -200,7 +204,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler= 4 && splitTopic.length <= 5 && splitTopic[0].equals(this.sparkplugTopicNode.getNamespace()) && splitTopic[1].equals(this.sparkplugTopicNode.getGroupId()) && diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java index b27b36e837..9b0cb62ca6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java @@ -21,9 +21,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR; /** * Created by nickAS21 on 12.12.22 @@ -197,7 +197,7 @@ public class SparkplugTopic { try { if (isValidIdElementToUTF8(topicString)) { SparkplugMessageType messageType; - String[] splitTopic = topicString.split(TOPIC_SPLIT_REGEXP); + String[] splitTopic = topicString.split(TOPIC_SPLIT_SEPARATOR); if (TOPIC_ROOT_SPB_V_1_0.equals(splitTopic[0])) { if (splitTopic.length == 3) { messageType = parseMessageType(splitTopic[1]); @@ -333,9 +333,9 @@ public class SparkplugTopic { public String getNodeDeviceNameAllPath() { StringBuilder sb = new StringBuilder(); if (hostApplicationId == null) { - sb.append(getGroupId()).append(DEVICE_NAME_SPLIT_REGEXP).append(getEdgeNodeId()); + sb.append(getGroupId()).append(DEVICE_NAME_SPLIT_SEPARATOR).append(getEdgeNodeId()); if (getDeviceId() != null) { - sb.append(DEVICE_NAME_SPLIT_REGEXP).append(getDeviceId()); + sb.append(DEVICE_NAME_SPLIT_SEPARATOR).append(getDeviceId()); } } return sb.toString(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java index 6558ba618f..377b759957 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java @@ -34,9 +34,9 @@ public class SparkplugTopicService { private static final Map SPLIT_TOPIC_CACHE = new HashMap<>(); public static final String TOPIC_ROOT_SPB_V_1_0 = "spBv1.0"; public static final String TOPIC_ROOT_CERT_SP = "$sparkplug/certificates/"; - public static final String TOPIC_SPLIT_REGEXP = "/"; - public static final String DEVICE_NAME_SPLIT_REGEXP = ":"; - public static final String TOPIC_STATE_REGEXP = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP; + public static final String TOPIC_SPLIT_SEPARATOR = "/"; + public static final String DEVICE_NAME_SPLIT_SEPARATOR = ":"; + public static final String TOPIC_STATE_SEPARATOR = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + STATE.name() + TOPIC_SPLIT_SEPARATOR; public static SparkplugTopic getSplitTopic(String topic) throws ThingsboardException { SparkplugTopic sparkplugTopic = SPLIT_TOPIC_CACHE.get(topic);