@ -1060,7 +1060,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void checkSparkplugNodeSession ( MqttConnectMessage connectMessage , ChannelHandlerContext ctx ) {
private void checkSparkplugNodeSession ( MqttConnectMessage connectMessage , ChannelHandlerContext ctx , SessionMetaData sessionMetaData ) {
try {
if ( sparkplugSessionHandler = = null ) {
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode ( connectMessage ) ;
@ -1069,6 +1069,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
sparkplugSessionHandler = new SparkplugNodeSessionHandler ( this , deviceSessionCtx , sessionId , sparkplugTopicNode ) ;
sparkplugSessionHandler . onAttributesTelemetryProto ( 0 , sparkplugBProtoNode ,
deviceSessionCtx . getDeviceInfo ( ) . getDeviceName ( ) , sparkplugTopicNode ) ;
sessionMetaData . setOverwriteActivityTime ( true ) ;
} else {
log . trace ( "[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH." , sessionId , deviceSessionCtx . getDeviceInfo ( ) . getDeviceName ( ) ) ;
throw new ThingsboardException ( "Invalid request body" , ThingsboardErrorCode . BAD_REQUEST_PARAMS ) ;
@ -1145,7 +1146,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onSuccess ( Void msg ) {
SessionMetaData sessionMetaData = transportService . registerAsyncSession ( deviceSessionCtx . getSessionInfo ( ) , MqttTransportHandler . this ) ;
if ( deviceSessionCtx . isSparkplug ( ) ) {
checkSparkplugNodeSession ( connectMessage , ctx ) ;
checkSparkplugNodeSession ( connectMessage , ctx , sessionMetaData ) ;
} else {
checkGatewaySession ( sessionMetaData ) ;
}