Browse Source

spark[lug - refactoring review - 01 (Without test)

pull/14987/head
nickAS21 3 weeks ago
parent
commit
2ee1a1d3b5
  1. 74
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 40
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java
  3. 4
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java
  4. 3
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java
  5. 6
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java
  6. 29
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java
  7. 1
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java
  8. 18
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java
  9. 10
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java
  10. 6
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java

74
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.exception.EntitiesLimitExceededException;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@ -114,7 +115,7 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.service.transport.BasicCredentialsValidationResult.PASSWORD_MISMATCH;
import static org.thingsboard.server.service.transport.BasicCredentialsValidationResult.VALID;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR;
/**
* Created by ashvayka on 05.10.18.
@ -347,7 +348,6 @@ public class DefaultTransportApiService implements TransportApiService {
return buildLimitErrorResponse(e, gatewayId);
} finally {
lock.unlock();
deviceCreationLocks.remove(deviceName, lock);
}
}
@ -367,45 +367,87 @@ public class DefaultTransportApiService implements TransportApiService {
if (device != null) {
return device;
}
device = tryRenameSparkplugDevice(requestMsg, gateway);
String[] topicPath = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_SEPARATOR);
device = tryRenameSparkplugDevice(requestMsg, gateway, topicPath);
if (device != null) {
return device;
}
device = createNewDevice(requestMsg, gateway, gatewayId);
device = createNewDevice(requestMsg, gateway, gatewayId, topicPath);
pushCreatedEvent(device, gateway);
return device;
}
private Device tryRenameSparkplugDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg,
Device gateway) {
private Device tryRenameSparkplugDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, Device gateway, String[] topicPath) {
if (!requestMsg.getIsSparkplug()) {
return null;
}
String[] topicPath = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_REGEXP);
if (topicPath.length != 3) {
return null;
}
String deviceId = topicPath[2];
Device existingDevice =
deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceId);
Device existingDevice = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceId);
if (existingDevice == null) {
return null;
}
existingDevice.setName(requestMsg.getDeviceName());
existingDevice.setLabel(deviceId);
return deviceService.saveDevice(existingDevice);
// Security check: verify that the device was created by this gateway
try {
boolean isRelated = relationService.checkRelation(
gateway.getTenantId(),
gateway.getId(),
existingDevice.getId(),
"Created",
RelationTypeGroup.COMMON
);
if (!isRelated) {
log.warn("[{}] Security breach attempt! Gateway tried to rename device [{}] without 'Created' relation.",
gateway.getId(), existingDevice.getId());
return null;
}
} catch (Exception e) {
log.error("[{}] Error checking relation for device {}", gateway.getId(), existingDevice.getId(), e);
return null;
}
boolean changed = false;
String newName = requestMsg.getDeviceName();
if (!newName.equals(existingDevice.getName())) {
// Check if the new name is already taken by another device
Device conflictDevice = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), newName);
if (conflictDevice != null) {
log.warn("[{}] Cannot rename device [{}] to [{}]: name already exists!",
gateway.getId(), existingDevice.getId(), newName);
return existingDevice;
}
existingDevice.setName(newName);
// Update label only if it's empty to avoid overwriting user changes
if (existingDevice.getLabel() == null || existingDevice.getLabel().isEmpty()) {
existingDevice.setLabel(deviceId);
}
changed = true;
}
return changed ? deviceService.saveDevice(existingDevice) : existingDevice;
}
private Device createNewDevice(GetOrCreateDeviceFromGatewayRequestMsg requestMsg,
Device gateway,
DeviceId gatewayId) {
DeviceId gatewayId, String[] topicPath) {
TenantId tenantId = gateway.getTenantId();
Device device = new Device();
device.setTenantId(tenantId);
device.setName(requestMsg.getDeviceName());
if (requestMsg.getIsSparkplug()){
String [] topicDevice = requestMsg.getDeviceName().split(DEVICE_NAME_SPLIT_REGEXP);
if (topicDevice.length == 3) device.setLabel(topicDevice[2]);
if (requestMsg.getIsSparkplug()) {
if (topicPath.length == 3) device.setLabel(topicPath[2]);
}
device.setType(requestMsg.getDeviceType());
device.setCustomerId(gateway.getCustomerId());

40
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java

@ -71,9 +71,9 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR;
/**
* Created by nickAS21 on 12.01.23
@ -115,7 +115,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
if (isCreateDevices) {
String deviceName = deviceId + "_1";
createDevice(deviceName, deviceProfile.getName(), false);
deviceName = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceId + "_2";
deviceName = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceId + "_2";
createDevice(deviceName, deviceProfile.getName(), false);
}
}
@ -158,7 +158,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
options.setSessionExpiryInterval(0L);
options.setUserName(gatewayAccessToken);
String nameSpace = nameSpaceBad.length == 0 ? TOPIC_ROOT_SPB_V_1_0 : nameSpaceBad[0];
String topic = nameSpace + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NDEATH.name() + TOPIC_SPLIT_REGEXP + edgeNode;
String topic = nameSpace + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NDEATH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode;
// The NDEATH message MUST set the MQTT Will QoS to 1 and Retained flag to false
MqttMessage msg = new MqttMessage();
msg.setId(0);
@ -181,7 +181,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
@ -192,14 +192,14 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.setTimestamp(ts)
.setSeq(getSeqNum());
String deviceIdName = deviceId + "_" + i;
String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName;
String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName;
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceIdName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.atMost(40, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
@ -227,7 +227,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
@ -241,14 +241,14 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.setTimestamp(ts)
.setSeq(getSeqNum());
payloadBirthDevice1.addMetrics(metric);
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdNameLabel1,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdNameLabel1,
payloadBirthDevice1.build().toByteArray(), 0, false);
}
String deviceName1 = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceIdNameLabel1;;
String deviceName1 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceIdNameLabel1;
AtomicReference<Device> device1 = new AtomicReference<>();
await(alias + "find device [" + deviceName1 + "] before connecting")
.atMost(200, TimeUnit.SECONDS)
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
device1.set(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class));
return device1.get() != null;
@ -262,13 +262,13 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.setTimestamp(ts)
.setSeq(getSeqNum());
payloadBirthDevice2.addMetrics(metric);
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName2,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName2,
payloadBirthDevice2.build().toByteArray(), 0, false);
}
String deviceName2 = groupId + DEVICE_NAME_SPLIT_REGEXP + edgeNode + DEVICE_NAME_SPLIT_REGEXP + deviceIdName2;
String deviceName2 = groupId + DEVICE_NAME_SPLIT_SEPARATOR + edgeNode + DEVICE_NAME_SPLIT_SEPARATOR + deviceIdName2;
AtomicReference<Device> device2 = new AtomicReference<>();
await(alias + "find device [" + deviceName2 + "] before connecting")
.atMost(200, TimeUnit.SECONDS)
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
device2.set(doGet("/api/tenant/devices?deviceName=" + deviceName2, Device.class));
return device2.get() != null;
@ -323,7 +323,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
@ -333,15 +333,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.setTimestamp(ts)
.setSeq(getSeqNum());
String deviceIdName = deviceId + "_1";
String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName;
String deviceName = groupId + ":" + edgeNode + ":" + deviceIdName;
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + deviceIdName,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + deviceIdName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.atMost(40, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
@ -387,7 +387,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
listKeys.add(metricKey);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.NBIRTH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
return listKeys;

4
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java

@ -40,7 +40,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR;
/**
* Created by nickAS21 on 12.01.23
@ -111,7 +111,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
if (client.isConnected()) {
List<Device> devicesList = new ArrayList<>(devices);
Device device = devicesList.get(indexDeviceDisconnect);
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + SparkplugMessageType.DDEATH.name() + TOPIC_SPLIT_REGEXP + edgeNode + TOPIC_SPLIT_REGEXP + device.getName(),
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + SparkplugMessageType.DDEATH.name() + TOPIC_SPLIT_SEPARATOR + edgeNode + TOPIC_SPLIT_SEPARATOR + device.getName(),
payloadDeathDevice.build().toByteArray(), 0, false);
await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS)

3
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest.java

@ -47,4 +47,5 @@ public class MqttV5ClientSparkplugBConnectionDevicesCreatingBeforeTest extends A
@Test
public void testClientWithCorrectAccessTokenWithNDEATHTwoDevicesCreatingBeforeFirstNameDeviceIdSecondNameFull() throws Exception {
connectClientWithCorrectAccessTokenWithNDEATHDevicesCreatingBefore_Test(2);
}}
}
}

6
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR;
/**
* Created by nickAS21 on 12.01.23
@ -68,7 +68,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + messageTypeName + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + messageTypeName + TOPIC_SPLIT_SEPARATOR + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}
@ -97,7 +97,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
createdAddMetricValueArraysPrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + groupId + TOPIC_SPLIT_REGEXP + messageTypeName + TOPIC_SPLIT_REGEXP + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + groupId + TOPIC_SPLIT_SEPARATOR + messageTypeName + TOPIC_SPLIT_SEPARATOR + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}

29
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java

@ -298,11 +298,22 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
ListenableFuture<T> onDeviceConnectSparkplug(SparkplugTopic topic, String deviceType) {
T result = devices.get(topic.getNodeDeviceName());
String fullPath = topic.getNodeDeviceNameAllPath();
// Primary lookup: try to find the device by its full-path name (standard for new devices)
T result = devices.get(fullPath);
if (result == null) {
return onDeviceConnect(topic.getNodeDeviceNameAllPath(), deviceType, true);
} else {
// Secondary lookup (Legacy Fallback): check for the short name if full path is not found.
// This supports devices migrated from older versions.
String shortName = topic.getNodeDeviceName();
result = devices.get(shortName);
}
if (result != null) {
return Futures.immediateFuture(result);
} else {
// If not found in cache at all, proceed with connection/creation using full path
return onDeviceConnect(fullPath, deviceType, true);
}
}
@ -855,8 +866,16 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
}
}
if (!deviceSessionCtx.isSparkplug() && msgId <= 0) {
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
// Check for malformed packets (msgId <= 0)
if (msgId <= 0) {
if (deviceSessionCtx.isSparkplug()) {
// Sparkplug devices may use msgId = 0 during the initial connection or for QoS 0 messages.
// We bypass the MALFORMED_PACKET check to allow these sessions to proceed.
log.trace("[{}] Sparkplug session: allowed msgId [{}] for device: [{}]", sessionId, msgId, deviceName);
} else {
// Standard MQTT defense: close session for invalid message IDs
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
}
}
}

1
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java

@ -128,4 +128,5 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC
rpcRequest.getMethodName() + ". " + e.getMessage());
}
}
}

18
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java

@ -48,7 +48,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -61,8 +60,8 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetr
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_STATE_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_STATE_SEPARATOR;
@Slf4j
@SpecVersion(spec = "sparkplug", version = "3.0.0")
@ -117,18 +116,23 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
contextListenableFuture = Futures.immediateFuture(this.deviceSessionCtx);
} else {
try {
deviceName = checkDeviceName(topic.getNodeDeviceNameAllPath());
ListenableFuture<SparkplugDeviceSessionContext> deviceCtx = this.onDeviceConnectProto(topic);
deviceName = checkDeviceName(deviceCtx.get().getDeviceInfo().getDeviceName());
String finalDeviceName = deviceName;
contextListenableFuture = Futures.transform(deviceCtx, ctx -> {
if (topic.isType(DBIRTH)) {
sendSparkplugStateOnTelemetry(ctx.getSessionInfo(), finalDeviceName, ONLINE,
sparkplugBProto.getTimestamp());
try {
ctx.setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
} catch (IllegalArgumentException | DuplicateKeyException e) {
log.error("[{}] Failed to set birth metrics", finalDeviceName, e);
throw new RuntimeException(e);
}
}
return ctx;
}, MoreExecutors.directExecutor());
} catch (IllegalArgumentException | DuplicateKeyException | ExecutionException | InterruptedException e) {
} catch (IllegalArgumentException | DuplicateKeyException e) {
throw new RuntimeException(e);
}
}
@ -200,7 +204,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
*/
public void handleSparkplugSubscribeMsg(MqttTopicSubscription subscription) throws ThingsboardException {
String topic = subscription.topicFilter();
if (topic != null && topic.startsWith(TOPIC_STATE_REGEXP)) {
if (topic != null && topic.startsWith(TOPIC_STATE_SEPARATOR)) {
log.trace("Subscribing on it’s own spBv1.0/STATE/[the Sparkplug Host Application] - Implemented as status via checkSparkplugNodeSession");
} else if (this.validateTopicDataSubscribe(topic)) {
// TODO if need subscription DATA
@ -384,7 +388,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
* @throws ThingsboardException if an error occurs while parsing
*/
public boolean validateTopicDataSubscribe(String topic) throws ThingsboardException {
String[] splitTopic = topic.split(TOPIC_SPLIT_REGEXP);
String[] splitTopic = topic.split(TOPIC_SPLIT_SEPARATOR);
if (splitTopic.length >= 4 && splitTopic.length <= 5 &&
splitTopic[0].equals(this.sparkplugTopicNode.getNamespace()) &&
splitTopic[1].equals(this.sparkplugTopicNode.getGroupId()) &&

10
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java

@ -21,9 +21,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.DEVICE_NAME_SPLIT_SEPARATOR;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_SEPARATOR;
/**
* Created by nickAS21 on 12.12.22
@ -197,7 +197,7 @@ public class SparkplugTopic {
try {
if (isValidIdElementToUTF8(topicString)) {
SparkplugMessageType messageType;
String[] splitTopic = topicString.split(TOPIC_SPLIT_REGEXP);
String[] splitTopic = topicString.split(TOPIC_SPLIT_SEPARATOR);
if (TOPIC_ROOT_SPB_V_1_0.equals(splitTopic[0])) {
if (splitTopic.length == 3) {
messageType = parseMessageType(splitTopic[1]);
@ -333,9 +333,9 @@ public class SparkplugTopic {
public String getNodeDeviceNameAllPath() {
StringBuilder sb = new StringBuilder();
if (hostApplicationId == null) {
sb.append(getGroupId()).append(DEVICE_NAME_SPLIT_REGEXP).append(getEdgeNodeId());
sb.append(getGroupId()).append(DEVICE_NAME_SPLIT_SEPARATOR).append(getEdgeNodeId());
if (getDeviceId() != null) {
sb.append(DEVICE_NAME_SPLIT_REGEXP).append(getDeviceId());
sb.append(DEVICE_NAME_SPLIT_SEPARATOR).append(getDeviceId());
}
}
return sb.toString();

6
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java

@ -34,9 +34,9 @@ public class SparkplugTopicService {
private static final Map<String, SparkplugTopic> SPLIT_TOPIC_CACHE = new HashMap<>();
public static final String TOPIC_ROOT_SPB_V_1_0 = "spBv1.0";
public static final String TOPIC_ROOT_CERT_SP = "$sparkplug/certificates/";
public static final String TOPIC_SPLIT_REGEXP = "/";
public static final String DEVICE_NAME_SPLIT_REGEXP = ":";
public static final String TOPIC_STATE_REGEXP = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP;
public static final String TOPIC_SPLIT_SEPARATOR = "/";
public static final String DEVICE_NAME_SPLIT_SEPARATOR = ":";
public static final String TOPIC_STATE_SEPARATOR = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_SEPARATOR + STATE.name() + TOPIC_SPLIT_SEPARATOR;
public static SparkplugTopic getSplitTopic(String topic) throws ThingsboardException {
SparkplugTopic sparkplugTopic = SPLIT_TOPIC_CACHE.get(topic);

Loading…
Cancel
Save