Browse Source

spark[lug - refactoring review - 01 (With test)

pull/14987/head
nickAS21 3 weeks ago
parent
commit
6570882b84
  1. 20
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 186
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java
  3. 20
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java

20
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -394,25 +394,31 @@ public class DefaultTransportApiService implements TransportApiService {
}
// Security check: verify that the device was created by this gateway
boolean isRelated = false;
try {
boolean isRelated = relationService.checkRelation(
// Security check: verify that the device was originally created by this gateway
isRelated = relationService.checkRelation(
gateway.getTenantId(),
gateway.getId(),
existingDevice.getId(),
"Created",
RelationTypeGroup.COMMON
);
if (!isRelated) {
log.warn("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.",
gateway.getId(), existingDevice.getId());
return null;
}
} catch (Exception e) {
// Log the error from the relation service but return null to allow potential recovery
log.error("[{}] Error checking relation for device {}", gateway.getId(), existingDevice.getId(), e);
return null;
}
// If the device is found but not related to this gateway, it's a security breach
if (!isRelated) {
log.error("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.",
gateway.getId(), existingDevice.getId());
// Throwing exception to halt the entire connection process
throw new RuntimeException("Security breach attempt! Unauthorized device rename.");
}
// Logic for renaming the device if it's related and no naming conflicts exist
boolean changed = false;
String newName = requestMsg.getDeviceName();

186
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java

@ -31,7 +31,9 @@ import org.junit.Assert;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.asset.AssetInfo;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -39,6 +41,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16;
@ -113,10 +117,22 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.build();
processBeforeTest(configProperties);
if (isCreateDevices) {
String deviceName = deviceId + "_1";
createDevice(deviceName, deviceProfile.getName(), false);
deviceName = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2";
createDevice(deviceName, deviceProfile.getName(), false);
// 1. Create the first device with a short name (legacy style)
String deviceName1 = deviceId + "_1";
Device device1 = createDevice(deviceName1, deviceProfile.getName(), false);
// 2. Establish 'Created' relation so the transport identifies this gateway as the owner
String relationType = "Created";
EntityRelation relation1 = createFromRelation(savedGateway, device1, relationType);
doPost("/api/relation", relation1).andExpect(status().isOk());
// 3. Create the second device with a full-path name
String deviceName2 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2";
Device device2 = createDevice(deviceName2, deviceProfile.getName(), false);
// 4. Establish 'Created' relation for the second device as well
EntityRelation relation2 = createFromRelation(savedGateway, device2, relationType);
doPost("/api/relation", relation2).andExpect(status().isOk());
}
}
@ -282,6 +298,111 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
Assert.assertNull(device2.get().getLabel());
}
/**
* Coverage: Rename when a device with the target full-path name already exists (collision).
*/
protected void renameCollisionWhenTargetNameAlreadyExists_Test() throws Exception {
long ts = calendar.getTimeInMillis();
String shortName = deviceId + "_1"; // Created in beforeTest
String fullPathName = groupId + ":" + edgeNode + ":" + shortName;
// Manually create a device that already has the "new" full-path name to trigger a collision
createDevice(fullPathName, deviceProfile.getName(), false);
clientWithCorrectNodeAccessTokenWithNDEATH();
SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getSeqNum());
payload.addMetrics(createMetric(123, ts, metricBirthName_Int32, metricBirthDataType_Int32, -1L));
// Gateway sends DBIRTH for the short name.
// Transport will try to rename it but should find a conflict and handle it gracefully.
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + shortName,
payload.build().toByteArray(), 0, false);
await("Checking stability after collision")
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
Device oldDevice = doGet("/api/tenant/devices?deviceName=" + shortName, Device.class);
Device conflictDevice = doGet("/api/tenant/devices?deviceName=" + fullPathName, Device.class);
// Both devices must still exist, proving no exception crashed the process
return oldDevice != null && conflictDevice != null;
});
}
/**
* Coverage: The privilege concern attempt to rename a device not owned by the gateway.
* This test verifies that the original device's ID remains unchanged, meaning it was not hijacked.
*/
protected void unauthorizedRenameAttemptBad_Test() throws Exception {
long ts = calendar.getTimeInMillis();
String strangerName = "unauthorized_device_rename";
// 1. Create a "stranger" device via API (it has no 'Created' relation to the gateway)
Device stranger = new Device();
stranger.setName(strangerName);
stranger.setType("default");
doPost("/api/device", stranger);
final DeviceId originalStrangerId = stranger.getId();
clientWithCorrectNodeAccessTokenWithNDEATH();
SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts).setSeq(getSeqNum());
// 2. Unauthorized gateway attempts to rename this device via Sparkplug topic path
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + strangerName,
payload.build().toByteArray(), 0, false);
String expectedFullPath = groupId + ":" + edgeNode + ":" + strangerName;
// 3. Verify security: the original device must still be linked to its short name with the same ID
await("Verify original device was not hijacked")
.atMost(40, TimeUnit.SECONDS)
.pollDelay(2, TimeUnit.SECONDS)
.untilAsserted(() -> {
// Check if the original device still exists with its original ID
Device currentStranger = doGet("/api/tenant/devices?deviceName=" + strangerName, Device.class);
Assert.assertNotNull("Original device disappeared!", currentStranger);
Assert.assertEquals("Security breach: Original device ID changed!", originalStrangerId, currentStranger.getId());
// Even if the gateway created a NEW device with a full path, it must have a different ID
Device newDevice = doGet("/api/tenant/devices?deviceName=" + expectedFullPath, Device.class);
if (newDevice != null) {
Assert.assertNotEquals("Stranger device was successfully hijacked (IDs match)!", originalStrangerId, newDevice.getId());
}
});
}
/**
* Coverage: The privilege concern attempt to rename a device not owned by the gateway.
*/
protected void unauthorizedRenameAttempt_Test() throws Exception {
long ts = calendar.getTimeInMillis();
String strangerName = "unauthorized_device_rename";
// Create a device without a "Created" relation to the gateway
Device stranger = new Device();
stranger.setName(strangerName);
stranger.setType("default");
doPost("/api/device", stranger);
clientWithCorrectNodeAccessTokenWithNDEATH();
SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts).setSeq(getSeqNum());
// Unauthorized gateway attempts to rename the device via Sparkplug topic
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + strangerName,
payload.build().toByteArray(), 0, false);
String expectedFullPath = groupId + ":" + edgeNode + ":" + strangerName;
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
doGet("/api/tenant/devices?deviceName=" + expectedFullPath, Device.class, status().isNotFound())
);
}
protected void state_ONLINE_ALL (List<Device> devices, long ts) {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name()));
await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
@ -309,6 +430,58 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
}
}
/**
* Coverage: Concurrent first-message registration with the lock mechanism.
*/
protected void concurrentFirstMessageRegistration_Test() throws Exception {
int threadCount = 5;
String concurrentDeviceName = "concurrent_device";
clientWithCorrectNodeAccessTokenWithNDEATH();
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(threadCount);
long ts = calendar.getTimeInMillis();
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
SparkplugBProto.Payload.Builder payload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts).setSeq(0);
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/DBIRTH/" + edgeNode + "/" + concurrentDeviceName,
payload.build().toByteArray(), 0, false);
} catch (Exception e) {
log.error("Concurrent publish failed", e);
}
});
}
String expectedName = groupId + ":" + edgeNode + ":" + concurrentDeviceName;
await("Wait for concurrent registration result")
.atMost(40, TimeUnit.SECONDS) // Restored to 40s as requested
.until(() -> doGet("/api/tenant/devices?deviceName=" + expectedName, Device.class) != null);
executor.shutdown();
}
/**
* Coverage: Sparkplug-message handling when msgId <= 0 (#7).
* Verifies that the transport does not close the session for Sparkplug clients using msgId 0.
*/
protected void sparkplugSessionStaysAliveWithZeroMsgId_Test() throws Exception {
// clientMqttV5ConnectWithNDEATH internally sets msgId = 0 for the Will message.
// This validates that the connection is accepted despite msgId being 0.
IMqttToken connectionResult = clientMqttV5ConnectWithNDEATH(calendar.getTimeInMillis(), 0, -1L);
Assert.assertTrue("Sparkplug connection should be successful with msgId=0", client.isConnected());
// Publish NBIRTH message which usually goes through the aggregate callback.
// This verifies that msgId=0 in the callback does not trigger closeDeviceSession.
connectionWithNBirth(Int32, "test_metric_msgId_0", 555);
// Awaitility to ensure the session remains open after processing.
await("Verify Sparkplug session remains open after receiving msgId=0")
.atMost(40, TimeUnit.SECONDS)
.until(() -> client.isConnected());
}
protected List<Device> connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(long ts) throws Exception {
List<Device> devices = new ArrayList<>();
Long alias = 0L;
@ -630,4 +803,9 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
}
}
private EntityRelation createFromRelation(Device mainDevice, Device device, String relationType) {
return new EntityRelation(mainDevice.getId(), device.getId(), relationType);
}
}

20
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java

@ -48,4 +48,24 @@ public class MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest extends A
public void testClientWithCorrectAccessTokenWithNDEATHTwoDevicesCreatingBeforeFirstNameDeviceIdSecondNameFull() throws Exception {
connectClientWithCorrectAccessTokenWithNDEATHDevicesCreatingBefore_Test(2);
}
@Test
public void testRenameWhenDeviceFullPathAlreadyExists_Collision() throws Exception {
renameCollisionWhenTargetNameAlreadyExists_Test();
}
@Test
public void testUnauthorizedRenameAttempt() throws Exception {
unauthorizedRenameAttempt_Test();
}
@Test
public void testConcurrentFirstMessageRegistration() throws Exception {
concurrentFirstMessageRegistration_Test();
}
@Test
public void testSparkplugSessionStaysAliveWithZeroMsgId() throws Exception {
sparkplugSessionStaysAliveWithZeroMsgId_Test();
}
}

Loading…
Cancel
Save