Skip to content

Commit

Permalink
Merge pull request #889 from spyrkob/UNDERTOW-1657
Browse files Browse the repository at this point in the history
[UNDERTOW-1657] Memory exhaustion issue in HttpReadListener and related fixes
  • Loading branch information
fl4via authored May 15, 2020
2 parents b5e03e5 + 30b029f commit c74b8ff
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 27 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/io/undertow/UndertowLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String
@Message(id = 5084, value = "Attempted to write %s bytes however content-length has been set to %s")
IOException dataLargerThanContentLength(long totalToWrite, long responseContentLength);

@LogMessage(level = ERROR)
@LogMessage(level = DEBUG)
@Message(id = 5085, value = "Connection %s for exchange %s was not closed cleanly, forcibly closing connection")
void responseWasNotTerminated(ServerConnection connection, HttpServerExchange exchange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public final class HttpServerExchange extends AbstractAttachable {
// mutable state

private int state = 200;
private HttpString requestMethod;
private HttpString requestMethod = HttpString.EMPTY;
private String requestScheme;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;

import org.xnio.channels.StreamSinkChannel;
import org.xnio.conduits.AbstractStreamSourceConduit;
import org.xnio.conduits.StreamSourceConduit;

import io.undertow.server.ConduitWrapper;
import io.undertow.server.protocol.http.HttpContinue;
import io.undertow.server.Connectors;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ResponseCommitListener;
import io.undertow.server.protocol.http.HttpContinue;
import io.undertow.util.ConduitFactory;
import io.undertow.util.StatusCodes;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.conduits.AbstractStreamSourceConduit;
import org.xnio.conduits.StreamSourceConduit;

/**
* Handler for requests that require 100-continue responses. If an attempt is made to read from the source
Expand All @@ -44,7 +47,7 @@ public class HttpContinueReadHandler implements HttpHandler {
private static final ConduitWrapper<StreamSourceConduit> WRAPPER = new ConduitWrapper<StreamSourceConduit>() {
@Override
public StreamSourceConduit wrap(final ConduitFactory<StreamSourceConduit> factory, final HttpServerExchange exchange) {
if(exchange.isRequestChannelAvailable() && !exchange.isResponseStarted()) {
if (exchange.isRequestChannelAvailable() && !exchange.isResponseStarted()) {
return new ContinueConduit(factory.create(), exchange);
}
return factory.create();
Expand All @@ -61,6 +64,17 @@ public HttpContinueReadHandler(final HttpHandler handler) {
public void handleRequest(final HttpServerExchange exchange) throws Exception {
if (HttpContinue.requiresContinueResponse(exchange)) {
exchange.addRequestWrapper(WRAPPER);
exchange.addResponseCommitListener(new ResponseCommitListener() {
@Override
public void beforeCommit(HttpServerExchange exchange) {
//we are writing the response, and have not read the request then we mark this as non-persistent
if (!HttpContinue.isContinueResponseSent(exchange)) {
exchange.setPersistent(false);
//we also kill the request channel, because it is unusable now
exchange.getConnection().terminateRequestChannel(exchange);
}
}
});
}
handler.handleRequest(exchange);
}
Expand All @@ -81,6 +95,7 @@ protected ContinueConduit(final StreamSourceConduit next, final HttpServerExchan
public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
if (exchange.getStatusCode() == StatusCodes.EXPECTATION_FAILED) {
//rejected
Connectors.terminateRequest(exchange);
return -1;
}
if (!sent) {
Expand All @@ -100,6 +115,7 @@ public long transferTo(final long position, final long count, final FileChannel
public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
if (exchange.getStatusCode() == StatusCodes.EXPECTATION_FAILED) {
//rejected
Connectors.terminateRequest(exchange);
return -1;
}
if (!sent) {
Expand All @@ -119,6 +135,7 @@ public long transferTo(final long count, final ByteBuffer throughBuffer, final S
public int read(final ByteBuffer dst) throws IOException {
if (exchange.getStatusCode() == StatusCodes.EXPECTATION_FAILED) {
//rejected
Connectors.terminateRequest(exchange);
return -1;
}
if (!sent) {
Expand All @@ -138,6 +155,7 @@ public int read(final ByteBuffer dst) throws IOException {
public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
if (exchange.getStatusCode() == StatusCodes.EXPECTATION_FAILED) {
//rejected
Connectors.terminateRequest(exchange);
return -1;
}
if (!sent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.undertow.server.HttpServerExchange;
import io.undertow.server.SSLSessionInfo;
import io.undertow.util.DateUtils;

import org.xnio.IoUtils;
import org.xnio.OptionMap;
import io.undertow.connector.ByteBufferPool;
import org.xnio.StreamConnection;
Expand Down Expand Up @@ -61,7 +63,9 @@ public boolean isContinueResponseSupported() {

@Override
public void terminateRequestChannel(HttpServerExchange exchange) {
//todo: terminate
if (!exchange.isPersistent()) {
IoUtils.safeClose(getChannel().getSourceChannel());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public static boolean requiresContinueResponse(HeaderMap requestHeaders) {
return false;
}

public static boolean isContinueResponseSent(HttpServerExchange exchange) {
return exchange.getAttachment(ALREADY_SENT) != null;
}

/**
* Sends a continuation using async IO, and calls back when it is complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ public void exchangeComplete(final HttpServerExchange exchange) {
}
}
} else if (!exchange.isPersistent()) {
if (connection.getExtraBytes() != null) {
connection.getExtraBytes().close();
connection.setExtraBytes(null);
}
ConnectionUtils.cleanClose(connection.getChannel(), connection);
} else {
//upgrade or connect handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,33 @@

package io.undertow.server.protocol.http;

import io.undertow.UndertowMessages;
import io.undertow.server.Connectors;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.HeaderValues;
import io.undertow.util.HttpString;
import io.undertow.util.Protocols;
import io.undertow.util.StatusCodes;
import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import org.xnio.Buffers;
import org.xnio.IoUtils;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.AbstractStreamSinkConduit;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.StreamSinkConduit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;
import io.undertow.UndertowMessages;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.Connectors;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.HeaderValues;
import io.undertow.util.HttpString;
import io.undertow.util.Protocols;
import io.undertow.util.StatusCodes;

/**
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
Expand Down Expand Up @@ -290,6 +291,13 @@ private void bufferDone() {
}
}

public void freeContinueResponse() {
if (pooledBuffer != null) {
pooledBuffer.close();
pooledBuffer = null;
}
}

private static void writeString(ByteBuffer buffer, String string) {
int length = string.length();
for (int charIndex = 0; charIndex < length; charIndex++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.undertow.util.HttpString;
import io.undertow.util.ImmediatePooledByteBuffer;
import io.undertow.util.Methods;

import org.xnio.IoUtils;
import org.xnio.OptionMap;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
Expand Down Expand Up @@ -111,7 +113,15 @@ public HttpServerExchange sendOutOfBandResponse(HttpServerExchange exchange) {
@Override
public StreamSinkConduit wrap(ConduitFactory<StreamSinkConduit> factory, HttpServerExchange exchange) {

ServerFixedLengthStreamSinkConduit fixed = new ServerFixedLengthStreamSinkConduit(new HttpResponseConduit(getSinkChannel().getConduit(), getByteBufferPool(), HttpServerConnection.this, exchange), false, false);
HttpResponseConduit httpResponseConduit = new HttpResponseConduit(getSinkChannel().getConduit(), getByteBufferPool(), HttpServerConnection.this, exchange);
exchange.addExchangeCompleteListener(new ExchangeCompletionListener() {
@Override
public void exchangeEvent(HttpServerExchange exchange, NextListener nextListener) {
httpResponseConduit.freeContinueResponse();
nextListener.proceed();
}
});
ServerFixedLengthStreamSinkConduit fixed = new ServerFixedLengthStreamSinkConduit(httpResponseConduit, false, false);
fixed.reset(0, exchange);
return fixed;
}
Expand All @@ -135,7 +145,9 @@ public boolean isContinueResponseSupported() {

@Override
public void terminateRequestChannel(HttpServerExchange exchange) {

if (!exchange.isPersistent()) {
IoUtils.safeClose(getChannel().getSourceChannel());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.Methods;
import io.undertow.util.StatusCodes;

import org.jboss.logging.Logger;
import org.xnio.conduits.ConduitStreamSourceChannel;
import org.xnio.conduits.StreamSinkConduit;
Expand Down Expand Up @@ -224,6 +226,11 @@ static StreamSinkConduit createSinkConduit(final HttpServerExchange exchange) {
final HeaderMap responseHeaders = exchange.getResponseHeaders();
// test to see if we're still persistent
String connection = responseHeaders.getFirst(Headers.CONNECTION);
if(exchange.getStatusCode() == StatusCodes.EXPECTATION_FAILED) {
//417 responses are never persistent, as we have no idea if there is a response body
//still coming on the wire.
exchange.setPersistent(false);
}
if (!exchange.isPersistent()) {
responseHeaders.put(Headers.CONNECTION, Headers.CLOSE.toString());
} else if (exchange.isPersistent() && connection != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2020 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.undertow.server.handlers;

import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.testutils.DefaultServer;
import io.undertow.util.StatusCodes;

/**
* @author Stuart Douglas
*/
@RunWith(DefaultServer.class)
public class HttpContinueConduitWrappingHandlerBufferLeakTestCase {

static Socket persistentSocket;

@BeforeClass
public static void setup() {
final BlockingHandler blockingHandler = new BlockingHandler();
final HttpContinueReadHandler handler = new HttpContinueReadHandler(blockingHandler);
DefaultServer.setRootHandler(handler);
blockingHandler.setRootHandler(new HttpHandler() {
@Override
public void handleRequest(final HttpServerExchange exchange) {
try {
if (exchange.getQueryParameters().containsKey("reject")) {
exchange.getRequestChannel();
exchange.setStatusCode(StatusCodes.EXPECTATION_FAILED);
exchange.getOutputStream().close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}

@Before
public void before() {
Assume.assumeFalse(DefaultServer.isAjp());
}

@Test
public void testHttpContinueRejectedBodySentAnywayNoBufferLeak() throws IOException {
persistentSocket = new Socket(DefaultServer.getHostAddress(), DefaultServer.getHostPort());

String message = "POST /path?reject=true HTTP/1.1\r\n" +
"Expect: 100-continue\r\n" +
"Content-Length: 16\r\n" +
"Content-Type: text/plain; charset=ISO-8859-1\r\n" +
"Host: localhost:7777\r\n" +
"Connection: Keep-Alive\r\n\r\nMy HTTP Request!";
persistentSocket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
persistentSocket.getOutputStream().flush();
persistentSocket.getInputStream().read();
}

@Test
public void testHttpContinueBodySentAnywayNoLeak() throws IOException {
persistentSocket = new Socket(DefaultServer.getHostAddress(), DefaultServer.getHostPort());

String message = "POST /path HTTP/1.1\r\n" +
"Expect: 100-continue\r\n" +
"Content-Length: 16\r\n" +
"Content-Type: text/plain; charset=ISO-8859-1\r\n" +
"Host: localhost:7777\r\n" +
"Connection: Keep-Alive\r\n\r\nMy HTTP Request!";
persistentSocket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
persistentSocket.getOutputStream().flush();
persistentSocket.getInputStream().read();
}

}
Loading

0 comments on commit c74b8ff

Please sign in to comment.