From 6426dc4f6bf35ded16699c944234415db7c943fb Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 26 May 2023 10:28:12 +0300 Subject: [PATCH] MqttChannelHandler: add decoderResult check before casting to prevent class cast exception --- .../thingsboard/mqtt/MqttChannelHandler.java | 57 +++++++++++-------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index e243f66633..fa88fa0254 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -35,7 +35,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Promise; +import lombok.extern.slf4j.Slf4j; +@Slf4j final class MqttChannelHandler extends SimpleChannelInboundHandler { private final MqttClientImpl client; @@ -48,31 +50,36 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler @Override protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { - switch (msg.fixedHeader().messageType()) { - case CONNACK: - handleConack(ctx.channel(), (MqttConnAckMessage) msg); - break; - case SUBACK: - handleSubAck((MqttSubAckMessage) msg); - break; - case PUBLISH: - handlePublish(ctx.channel(), (MqttPublishMessage) msg); - break; - case UNSUBACK: - handleUnsuback((MqttUnsubAckMessage) msg); - break; - case PUBACK: - handlePuback((MqttPubAckMessage) msg); - break; - case PUBREC: - handlePubrec(ctx.channel(), msg); - break; - case PUBREL: - handlePubrel(ctx.channel(), msg); - break; - case PUBCOMP: - handlePubcomp(msg); - break; + if (msg.decoderResult().isSuccess()) { + switch (msg.fixedHeader().messageType()) { + case CONNACK: + handleConack(ctx.channel(), (MqttConnAckMessage) msg); + break; + case SUBACK: + handleSubAck((MqttSubAckMessage) msg); + break; + case PUBLISH: + handlePublish(ctx.channel(), (MqttPublishMessage) msg); + break; + case UNSUBACK: + handleUnsuback((MqttUnsubAckMessage) msg); + break; + case PUBACK: + handlePuback((MqttPubAckMessage) msg); + break; + case PUBREC: + handlePubrec(ctx.channel(), msg); + break; + case PUBREL: + handlePubrel(ctx.channel(), msg); + break; + case PUBCOMP: + handlePubcomp(msg); + break; + } + } else { + log.error("[{}] Message decoding failed: {}", client.getClientConfig().getClientId(), msg.decoderResult().cause().getMessage()); + ctx.close(); } }