Skip to content

Handle ResponseAccumulator not being able to buffer large response in memory #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 10, 2022
63 changes: 61 additions & 2 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ extension HTTPClient {
///
/// This ``HTTPClientResponseDelegate`` buffers a complete HTTP response in memory. It does not stream the response body in.
/// The resulting ``Response`` type is ``HTTPClient/Response``.
public class ResponseAccumulator: HTTPClientResponseDelegate {
public final class ResponseAccumulator: HTTPClientResponseDelegate {
public typealias Response = HTTPClient.Response

enum State {
Expand All @@ -367,16 +367,63 @@ public class ResponseAccumulator: HTTPClientResponseDelegate {
case error(Error)
}

public struct ResponseTooBigError: Error, CustomStringConvertible {
public var maxBodySize: Int
public init(maxBodySize: Int) {
self.maxBodySize = maxBodySize
}

public var description: String {
return "ResponseTooBigError: received response body exceeds maximum accepted size of \(self.maxBodySize) bytes"
}
}

var state = State.idle
let request: HTTPClient.Request

public init(request: HTTPClient.Request) {
static let maxByteBufferSize = Int(UInt32.max)

/// Maximum size in bytes of the HTTP response body that ``ResponseAccumulator`` will accept
/// until it will abort the request and throw an ``ResponseTooBigError``.
///
/// Default is 2^32.
/// - precondition: not allowed to exceed 2^32 because ``ByteBuffer`` can not store more bytes
public let maxBodySize: Int

public convenience init(request: HTTPClient.Request) {
self.init(request: request, maxBodySize: Self.maxByteBufferSize)
}

/// - Parameters:
/// - request: The corresponding request of the response this delegate will be accumulating.
/// - maxBodySize: Maximum size in bytes of the HTTP response body that ``ResponseAccumulator`` will accept
/// until it will abort the request and throw an ``ResponseTooBigError``.
/// Default is 2^32.
/// - precondition: maxBodySize is not allowed to exceed 2^32 because ``ByteBuffer`` can not store more bytes
/// - warning: You can use ``ResponseAccumulator`` for just one request.
/// If you start another request, you need to initiate another ``ResponseAccumulator``.
public init(request: HTTPClient.Request, maxBodySize: Int) {
precondition(maxBodySize >= 0, "maxBodyLength is not allowed to be negative")
precondition(
maxBodySize <= Self.maxByteBufferSize,
"maxBodyLength is not allowed to exceed 2^32 because ByteBuffer can not store more bytes"
)
self.request = request
self.maxBodySize = maxBodySize
}

public func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
switch self.state {
case .idle:
if self.request.method != .HEAD,
let contentLength = head.headers.first(name: "Content-Length"),
let announcedBodySize = Int(contentLength),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be tolerating the possibility of a non-integer Content-Length field here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

announcedBodySize > self.maxBodySize {
let error = ResponseTooBigError(maxBodySize: maxBodySize)
self.state = .error(error)
return task.eventLoop.makeFailedFuture(error)
}

self.state = .head(head)
case .head:
preconditionFailure("head already set")
Expand All @@ -395,8 +442,20 @@ public class ResponseAccumulator: HTTPClientResponseDelegate {
case .idle:
preconditionFailure("no head received before body")
case .head(let head):
guard part.readableBytes <= self.maxBodySize else {
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
self.state = .error(error)
return task.eventLoop.makeFailedFuture(error)
}
self.state = .body(head, part)
case .body(let head, var body):
let newBufferSize = body.writerIndex + part.readableBytes
guard newBufferSize <= self.maxBodySize else {
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
self.state = .error(error)
return task.eventLoop.makeFailedFuture(error)
}

// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's
// a cross-module call in the way) so we need to drop the original reference to `body` in
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which
Expand Down
31 changes: 31 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,18 @@ internal final class HTTPBin<RequestHandler: ChannelInboundHandler> where
return self.serverChannel.localAddress!
}

var baseURL: String {
let scheme: String = {
switch mode {
case .http1_1, .refuse:
return "http"
case .http2:
return "https"
}
}()
return "\(scheme)://localhost:\(self.port)/"
}

private let mode: Mode
private let sslContext: NIOSSLContext?
private var serverChannel: Channel!
Expand Down Expand Up @@ -1319,6 +1331,25 @@ class HTTPEchoHandler: ChannelInboundHandler {
}
}

final class HTTPEchoHeaders: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let request = self.unwrapInboundIn(data)
switch request {
case .head(let requestHead):
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok, headers: requestHead.headers))), promise: nil)
case .body:
break
case .end:
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenSuccess {
context.close(promise: nil)
}
}
}
}

