Skip to content

Add max inbound message size to ConnectionFactory #1063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -203,6 +203,13 @@ public class ConnectionFactory implements Cloneable {

private CredentialsRefreshService credentialsRefreshService;

/**
* Maximum body size of inbound (received) messages in bytes.
*
* <p>Default value is 67,108,864 (64 MiB).
*/
private int maxInboundMessageBodySize = 1_048_576 * 64;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -997,11 +1004,15 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
this.nioParams.setThreadFactory(getThreadFactory());
}
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory);
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(
connectionTimeout, nioParams, isSSL(), sslContextFactory,
this.maxInboundMessageBodySize);
}
return this.frameHandlerFactory;
} else {
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory);
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory,
socketConf, isSSL(), this.shutdownExecutor, sslContextFactory,
this.maxInboundMessageBodySize);
}

}
Expand Down Expand Up @@ -1300,6 +1311,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
result.setMaxInboundMessageBodySize(maxInboundMessageBodySize);
return result;
}

Expand Down Expand Up @@ -1590,6 +1602,21 @@ public int getChannelRpcTimeout() {
return channelRpcTimeout;
}

/**
* Maximum body size of inbound (received) messages in bytes.
*
* <p>Default value is 67,108,864 (64 MiB).
*
* @param maxInboundMessageBodySize the maximum size of inbound messages
*/
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
if (maxInboundMessageBodySize <= 0) {
throw new IllegalArgumentException("Max inbound message body size must be greater than 0: "
+ maxInboundMessageBodySize);
}
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}

/**
* The factory to create SSL contexts.
* This provides more flexibility to create {@link SSLContext}s
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -62,7 +62,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
private final int _channelNumber;

/** Command being assembled */
private AMQCommand _command = new AMQCommand();
private AMQCommand _command;

/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcWrapper _activeRpc = null;
Expand All @@ -76,6 +76,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
private final boolean _checkRpcResponseType;

private final TrafficListener _trafficListener;
private final int maxInboundMessageBodySize;

/**
* Construct a channel on the given connection, with the given channel number.
Expand All @@ -91,6 +92,8 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
this._rpcTimeout = connection.getChannelRpcTimeout();
this._checkRpcResponseType = connection.willCheckRpcResponseType();
this._trafficListener = connection.getTrafficListener();
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
this._command = new AMQCommand(this.maxInboundMessageBodySize);
}

/**
Expand All @@ -110,7 +113,7 @@ public int getChannelNumber() {
void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
_command = new AMQCommand(this.maxInboundMessageBodySize); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/AMQCommand.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -44,17 +44,21 @@ public class AMQCommand implements Command {
/** The assembler for this command - synchronised on - contains all the state */
private final CommandAssembler assembler;

AMQCommand(int maxBodyLength) {
this(null, null, null, maxBodyLength);
}

/** Construct a command ready to fill in by reading frames */
public AMQCommand() {
this(null, null, null);
this(null, null, null, Integer.MAX_VALUE);
}

/**
* Construct a command with just a method, and without header or body.
* @param method the wrapped method
*/
public AMQCommand(com.rabbitmq.client.Method method) {
this(method, null, null);
this(method, null, null, Integer.MAX_VALUE);
}

