@ -31,6 +31,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage ;
import io.netty.handler.codec.mqtt.MqttPublishMessage ;
import io.netty.handler.codec.mqtt.MqttQoS ;
import io.netty.handler.codec.mqtt.MqttReasonCodes ;
import io.netty.handler.codec.mqtt.MqttSubAckMessage ;
import io.netty.handler.codec.mqtt.MqttSubAckPayload ;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage ;
@ -68,7 +69,6 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator ;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo ;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse ;
import org.thingsboard.server.common.transport.service.DefaultTransportService ;
import org.thingsboard.server.common.transport.service.SessionMetaData ;
import org.thingsboard.server.common.transport.util.SslUtil ;
import org.thingsboard.server.gen.transport.TransportProtos ;
@ -82,7 +82,6 @@ import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler ;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher ;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler ;
import org.thingsboard.server.transport.mqtt.util.ReturnCode ;
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver ;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType ;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader ;
@ -196,23 +195,54 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processMqttMsg ( ctx , message ) ;
} else {
log . error ( "[{}] Message decoding failed: {}" , sessionId , message . decoderResult ( ) . cause ( ) . getMessage ( ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . MALFORMED_PACKET ) ;
}
} else {
log . debug ( "[{}] Received non mqtt message: {}" , sessionId , msg . getClass ( ) . getSimpleName ( ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , ( MqttMessage ) null ) ;
}
} finally {
ReferenceCountUtil . safeRelease ( msg ) ;
}
}
private void closeCtx ( ChannelHandlerContext ctx ) {
private void closeCtx ( ChannelHandlerContext ctx , MqttReasonCodes . Disconnect returnCode ) {
closeCtx ( ctx , returnCode . byteValue ( ) ) ;
}
private void closeCtx ( ChannelHandlerContext ctx , MqttConnectReturnCode returnCode ) {
closeCtx ( ctx , ReturnCodeResolver . getConnectionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , returnCode ) . byteValue ( ) ) ;
}
private void closeCtx ( ChannelHandlerContext ctx , byte returnCode ) {
closeCtx ( ctx , createMqttDisconnectMsg ( deviceSessionCtx , returnCode ) ) ;
}
private void closeCtx ( ChannelHandlerContext ctx , MqttMessage msg ) {
if ( ! rpcAwaitingAck . isEmpty ( ) ) {
log . debug ( "[{}] Cleanup RPC awaiting ack map due to session close!" , sessionId ) ;
rpcAwaitingAck . clear ( ) ;
}
ctx . close ( ) ;
if ( ctx . channel ( ) ! = null & & ctx . channel ( ) . isOpen ( ) ) {
if ( msg ! = null & & MqttVersion . MQTT_5 = = deviceSessionCtx . getMqttVersion ( ) ) {
ChannelFuture channelFuture = ctx . writeAndFlush ( msg ) . addListener ( future - > ctx . close ( ) ) ;
scheduler . schedule ( ( ) - > {
if ( ! channelFuture . isDone ( ) ) {
log . debug ( "[{}] Closing channel due to timeout!" , sessionId ) ;
ctx . close ( ) ;
}
} , context . getDisconnectTimeout ( ) , TimeUnit . MILLISECONDS ) ;
} else {
ctx . close ( ) ;
}
} else {
if ( ctx . channel ( ) ! = null ) {
log . debug ( "[{}] Channel is already closed!" , sessionId ) ;
} else {
log . debug ( "[{}] Channel is null, closing ctx..." , sessionId ) ;
ctx . close ( ) ;
}
}
}
InetSocketAddress getAddress ( ChannelHandlerContext ctx ) {
@ -231,7 +261,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
void processMqttMsg ( ChannelHandlerContext ctx , MqttMessage msg ) {
if ( msg . fixedHeader ( ) = = null ) {
log . info ( "[{}:{}] Invalid message received" , address . getHostName ( ) , address . getPort ( ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . PROTOCOL_ERROR ) ;
return ;
}
deviceSessionCtx . setChannel ( ctx ) ;
@ -268,21 +298,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} else {
log . debug ( "[{}] Unsupported topic for provisioning requests: {}!" , sessionId , topicName ) ;
closeCtx ( ctx ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . TOPIC_NAME_INVALID ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . TOPIC_NAME_INVALID ) ;
}
} catch ( RuntimeException e ) {
log . warn ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
closeCtx ( ctx ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
} catch ( AdaptorException e ) {
log . debug ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
closeCtx ( ctx ) ;
sendResponseForAdaptorErrorOrCloseContext ( ctx , topicName , msgId ) ;
}
break ;
case PINGREQ :
ctx . writeAndFlush ( new MqttMessage ( new MqttFixedHeader ( PINGRESP , false , AT_MOST_ONCE , false , 0 ) ) ) ;
break ;
case DISCONNECT :
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . NORMAL_DISCONNECT ) ;
break ;
}
}
@ -292,7 +324,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if ( queueSize > = context . getMessageQueueSizePerDeviceLimit ( ) ) {
log . info ( "Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}" ,
deviceSessionCtx . getDeviceId ( ) , context . getMessageQueueSizePerDeviceLimit ( ) , queueSize , deviceSessionCtx . getMsgQueueSize ( ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . QUOTA_EXCEEDED ) ;
return ;
}
@ -329,7 +361,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
break ;
case DISCONNECT :
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . NORMAL_DISCONNECT ) ;
break ;
case PUBACK :
int msgId = ( ( MqttPubAckMessage ) msg ) . variableHeader ( ) . messageId ( ) ;
@ -389,15 +421,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
gatewaySessionHandler . onDeviceDisconnect ( mqttMsg ) ;
break ;
default :
ack ( ctx , msgId , ReturnCode . TOPIC_NAME_INVALID ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . TOPIC_NAME_INVALID ) ;
}
} catch ( RuntimeException e ) {
log . warn ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
ack ( ctx , msgId , ReturnCode . IMPLEMENTATION_SPECIFIC ) ;
closeCtx ( ctx ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
} catch ( AdaptorException e ) {
log . debug ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
sendAckOrCloseSession ( ctx , topicName , msgId ) ;
sendResponseForAdaptorErrorOrCloseContext ( ctx , topicName , msgId ) ;
}
}
@ -433,11 +465,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} catch ( RuntimeException e ) {
log . error ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
ack ( ctx , msgId , ReturnCode . IMPLEMENTATION_SPECIFIC ) ;
closeCtx ( ctx ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
} catch ( AdaptorException | ThingsboardException | InvalidProtocolBufferException e ) {
log . error ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
sendAckOrCloseSession ( ctx , topicName , msgId ) ;
sendResponseForAdaptorErrorOrCloseContext ( ctx , topicName , msgId ) ;
}
}
@ -530,11 +562,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
attrReqTopicType = TopicType . V2 ;
} else {
transportService . recordActivity ( deviceSessionCtx . getSessionInfo ( ) ) ;
ack ( ctx , msgId , ReturnCode . TOPIC_NAME_INVALID ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . TOPIC_NAME_INVALID ) ;
}
} catch ( AdaptorException e ) {
log . debug ( "[{}] Failed to process publish msg [{}][{}]" , sessionId , topicName , msgId , e ) ;
sendAckOrCloseSession ( ctx , topicName , msgId ) ;
sendResponseForAdaptorErrorOrCloseContext ( ctx , topicName , msgId ) ;
}
}
@ -548,13 +580,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void sendAckOrCloseSession ( ChannelHandlerContext ctx , String topicName , int msgId ) {
private void sendResponseForAdaptorErrorOrCloseContext ( ChannelHandlerContext ctx , String topicName , int msgId ) {
if ( ( deviceSessionCtx . isSendAckOnValidationException ( ) | | MqttVersion . MQTT_5 . equals ( deviceSessionCtx . getMqttVersion ( ) ) ) & & msgId > 0 ) {
log . debug ( "[{}] Send pub ack on invalid publish msg [{}][{}]" , sessionId , topicName , msgId ) ;
ctx . writeAndFlush ( createMqttPubAckMsg ( deviceSessionCtx , msgId , ReturnCode . PAYLOAD_FORMAT_INVALID ) ) ;
ctx . writeAndFlush ( createMqttPubAckMsg ( deviceSessionCtx , msgId , MqttReasonCodes . PubAck . PAYLOAD_FORMAT_INVALID . byteValue ( ) ) ) ;
} else {
log . info ( "[{}] Closing current session due to invalid publish msg [{}][{}]" , sessionId , topicName , msgId ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . PAYLOAD_FORMAT_INVALID ) ;
}
}
@ -593,7 +625,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void ack ( ChannelHandlerContext ctx , int msgId , ReturnCode returnCode ) {
private void ack ( ChannelHandlerContext ctx , int msgId , MqttReasonCodes . PubAck returnCode ) {
ack ( ctx , msgId , returnCode . byteValue ( ) ) ;
}
private void ack ( ChannelHandlerContext ctx , int msgId , byte returnCode ) {
if ( msgId > 0 ) {
ctx . writeAndFlush ( createMqttPubAckMsg ( deviceSessionCtx , msgId , returnCode ) ) ;
}
@ -604,13 +640,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess ( Void dummy ) {
log . trace ( "[{}] Published msg: {}" , sessionId , msg ) ;
ack ( ctx , msgId , ReturnCode . SUCCESS ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . SUCCESS ) ;
}
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to publish msg: {}" , sessionId , msg , e ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
} ;
}
@ -629,7 +665,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess ( TransportProtos . ProvisionDeviceResponseMsg provisionResponseMsg ) {
log . trace ( "[{}] Published msg: {}" , sessionId , msg ) ;
ack ( ctx , msgId , ReturnCode . SUCCESS ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . SUCCESS ) ;
try {
if ( deviceSessionCtx . getProvisionPayloadType ( ) . equals ( TransportPayloadType . JSON ) ) {
deviceSessionCtx . getContext ( ) . getJsonMqttAdaptor ( ) . convertToPublish ( deviceSessionCtx , provisionResponseMsg ) . ifPresent ( deviceSessionCtx . getChannel ( ) : : writeAndFlush ) ;
@ -645,8 +681,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to publish msg: {}" , sessionId , msg , e ) ;
ack ( ctx , msgId , ReturnCode . IMPLEMENTATION_SPECIFIC ) ;
closeCtx ( ctx ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
}
@ -681,13 +717,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to get firmware: {}" , sessionId , msg , e ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
}
private void sendOtaPackage ( ChannelHandlerContext ctx , int msgId , String firmwareId , String requestId , int chunkSize , int chunk , OtaPackageType type ) {
log . trace ( "[{}] Send firmware [{}] to device!" , sessionId , firmwareId ) ;
ack ( ctx , msgId , ReturnCode . SUCCESS ) ;
ack ( ctx , msgId , MqttReasonCodes . PubAck . SUCCESS ) ;
try {
byte [ ] firmwareChunk = context . getOtaPackageDataCache ( ) . get ( firmwareId , chunkSize , chunk ) ;
deviceSessionCtx . getPayloadAdaptor ( )
@ -703,13 +739,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx . getChannel ( ) . writeAndFlush ( deviceSessionCtx
. getPayloadAdaptor ( )
. createMqttPublishMsg ( deviceSessionCtx , MqttTopics . DEVICE_FIRMWARE_ERROR_TOPIC , error . getBytes ( ) ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
private void processSubscribe ( ChannelHandlerContext ctx , MqttSubscribeMessage mqttMsg ) {
if ( ! checkConnected ( ctx , mqttMsg ) ) {
int returnCode = ReturnCodeResolver . getSubscriptionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , ReturnCode . NOT_AUTHORIZED_5 ) ;
ctx . writeAndFlush ( createSubAckMessage ( mqttMsg . variableHeader ( ) . messageId ( ) , Collections . singletonList ( returnCode ) ) ) ;
ctx . writeAndFlush ( createSubAckMessage ( mqttMsg . variableHeader ( ) . messageId ( ) , Collections . singletonList ( MqttReasonCodes . SubAck . NOT_AUTHORIZED . byteValue ( ) & 0xFF ) ) ) ;
return ;
}
log . trace ( "[{}] Processing subscription [{}]!" , sessionId , mqttMsg . variableHeader ( ) . messageId ( ) ) ;
@ -718,7 +753,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for ( MqttTopicSubscription subscription : mqttMsg . payload ( ) . topicSubscriptions ( ) ) {
String topic = subscription . topicName ( ) ;
MqttQoS reqQoS = subscription . qualityOfService ( ) ;
if ( deviceSessionCtx . isDeviceSubscriptionAttributesTopic ( topic ) ) {
if ( deviceSessionCtx . isDeviceSubscriptionAttributesTopic ( topic ) ) {
processAttributesSubscribe ( grantedQoSList , topic , reqQoS , TopicType . V1 ) ;
activityReported = true ;
continue ;
@ -789,13 +824,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
break ;
default :
log . warn ( "[{}] Failed to subscribe to [{}][{}]" , sessionId , topic , reqQoS ) ;
grantedQoSList . add ( ReturnCodeResolver . getSubscriptionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , ReturnCode . TOPIC_FILTER_INVALID ) ) ;
grantedQoSList . add ( ReturnCodeResolver . getSubscriptionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , MqttReasonCodes . SubAck . TOPIC_FILTER_INVALID ) ) ;
break ;
}
}
} catch ( Exception e ) {
log . warn ( "[{}] Failed to subscribe to [{}][{}]" , sessionId , topic , reqQoS , e ) ;
grantedQoSList . add ( ReturnCodeResolver . getSubscriptionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , ReturnCode . IMPLEMENTATION_SPECIFIC ) ) ;
grantedQoSList . add ( ReturnCodeResolver . getSubscriptionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , MqttReasonCodes . SubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ) ;
}
}
if ( ! activityReported ) {
@ -832,7 +867,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processUnsubscribe ( ChannelHandlerContext ctx , MqttUnsubscribeMessage mqttMsg ) {
if ( ! checkConnected ( ctx , mqttMsg ) ) {
ctx . writeAndFlush ( createUnSubAckMessage ( mqttMsg . variableHeader ( ) . messageId ( ) , Collections . singletonList ( ReturnCode . NOT_AUTHORIZED_5 . shortValue ( ) ) ) ) ;
ctx . writeAndFlush ( createUnSubAckMessage ( mqttMsg . variableHeader ( ) . messageId ( ) ,
Collections . singletonList ( ( short ) MqttReasonCodes . UnsubAck . NOT_AUTHORIZED . byteValue ( ) ) ) ) ;
return ;
}
boolean activityReported = false ;
@ -843,7 +879,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if ( mqttQoSMap . containsKey ( matcher ) ) {
mqttQoSMap . remove ( matcher ) ;
try {
short resultValue = ReturnCode . SUCCESS . short Value( ) ;
short resultValue = MqttReasonCodes . UnsubAck . SUCCESS . byte Value( ) ;
switch ( topicName ) {
case MqttTopics . DEVICE_ATTRIBUTES_TOPIC :
case MqttTopics . DEVICE_ATTRIBUTES_SHORT_TOPIC :
@ -884,16 +920,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
default :
log . trace ( "[{}] Failed to process unsubscription [{}] to [{}]" , sessionId , mqttMsg . variableHeader ( ) . messageId ( ) , topicName ) ;
resultValue = ReturnCode . TOPIC_FILTER_INVALID . short Value( ) ;
resultValue = MqttReasonCodes . UnsubAck . TOPIC_FILTER_INVALID . byte Value( ) ;
}
unSubResults . add ( resultValue ) ;
} catch ( Exception e ) {
log . debug ( "[{}] Failed to process unsubscription [{}] to [{}]" , sessionId , mqttMsg . variableHeader ( ) . messageId ( ) , topicName ) ;
unSubResults . add ( ReturnCode . IMPLEMENTATION_SPECIFIC . short Value( ) ) ;
unSubResults . add ( ( short ) MqttReasonCodes . UnsubAck . IMPLEMENTATION_SPECIFIC_ERROR . byte Value( ) ) ;
}
} else {
log . debug ( "[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found" , sessionId , mqttMsg . variableHeader ( ) . messageId ( ) , topicName ) ;
unSubResults . add ( ReturnCode . NO_SUBSCRIPTION_EXISTED . short Value( ) ) ;
unSubResults . add ( ( short ) MqttReasonCodes . UnsubAck . NO_SUBSCRIPTION_EXISTED . byte Value( ) ) ;
}
}
if ( ! activityReported ) {
@ -918,7 +954,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx . setMqttVersion ( getMqttVersion ( msg . variableHeader ( ) . version ( ) ) ) ;
if ( DataConstants . PROVISION . equals ( userName ) | | DataConstants . PROVISION . equals ( clientId ) ) {
deviceSessionCtx . setProvisionOnly ( true ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SUCCESS , msg ) ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_ACCEPTED , msg ) ) ;
} else {
X509Certificate cert ;
if ( sslHandler ! = null & & ( cert = getX509Certificate ( ) ) ! = null ) {
@ -952,8 +988,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to process credentials: {}" , address , userName , e ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SERVER_UNAVAILABLE_5 , connectMessage ) ) ;
closeCtx ( ctx ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_REFUSED_ SERVER_UNAVAILABLE_5, connectMessage ) ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . SERVER_BUSY ) ;
}
} ) ;
}
@ -975,15 +1011,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to process credentials: {}" , address , sha3Hash , e ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SERVER_UNAVAILABLE_5 , connectMessage ) ) ;
closeCtx ( ctx ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_REFUSED_ SERVER_UNAVAILABLE_5, connectMessage ) ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
} ) ;
} catch ( Exception e ) {
context . onAuthFailure ( address ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . NOT_AUTHORIZED_5 , connectMessage ) ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_REFUSED_ NOT_AUTHORIZED_5, connectMessage ) ) ;
log . trace ( "[{}] X509 auth failure: {}" , sessionId , address , e ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . NOT_AUTHORIZED ) ;
}
}
@ -1000,7 +1036,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return null ;
}
private MqttConnAckMessage createMqttConnAckMsg ( ReturnCode returnCode , MqttConnectMessage msg ) {
private MqttConnAckMessage createMqttConnAckMsg ( MqttConnect ReturnCode returnCode , MqttConnectMessage msg ) {
MqttMessageBuilders . ConnAckBuilder connAckBuilder = MqttMessageBuilders . connAck ( ) ;
connAckBuilder . sessionPresent ( ! msg . variableHeader ( ) . isCleanSession ( ) ) ;
MqttConnectReturnCode finalReturnCode = ReturnCodeResolver . getConnectionReturnCode ( deviceSessionCtx . getMqttVersion ( ) , returnCode ) ;
@ -1031,18 +1067,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log . error ( "[{}] Unexpected Exception" , sessionId , cause ) ;
}
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . SERVER_SHUTTING_DOWN ) ;
if ( cause instanceof OutOfMemoryError ) {
log . error ( "Received critical error. Going to shutdown the service." ) ;
System . exit ( 1 ) ;
}
}
private static MqttSubAckMessage createSubAckMessage ( Integer msgId , List < Integer > grantedQoSList ) {
private static MqttSubAckMessage createSubAckMessage ( Integer msgId , List < Integer > reasonCodes ) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader ( SUBACK , false , AT_MOST_ONCE , false , 0 ) ;
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader . from ( msgId ) ;
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload ( grantedQoSList ) ;
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload ( reasonCodes ) ;
return new MqttSubAckMessage ( mqttFixedHeader , mqttMessageIdVariableHeader , mqttSubAckPayload ) ;
}
@ -1061,14 +1097,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
public static MqttMessage createMqttPubAckMsg ( DeviceSessionCtx deviceSessionCtx , int requestId , ReturnCod e returnCode ) {
public static MqttMessage createMqttPubAckMsg ( DeviceSessionCtx deviceSessionCtx , int requestId , byt e returnCode ) {
MqttMessageBuilders . PubAckBuilder pubAckMsgBuilder = MqttMessageBuilders . pubAck ( ) . packetId ( requestId ) ;
if ( MqttVersion . MQTT_5 . equals ( deviceSessionCtx . getMqttVersion ( ) ) ) {
pubAckMsgBuilder . reasonCode ( returnCode . byteValue ( ) ) ;
pubAckMsgBuilder . reasonCode ( returnCode ) ;
}
return pubAckMsgBuilder . build ( ) ;
}
public static MqttMessage createMqttDisconnectMsg ( DeviceSessionCtx deviceSessionCtx , byte returnCode ) {
MqttMessageBuilders . DisconnectBuilder disconnectBuilder = MqttMessageBuilders . disconnect ( ) ;
if ( MqttVersion . MQTT_5 . equals ( deviceSessionCtx . getMqttVersion ( ) ) ) {
disconnectBuilder . reasonCode ( returnCode ) ;
}
return disconnectBuilder . build ( ) ;
}
private boolean checkConnected ( ChannelHandlerContext ctx , MqttMessage msg ) {
if ( deviceSessionCtx . isConnected ( ) ) {
return true ;
@ -1115,8 +1159,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} catch ( Exception e ) {
log . trace ( "[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName" , sessionId , deviceSessionCtx . getDeviceInfo ( ) . getDeviceName ( ) , e ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SERVER_UNAVAILABLE_5 , connectMessage ) ) ;
closeCtx ( ctx ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_REFUSED_ SERVER_UNAVAILABLE_5, connectMessage ) ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
}
@ -1160,20 +1204,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void onValidateDeviceResponse ( ValidateDeviceCredentialsResponse msg , ChannelHandlerContext ctx , MqttConnectMessage connectMessage ) {
if ( ! msg . hasDeviceInfo ( ) ) {
context . onAuthFailure ( address ) ;
ReturnCode returnCode = ReturnCode . NOT_AUTHORIZED_5 ;
MqttConnect ReturnCode returnCode = MqttConnect ReturnCode. CONNECTION_REFUSED_ NOT_AUTHORIZED_5;
if ( sslHandler = = null | | getX509Certificate ( ) = = null ) {
String username = connectMessage . payload ( ) . userName ( ) ;
byte [ ] passwordBytes = connectMessage . payload ( ) . passwordInBytes ( ) ;
String clientId = connectMessage . payload ( ) . clientIdentifier ( ) ;
if ( ( username ! = null & & passwordBytes ! = null & & clientId ! = null )
| | ( username = = null ^ passwordBytes = = null ) ) {
returnCode = ReturnCode . BAD_USERNAME_OR_PASSWORD ;
returnCode = MqttConnect ReturnCode. CONNECTION_REFUSED_ BAD_USERNAME_OR_PASSWORD;
} else if ( ! StringUtils . isBlank ( clientId ) ) {
returnCode = ReturnCode . CLIENT_IDENTIFIER_NOT_VALID ;
returnCode = MqttConnect ReturnCode. CONNECTION_REFUSED_ CLIENT_IDENTIFIER_NOT_VALID;
}
}
ctx . writeAndFlush ( createMqttConnAckMsg ( returnCode , connectMessage ) ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , returnCode ) ;
} else {
context . onAuthSuccess ( address ) ;
deviceSessionCtx . setDeviceInfo ( msg . getDeviceInfo ( ) ) ;
@ -1188,7 +1232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else {
checkGatewaySession ( sessionMetaData ) ;
}
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SUCCESS , connectMessage ) ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnect ReturnCode. CONNECTION_ACCEPTED , connectMessage ) ) ;
deviceSessionCtx . setConnected ( true ) ;
log . debug ( "[{}] Client connected!" , sessionId ) ;
transportService . getCallbackExecutor ( ) . execute ( ( ) - > processMsgQueue ( ctx ) ) ; //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
@ -1198,11 +1242,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onError ( Throwable e ) {
if ( e instanceof TbRateLimitsException ) {
log . trace ( "[{}] Failed to submit session event: {}" , sessionId , e . getMessage ( ) ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnectReturnCode . CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED , connectMessage ) ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . MESSAGE_RATE_TOO_HIGH ) ;
} else {
log . warn ( "[{}] Failed to submit session event" , sessionId , e ) ;
ctx . writeAndFlush ( createMqttConnAckMsg ( MqttConnectReturnCode . CONNECTION_REFUSED_SERVER_UNAVAILABLE_5 , connectMessage ) ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ) ;
}
ctx . writeAndFlush ( createMqttConnAckMsg ( ReturnCode . SERVER_UNAVAILABLE_5 , connectMessage ) ) ;
closeCtx ( ctx ) ;
}
} ) ;
}
@ -1231,8 +1277,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SparkplugTopic sparkplugTopic = new SparkplugTopic ( sparkplugSessionHandler . getSparkplugTopicNode ( ) ,
SparkplugMessageType . NCMD ) ;
sparkplugSessionHandler . createSparkplugMqttPublishMsg ( tsKvProto ,
sparkplugTopic . toString ( ) ,
sparkplugSessionHandler . getNodeBirthMetrics ( ) . get ( tsKvProto . getKv ( ) . getKey ( ) ) )
sparkplugTopic . toString ( ) ,
sparkplugSessionHandler . getNodeBirthMetrics ( ) . get ( tsKvProto . getKv ( ) . getKey ( ) ) )
. ifPresent ( sparkplugSessionHandler : : writeAndFlush ) ;
}
} ) ;
@ -1250,7 +1296,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onRemoteSessionCloseCommand ( UUID sessionId , TransportProtos . SessionCloseNotificationProto sessionCloseNotification ) {
log . trace ( "[{}] Received the remote command to close the session: {}" , sessionId , sessionCloseNotification . getMessage ( ) ) ;
transportService . deregisterSession ( deviceSessionCtx . getSessionInfo ( ) ) ;
closeCtx ( deviceSessionCtx . getChannel ( ) ) ;
MqttReasonCodes . Disconnect returnCode = MqttReasonCodes . Disconnect . IMPLEMENTATION_SPECIFIC_ERROR ;
switch ( sessionCloseNotification . getReason ( ) ) {
case CREDENTIALS_UPDATED :
returnCode = MqttReasonCodes . Disconnect . ADMINISTRATIVE_ACTION ;
break ;
case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED :
returnCode = MqttReasonCodes . Disconnect . SESSION_TAKEN_OVER ;
break ;
case SESSION_TIMEOUT :
returnCode = MqttReasonCodes . Disconnect . MAXIMUM_CONNECT_TIME ;
break ;
}
closeCtx ( deviceSessionCtx . getChannel ( ) , returnCode ) ;
}
@Override
@ -1287,8 +1345,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SparkplugTopic sparkplugTopic = new SparkplugTopic ( sparkplugSessionHandler . getSparkplugTopicNode ( ) ,
messageType ) ;
sparkplugSessionHandler . createSparkplugMqttPublishMsg ( tsKvProto ,
sparkplugTopic . toString ( ) ,
sparkplugSessionHandler . getNodeBirthMetrics ( ) . get ( tsKvProto . getKv ( ) . getKey ( ) ) )
sparkplugTopic . toString ( ) ,
sparkplugSessionHandler . getNodeBirthMetrics ( ) . get ( tsKvProto . getKv ( ) . getKey ( ) ) )
. ifPresent ( payload - > sendToDeviceRpcRequest ( payload , rpcRequest , deviceSessionCtx . getSessionInfo ( ) ) ) ;
} else {
sendErrorRpcResponse ( deviceSessionCtx . getSessionInfo ( ) , rpcRequest . getRequestId ( ) ,
@ -1369,7 +1427,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceDeleted ( DeviceId deviceId ) {
context . onAuthFailure ( address ) ;
ChannelHandlerContext ctx = deviceSessionCtx . getChannel ( ) ;
closeCtx ( ctx ) ;
closeCtx ( ctx , MqttReasonCodes . Disconnect . ADMINISTRATIVE_ACTION ) ;
}
public void sendErrorRpcResponse ( TransportProtos . SessionInfoProto sessionInfo , int requestId , ThingsboardErrorCode result , String errorMsg ) {