final class HTTP200DelayedHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart
Expand Down
5 changes: 5 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ extension HTTPClientTests {
("testSSLHandshakeErrorPropagationDelayedClose", testSSLHandshakeErrorPropagationDelayedClose),
("testWeCloseConnectionsWhenConnectionCloseSetByServer", testWeCloseConnectionsWhenConnectionCloseSetByServer),
("testBiDirectionalStreaming", testBiDirectionalStreaming),
("testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLength", testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLength),
("testResponseAccumulatorMaxBodySizeLimitNotExceedingWithContentLength", testResponseAccumulatorMaxBodySizeLimitNotExceedingWithContentLength),
("testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLengthButMethodIsHead", testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLengthButMethodIsHead),
("testResponseAccumulatorMaxBodySizeLimitExceedingWithTransferEncodingChuncked", testResponseAccumulatorMaxBodySizeLimitExceedingWithTransferEncodingChuncked),
("testResponseAccumulatorMaxBodySizeLimitNotExceedingWithTransferEncodingChuncked", testResponseAccumulatorMaxBodySizeLimitNotExceedingWithTransferEncodingChuncked),
("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200),
("testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests", testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests),
("testCloseConnectionAfterEarly2XXWhenStreaming", testCloseConnectionAfterEarly2XXWhenStreaming),
Expand Down
88 changes: 86 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2677,8 +2677,8 @@ class HTTPClientTests: XCTestCase {
let delegate = TestDelegate()

XCTAssertThrowsError(try httpClient.execute(request: request, delegate: delegate).wait()) {
XCTAssertEqual(.connectTimeout, $0 as? HTTPClientError)
XCTAssertEqual(.connectTimeout, delegate.error as? HTTPClientError)
XCTAssertEqualTypeAndValue($0, HTTPClientError.connectTimeout)
XCTAssertEqualTypeAndValue(delegate.error, HTTPClientError.connectTimeout)
}
}

Expand Down Expand Up @@ -3092,6 +3092,90 @@ class HTTPClientTests: XCTestCase {
XCTAssertNil(try delegate.next().wait())
}

func testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLength() throws {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let body = ByteBuffer(bytes: 0..<11)

var request = try Request(url: httpBin.baseURL)
request.body = .byteBuffer(body)
XCTAssertThrowsError(try self.defaultClient.execute(
request: request,
delegate: ResponseAccumulator(request: request, maxBodySize: 10)
).wait()) { error in
XCTAssertTrue(error is ResponseAccumulator.ResponseTooBigError, "unexpected error \(error)")
}
}

func testResponseAccumulatorMaxBodySizeLimitNotExceedingWithContentLength() throws {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let body = ByteBuffer(bytes: 0..<10)

var request = try Request(url: httpBin.baseURL)
request.body = .byteBuffer(body)
let response = try self.defaultClient.execute(
request: request,
delegate: ResponseAccumulator(request: request, maxBodySize: 10)
).wait()

XCTAssertEqual(response.body, body)
}

func testResponseAccumulatorMaxBodySizeLimitExceedingWithContentLengthButMethodIsHead() throws {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHeaders() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let body = ByteBuffer(bytes: 0..<11)

var request = try Request(url: httpBin.baseURL, method: .HEAD)
request.body = .byteBuffer(body)
let response = try self.defaultClient.execute(
request: request,
delegate: ResponseAccumulator(request: request, maxBodySize: 10)
).wait()

XCTAssertEqual(response.body ?? ByteBuffer(), ByteBuffer())
}

func testResponseAccumulatorMaxBodySizeLimitExceedingWithTransferEncodingChuncked() throws {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let body = ByteBuffer(bytes: 0..<11)

var request = try Request(url: httpBin.baseURL)
request.body = .stream { writer in
writer.write(.byteBuffer(body))
}
XCTAssertThrowsError(try self.defaultClient.execute(
request: request,
delegate: ResponseAccumulator(request: request, maxBodySize: 10)
).wait()) { error in
XCTAssertTrue(error is ResponseAccumulator.ResponseTooBigError, "unexpected error \(error)")
}
}

func testResponseAccumulatorMaxBodySizeLimitNotExceedingWithTransferEncodingChuncked() throws {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let body = ByteBuffer(bytes: 0..<10)

var request = try Request(url: httpBin.baseURL)
request.body = .stream { writer in
writer.write(.byteBuffer(body))
}
let response = try self.defaultClient.execute(
request: request,
delegate: ResponseAccumulator(request: request, maxBodySize: 10)
).wait()

XCTAssertEqual(response.body, body)
}

// In this test, we test that a request can continue to stream its body after the response head and end
// was received where the end is a 200.
func testBiDirectionalStreamingEarly200() {
Expand Down