/**
Expand All @@ -64,7 +68,19 @@ public AMQCommand(com.rabbitmq.client.Method method) {
* @param body the message body data
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
}

/**
* Construct a command with a specified method, header and body.
* @param method the wrapped method
* @param contentHeader the wrapped content header
* @param body the message body data
* @param maxBodyLength the maximum size for an inbound message body
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
int maxBodyLength) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
}

/** Public API - {@inheritDoc} */
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public static Map<String, Object> defaultClientProperties() {
private volatile ChannelManager _channelManager;
/** Saved server properties field from connection.start */
private volatile Map<String, Object> _serverProperties;
private final int maxInboundMessageBodySize;

/**
* Protected API - respond, in the main I/O loop thread, to a ShutdownSignal.
Expand Down Expand Up @@ -244,6 +245,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics

this.credentialsRefreshService = params.getCredentialsRefreshService();


this._channel0 = createChannel0();

this._channelManager = null;
Expand All @@ -257,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
this.workPoolTimeout = params.getWorkPoolTimeout();
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
}

AMQChannel createChannel0() {
Expand Down Expand Up @@ -1191,4 +1194,8 @@ public boolean willCheckRpcResponseType() {
public TrafficListener getTrafficListener() {
return trafficListener;
}

int getMaxInboundMessageBodySize() {
return maxInboundMessageBodySize;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
// Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl;

import com.rabbitmq.client.SocketConfigurator;
Expand All @@ -10,10 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory
protected final int connectionTimeout;
protected final SocketConfigurator configurator;
protected final boolean ssl;
protected final int maxInboundMessageBodySize;

protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) {
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator,
boolean ssl, int maxInboundMessageBodySize) {
this.connectionTimeout = connectionTimeout;
this.configurator = configurator;
this.ssl = ssl;
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}
}
22 changes: 18 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/CommandAssembler.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -21,6 +21,7 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.UnexpectedFrameError;
import static java.lang.String.format;

/**
* Class responsible for piecing together a command from a series of {@link Frame}s.
Expand Down Expand Up @@ -52,12 +53,16 @@ private enum CAState {
/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;

public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
private final int maxBodyLength;

public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
int maxBodyLength) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList<byte[]>(2);
this.bodyN = new ArrayList<>(2);
this.bodyLength = 0;
this.remainingBodyBytes = 0;
this.maxBodyLength = maxBodyLength;
appendBodyFragment(body);
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
Expand Down Expand Up @@ -99,7 +104,16 @@ private void consumeMethodFrame(Frame f) throws IOException {
private void consumeHeaderFrame(Frame f) throws IOException {
if (f.getType() == AMQP.FRAME_HEADER) {
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
this.remainingBodyBytes = this.contentHeader.getBodySize();
long bodySize = this.contentHeader.getBodySize();
if (bodySize >= this.maxBodyLength) {
throw new IllegalStateException(format(
"Message body is too large (%d), maximum configured size is %d. " +
"See ConnectionFactory#setMaxInboundMessageBodySize " +
"if you need to increase the limit.",
bodySize, this.maxBodyLength
));
}
this.remainingBodyBytes = bodySize;
updateContentBodyState();
} else {
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -63,6 +63,8 @@ public class ConnectionParams {

private CredentialsRefreshService credentialsRefreshService;

private int maxInboundMessageBodySize;

public ConnectionParams() {}

public CredentialsProvider getCredentialsProvider() {
Expand Down Expand Up @@ -296,4 +298,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
public CredentialsRefreshService getCredentialsRefreshService() {
return credentialsRefreshService;
}

public int getMaxInboundMessageBodySize() {
return maxInboundMessageBodySize;
}

public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}
}
13 changes: 11 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/Frame.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -25,6 +25,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import static java.lang.String.format;

/**
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
Expand Down Expand Up @@ -81,7 +82,7 @@ public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset,
*
* @return a new Frame if we read a frame successfully, otherwise null
*/
public static Frame readFrom(DataInputStream is) throws IOException {
public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException {
int type;
int channel;

Expand All @@ -107,6 +108,14 @@ public static Frame readFrom(DataInputStream is) throws IOException {

channel = is.readUnsignedShort();
int payloadSize = is.readInt();
if (payloadSize >= maxPayloadSize) {
throw new IllegalStateException(format(
"Frame body is too large (%d), maximum configured size is %d. " +
"See ConnectionFactory#setMaxInboundMessageBodySize " +
"if you need to increase the limit.",
payloadSize, maxPayloadSize
));
}
byte[] payload = new byte[payloadSize];
is.readFully(payload);

Expand Down
12 changes: 8 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -52,22 +52,26 @@ public class SocketFrameHandler implements FrameHandler {
/** Socket's outputstream - data to the broker - synchronized on */
private final DataOutputStream _outputStream;

private final int maxInboundMessageBodySize;

/** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1;

/**
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket) throws IOException {
this(socket, null);
this(socket, null, Integer.MAX_VALUE);
}

/**
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor,
int maxInboundMessageBodySize) throws IOException {
_socket = socket;
_shutdownExecutor = shutdownExecutor;
this.maxInboundMessageBodySize = maxInboundMessageBodySize;

_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
Expand Down Expand Up @@ -181,7 +185,7 @@ public void initialize(AMQConnection connection) {
@Override
public Frame readFrame() throws IOException {
synchronized (_inputStream) {
return Frame.readFrom(_inputStream);
return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize);
}
}

Expand Down
Loading