From 6570882b84e21cd896cd079df2f1d131b1fb1116 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Mon, 11 May 2026 17:49:40 +0300 Subject: [PATCH] spark[lug - refactoring review - 01 (With test) --- .../transport/DefaultTransportApiService.java | 20 +- .../AbstractMqttV5ClientSparkplugTest.java | 186 +++++++++++++++++- ...gBConnectionDevicesCreatingBeforeTest.java | 20 ++ 3 files changed, 215 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 09be48c641..ed43c71027 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -394,25 +394,31 @@ public class DefaultTransportApiService implements TransportApiService { } // Security check: verify that the device was created by this gateway + boolean isRelated = false; try { - boolean isRelated = relationService.checkRelation( + // Security check: verify that the device was originally created by this gateway + isRelated = relationService.checkRelation( gateway.getTenantId(), gateway.getId(), existingDevice.getId(), "Created", RelationTypeGroup.COMMON ); - - if (!isRelated) { - log.warn("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.", - gateway.getId(), existingDevice.getId()); - return null; - } } catch (Exception e) { + // Log the error from the relation service but return null to allow potential recovery log.error("[{}] Error checking relation for device {}", gateway.getId(), existingDevice.getId(), e); return null; } + // If the device is found but not related to this gateway, it's a security breach + if (!isRelated) { + log.error("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.", + gateway.getId(), existingDevice.getId()); + // Throwing exception to halt the entire connection process + throw new RuntimeException("Security breach attempt! Unauthorized device rename."); + } + + // Logic for renaming the device if it's related and no naming conflicts exist boolean changed = false; String newName = requestMsg.getDeviceName(); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 1ba5d1ba3a..2adc7b08a5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -31,7 +31,9 @@ import org.junit.Assert; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.asset.AssetInfo; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -39,6 +41,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; @@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.common.util.JacksonUtil.newArrayNode; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16; @@ -113,10 +117,22 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .build(); processBeforeTest(configProperties); if (isCreateDevices) { - String deviceName = deviceId + "_1"; - createDevice(deviceName, deviceProfile.getName(), false); - deviceName = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2"; - createDevice(deviceName, deviceProfile.getName(), false); + // 1. Create the first device with a short name (legacy style) + String deviceName1 = deviceId + "_1"; + Device device1 = createDevice(deviceName1, deviceProfile.getName(), false); + + // 2. Establish 'Created' relation so the transport identifies this gateway as the owner + String relationType = "Created"; + EntityRelation relation1 = createFromRelation(savedGateway, device1, relationType); + doPost("/api/relation", relation1).andExpect(status().isOk()); + + // 3. Create the second device with a full-path name + String deviceName2 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2"; + Device device2 = createDevice(deviceName2, deviceProfile.getName(), false); + + // 4. Establish 'Created' relation for the second device as well + EntityRelation relation2 = createFromRelation(savedGateway, device2, relationType); + doPost("/api/relation", relation2).andExpect(status().isOk()); } } @@ -282,6 +298,111 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte Assert.assertNull(device2.get().getLabel()); } + /** + * Coverage: Rename when a device with the target full-path name already exists (collision). + */ + protected void renameCollisionWhenTargetNameAlreadyExists_Test() throws Exception { + long ts = calendar.getTimeInMillis(); + String shortName = deviceId + "_1"; // Created in beforeTest + String fullPathName = groupId + ":" + edgeNode + ":" + shortName; + + // Manually create a device that already has the "new" full-path name to trigger a collision + createDevice(fullPathName, deviceProfile.getName(), false); + + clientWithCorrectNodeAccessTokenWithNDEATH(); + + SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getSeqNum()); + payload.addMetrics(createMetric(123, ts, metricBirthName_Int32, metricBirthDataType_Int32, -1L)); + + // Gateway sends DBIRTH for the short name. + // Transport will try to rename it but should find a conflict and handle it gracefully. + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + shortName, + payload.build().toByteArray(), 0, false); + + await("Checking stability after collision") + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + Device oldDevice = doGet("/api/tenant/devices?deviceName=" + shortName, Device.class); + Device conflictDevice = doGet("/api/tenant/devices?deviceName=" + fullPathName, Device.class); + // Both devices must still exist, proving no exception crashed the process + return oldDevice != null && conflictDevice != null; + }); + } + + /** + * Coverage: The privilege concern — attempt to rename a device not owned by the gateway. + * This test verifies that the original device's ID remains unchanged, meaning it was not hijacked. + */ + protected void unauthorizedRenameAttemptBad_Test() throws Exception { + long ts = calendar.getTimeInMillis(); + String strangerName = "unauthorized_device_rename"; + + // 1. Create a "stranger" device via API (it has no 'Created' relation to the gateway) + Device stranger = new Device(); + stranger.setName(strangerName); + stranger.setType("default"); + doPost("/api/device", stranger); + final DeviceId originalStrangerId = stranger.getId(); + + clientWithCorrectNodeAccessTokenWithNDEATH(); + + SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts).setSeq(getSeqNum()); + + // 2. Unauthorized gateway attempts to rename this device via Sparkplug topic path + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + strangerName, + payload.build().toByteArray(), 0, false); + + String expectedFullPath = groupId + ":" + edgeNode + ":" + strangerName; + + // 3. Verify security: the original device must still be linked to its short name with the same ID + await("Verify original device was not hijacked") + .atMost(40, TimeUnit.SECONDS) + .pollDelay(2, TimeUnit.SECONDS) + .untilAsserted(() -> { + // Check if the original device still exists with its original ID + Device currentStranger = doGet("/api/tenant/devices?deviceName=" + strangerName, Device.class); + Assert.assertNotNull("Original device disappeared!", currentStranger); + Assert.assertEquals("Security breach: Original device ID changed!", originalStrangerId, currentStranger.getId()); + + // Even if the gateway created a NEW device with a full path, it must have a different ID + Device newDevice = doGet("/api/tenant/devices?deviceName=" + expectedFullPath, Device.class); + if (newDevice != null) { + Assert.assertNotEquals("Stranger device was successfully hijacked (IDs match)!", originalStrangerId, newDevice.getId()); + } + }); + } + + /** + * Coverage: The privilege concern — attempt to rename a device not owned by the gateway. + */ + protected void unauthorizedRenameAttempt_Test() throws Exception { + long ts = calendar.getTimeInMillis(); + String strangerName = "unauthorized_device_rename"; + + // Create a device without a "Created" relation to the gateway + Device stranger = new Device(); + stranger.setName(strangerName); + stranger.setType("default"); + doPost("/api/device", stranger); + + clientWithCorrectNodeAccessTokenWithNDEATH(); + + SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts).setSeq(getSeqNum()); + + // Unauthorized gateway attempts to rename the device via Sparkplug topic + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + strangerName, + payload.build().toByteArray(), 0, false); + + String expectedFullPath = groupId + ":" + edgeNode + ":" + strangerName; + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + doGet("/api/tenant/devices?deviceName=" + expectedFullPath, Device.class, status().isNotFound()) + ); + } + protected void state_ONLINE_ALL (List devices, long ts) { TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name())); await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) @@ -309,6 +430,58 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte } } + /** + * Coverage: Concurrent first-message registration with the lock mechanism. + */ + protected void concurrentFirstMessageRegistration_Test() throws Exception { + int threadCount = 5; + String concurrentDeviceName = "concurrent_device"; + clientWithCorrectNodeAccessTokenWithNDEATH(); + + java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(threadCount); + long ts = calendar.getTimeInMillis(); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts).setSeq(0); + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + concurrentDeviceName, + payload.build().toByteArray(), 0, false); + } catch (Exception e) { + log.error("Concurrent publish failed", e); + } + }); + } + + String expectedName = groupId + ":" + edgeNode + ":" + concurrentDeviceName; + await("Wait for concurrent registration result") + .atMost(40, TimeUnit.SECONDS) // Restored to 40s as requested + .until(() -> doGet("/api/tenant/devices?deviceName=" + expectedName, Device.class) != null); + + executor.shutdown(); + } + + /** + * Coverage: Sparkplug-message handling when msgId <= 0 (#7). + * Verifies that the transport does not close the session for Sparkplug clients using msgId 0. + */ + protected void sparkplugSessionStaysAliveWithZeroMsgId_Test() throws Exception { + // clientMqttV5ConnectWithNDEATH internally sets msgId = 0 for the Will message. + // This validates that the connection is accepted despite msgId being 0. + IMqttToken connectionResult = clientMqttV5ConnectWithNDEATH(calendar.getTimeInMillis(), 0, -1L); + Assert.assertTrue("Sparkplug connection should be successful with msgId=0", client.isConnected()); + + // Publish NBIRTH message which usually goes through the aggregate callback. + // This verifies that msgId=0 in the callback does not trigger closeDeviceSession. + connectionWithNBirth(Int32, "test_metric_msgId_0", 555); + + // Awaitility to ensure the session remains open after processing. + await("Verify Sparkplug session remains open after receiving msgId=0") + .atMost(40, TimeUnit.SECONDS) + .until(() -> client.isConnected()); + } + protected List connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(long ts) throws Exception { List devices = new ArrayList<>(); Long alias = 0L; @@ -630,4 +803,9 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte } } + private EntityRelation createFromRelation(Device mainDevice, Device device, String relationType) { + return new EntityRelation(mainDevice.getId(), device.getId(), relationType); + } + + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java index 720e829371..92583e655c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java @@ -48,4 +48,24 @@ public class MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest extends A public void testClientWithCorrectAccessTokenWithNDEATHTwoDevicesCreatingBeforeFirstNameDeviceIdSecondNameFull() throws Exception { connectClientWithCorrectAccessTokenWithNDEATHDevicesCreatingBefore_Test(2); } + + @Test + public void testRenameWhenDeviceFullPathAlreadyExists_Collision() throws Exception { + renameCollisionWhenTargetNameAlreadyExists_Test(); + } + + @Test + public void testUnauthorizedRenameAttempt() throws Exception { + unauthorizedRenameAttempt_Test(); + } + + @Test + public void testConcurrentFirstMessageRegistration() throws Exception { + concurrentFirstMessageRegistration_Test(); + } + + @Test + public void testSparkplugSessionStaysAliveWithZeroMsgId() throws Exception { + sparkplugSessionStaysAliveWithZeroMsgId_Test(); + } }