Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;
import java.util.Objects;

import static com.hivemq.mqtt.message.MessageType.CONNECT;
import static com.hivemq.mqtt.message.MessageType.PUBLISH;

/**
* @author Dominik Obermaier
Expand All @@ -49,14 +47,16 @@ public class MQTTMessageDecoder extends ByteToMessageDecoder {
private static final int NOT_ENOUGH_BYTES_READABLE = -2;
private static final int MALFORMED_REMAINING_LENGTH = -1;
private static final int MIN_FIXED_HEADER_LENGTH = 2;
private static final int MIN_CONNECT_VAR_HEADER_LENGTH = 7;

private final @NotNull MqttConnectDecoder connectDecoder;
private final @NotNull MqttConnacker mqttConnacker;
private final @NotNull MqttConfigurationService mqttConfig;
private final @NotNull MqttDecoders mqttDecoders;
private final @NotNull MqttServerDisconnector mqttServerDisconnector;
private final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter;

private final int maxPacketSize;

public MQTTMessageDecoder(
final @NotNull MqttConnectDecoder connectDecoder,
final @NotNull MqttConnacker mqttConnacker,
Expand All @@ -66,10 +66,10 @@ public MQTTMessageDecoder(
final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter) {
this.connectDecoder = connectDecoder;
this.mqttConnacker = mqttConnacker;
this.mqttConfig = mqttConfig;
this.mqttDecoders = mqttDecoders;
this.mqttServerDisconnector = mqttServerDisconnector;
this.globalMQTTMessageCounter = globalMQTTMessageCounter;
this.maxPacketSize = mqttConfig.maxPacketSize();
}

public MQTTMessageDecoder(final ChannelDependencies channelDependencies) {
Expand Down Expand Up @@ -116,15 +116,22 @@ protected void decode(
return;
}

final int fixedHeaderSize = getFixedHeaderSize(remainingLength);
final int packetSize = fixedHeaderSize + remainingLength;

final MessageType messageType = getMessageType(fixedHeader);

//this is the message size HiveMQ allows for incoming messages
if (packetSize > maxPacketSize) {
handlePacketTooLarge(buf, messageType, clientConnectionContext);
return;
}

if (buf.readableBytes() < remainingLength) {
buf.resetReaderIndex();
return;
}

final int fixedHeaderSize = getFixedHeaderSize(remainingLength);
final int packetSize = fixedHeaderSize + remainingLength;

final MessageType messageType = getMessageType(fixedHeader);
final Message message;
if (messageType == CONNECT) {
message = handleConnect(buf, clientConnectionContext, fixedHeader, packetSize, remainingLength);
Expand All @@ -140,25 +147,50 @@ protected void decode(
out.add(message);
}

private @Nullable Message handleConnect(
private void handlePacketTooLarge(
final @NotNull ByteBuf buf,
final @NotNull ClientConnectionContext clientConnectionContext,
final byte fixedHeader,
final int packetSize,
final int remainingLength) {

//this is the message size HiveMQ allows for incoming messages
if (packetSize > mqttConfig.maxPacketSize()) {
//connack with PACKET_TOO_LARGE for Mqtt5
final @NotNull MessageType messageType,
final @NotNull ClientConnectionContext clientConnectionContext) {
//connack with PACKET_TOO_LARGE for Mqtt5
if (messageType == MessageType.CONNECT) {
// Theoretically, remaining length could be too short to read the protocol version.
// But in this case we can never reach a "packet too large" as the minimum configurable max packet size is 15
// while the broker needs only 7 bytes to read the version. Therefore, broker ignores this case and continues reading until
// it has at least 7 bytes of the variable CONNECT header to determine the protocol version,
// see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901035:~:text=identify%20MQTT%20traffic.-,3.1.2.2%20Protocol%20Version,-Figure%203%E2%80%913
if (buf.readableBytes() < MIN_CONNECT_VAR_HEADER_LENGTH) {
buf.resetReaderIndex();
return;
}
connectDecoder.decodeProtocolVersion(clientConnectionContext, buf);
mqttConnacker.connackError(clientConnectionContext.getChannel(),
"A client (IP: {}) connect packet exceeded the maximum permissible size.",
"A client (ID: {}, IP: {}) connect packet exceeded the maximum permissible size.",
"Sent CONNECT exceeded the maximum permissible size",
Mqtt5ConnAckReasonCode.PACKET_TOO_LARGE,
ReasonStrings.CONNACK_PACKET_TOO_LARGE);

return null;
} else {
final ProtocolVersion protocolVersion = clientConnectionContext.getProtocolVersion();
//force channel close for Mqtt3.1, Mqtt3.1.1 and null (before connect)
final boolean forceClose = protocolVersion != ProtocolVersion.MQTTv5;
mqttServerDisconnector.disconnect(clientConnectionContext.getChannel(),
"A client (ID: {}, IP: {}) sent a message, that was bigger than the maximum message size. Disconnecting client.",
"Sent a message that was bigger than the maximum size",
Mqtt5DisconnectReasonCode.PACKET_TOO_LARGE,
ReasonStrings.DISCONNECT_PACKET_TOO_LARGE_MESSAGE,
Mqtt5UserProperties.NO_USER_PROPERTIES,
false,
forceClose);
}
buf.clear();
}


private @Nullable Message handleConnect(
final @NotNull ByteBuf buf,
final @NotNull ClientConnectionContext clientConnectionContext,
final byte fixedHeader,
final int packetSize,
final int remainingLength) {

// Check if the client is already connected
if (clientConnectionContext.getProtocolVersion() != null) {
Expand Down Expand Up @@ -195,22 +227,6 @@ protected void decode(

final ProtocolVersion protocolVersion = clientConnectionContext.getProtocolVersion();

//this is the message size HiveMQ allows for incoming messages
if (packetSize > mqttConfig.maxPacketSize()) {

//force channel close for Mqtt3.1, Mqtt3.1.1 and null (before connect)
final boolean forceClose = protocolVersion != ProtocolVersion.MQTTv5;
mqttServerDisconnector.disconnect(clientConnectionContext.getChannel(),
"A client (IP: {}) sent a message, that was bigger than the maximum message size. Disconnecting client.",
"Sent a message that was bigger than the maximum size",
Mqtt5DisconnectReasonCode.PACKET_TOO_LARGE,
ReasonStrings.DISCONNECT_PACKET_TOO_LARGE_MESSAGE,
Mqtt5UserProperties.NO_USER_PROPERTIES,
false,
forceClose);
return null;
}

// Check if client is connected
if (protocolVersion == null) {
mqttServerDisconnector.logAndClose(clientConnectionContext.getChannel(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hivemq.configuration.service.entity.Listener;
import com.hivemq.configuration.service.entity.Tls;
import com.hivemq.configuration.service.entity.TlsTcpListener;
import com.hivemq.configuration.service.impl.MqttConfigurationServiceImpl;
import com.hivemq.logging.EventLog;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl;
Expand Down Expand Up @@ -106,6 +107,7 @@ public void before() throws Exception {
when(socketChannel.attr(any(AttributeKey.class))).thenReturn(attribute);
when(socketChannel.isActive()).thenReturn(true);
when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService);
when(fullConfigurationService.mqttConfiguration()).thenReturn(new MqttConfigurationServiceImpl());
when(channelDependencies.getRestrictionsConfigurationService()).thenReturn(restrictionsConfigurationService);
when(restrictionsConfigurationService.incomingLimit()).thenReturn(0L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hivemq.configuration.service.entity.Listener;
import com.hivemq.configuration.service.entity.Tls;
import com.hivemq.configuration.service.entity.TlsWebsocketListener;
import com.hivemq.configuration.service.impl.MqttConfigurationServiceImpl;
import com.hivemq.logging.EventLog;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl;
Expand Down Expand Up @@ -110,6 +111,7 @@ public void before() {
when(sslFactory.getSslHandler(any(SocketChannel.class), any(Tls.class), any(SslContext.class))).thenReturn(
sslHandler);
when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService);
when(fullConfigurationService.mqttConfiguration()).thenReturn(new MqttConfigurationServiceImpl());
when(mockListener.getTls()).thenReturn(tls);
when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService);
when(channelDependencies.getRestrictionsConfigurationService()).thenReturn(restrictionsConfigurationService);
Expand Down
Loading