-
Notifications
You must be signed in to change notification settings - Fork 113
Small Performance Improvements #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
// | ||
// This source file is part of the SwiftAWSLambdaRuntime open source project | ||
// | ||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
|
@@ -97,9 +97,16 @@ internal final class HTTPClient { | |
private func connect() -> EventLoopFuture<Channel> { | ||
let bootstrap = ClientBootstrap(group: self.eventLoop) | ||
.channelInitializer { channel in | ||
channel.pipeline.addHTTPClientHandlers().flatMap { | ||
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.configuration.keepAlive), | ||
UnaryHandler(keepAlive: self.configuration.keepAlive)]) | ||
do { | ||
try channel.pipeline.syncOperations.addHTTPClientHandlers() | ||
// Lambda quotas... An invocation payload is maximal 6MB in size: | ||
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html | ||
try channel.pipeline.syncOperations.addHandler( | ||
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)) | ||
try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler()) | ||
return channel.eventLoop.makeSucceededFuture(()) | ||
} catch { | ||
return channel.eventLoop.makeFailedFuture(error) | ||
} | ||
} | ||
|
||
|
@@ -131,10 +138,10 @@ internal final class HTTPClient { | |
} | ||
|
||
internal struct Response: Equatable { | ||
public var version: HTTPVersion | ||
public var status: HTTPResponseStatus | ||
public var headers: HTTPHeaders | ||
public var body: ByteBuffer? | ||
var version: HTTPVersion | ||
var status: HTTPResponseStatus | ||
var headers: HTTPHeaders | ||
var body: ByteBuffer? | ||
} | ||
|
||
internal enum Errors: Error { | ||
|
@@ -149,133 +156,77 @@ internal final class HTTPClient { | |
} | ||
} | ||
|
||
private final class HTTPHandler: ChannelDuplexHandler { | ||
typealias OutboundIn = HTTPClient.Request | ||
typealias InboundOut = HTTPClient.Response | ||
typealias InboundIn = HTTPClientResponsePart | ||
// no need in locks since we validate only one request can run at a time | ||
private final class LambdaChannelHandler: ChannelDuplexHandler { | ||
typealias InboundIn = NIOHTTPClientResponseFull | ||
typealias OutboundIn = HTTPRequestWrapper | ||
typealias OutboundOut = HTTPClientRequestPart | ||
|
||
private let keepAlive: Bool | ||
private var readState: ReadState = .idle | ||
|
||
init(keepAlive: Bool) { | ||
self.keepAlive = keepAlive | ||
} | ||
|
||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) { | ||
let request = unwrapOutboundIn(data) | ||
|
||
var head = HTTPRequestHead(version: .init(major: 1, minor: 1), method: request.method, uri: request.url, headers: request.headers) | ||
head.headers.add(name: "host", value: request.targetHost) | ||
switch request.method { | ||
case .POST, .PUT: | ||
head.headers.add(name: "content-length", value: String(request.body?.readableBytes ?? 0)) | ||
default: | ||
break | ||
} | ||
|
||
// We don't add a "Connection" header here if we want to keep the connection open, | ||
// HTTP/1.1 defines specifies the following in RFC 2616, Section 8.1.2.1: | ||
// | ||
// An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to | ||
// maintain a persistent connection unless a Connection header including | ||
// the connection-token "close" was sent in the request. If the server | ||
// chooses to close the connection immediately after sending the | ||
// response, it SHOULD send a Connection header including the | ||
// connection-token close. | ||
// | ||
// See also UnaryHandler.channelRead below. | ||
if !self.keepAlive { | ||
head.headers.add(name: "connection", value: "close") | ||
} | ||
|
||
context.write(self.wrapOutboundOut(HTTPClientRequestPart.head(head))).flatMap { _ -> EventLoopFuture<Void> in | ||
if let body = request.body { | ||
return context.writeAndFlush(self.wrapOutboundOut(HTTPClientRequestPart.body(.byteBuffer(body)))) | ||
} else { | ||
context.flush() | ||
return context.eventLoop.makeSucceededFuture(()) | ||
} | ||
}.cascade(to: promise) | ||
} | ||
|
||
func channelRead(context: ChannelHandlerContext, data: NIOAny) { | ||
let response = unwrapInboundIn(data) | ||
|
||
switch response { | ||
case .head(let head): | ||
guard case .idle = self.readState else { | ||
preconditionFailure("invalid read state \(self.readState)") | ||
} | ||
self.readState = .head(head) | ||
case .body(var bodyPart): | ||
switch self.readState { | ||
case .head(let head): | ||
self.readState = .body(head, bodyPart) | ||
case .body(let head, var body): | ||
body.writeBuffer(&bodyPart) | ||
self.readState = .body(head, body) | ||
Comment on lines
-215
to
-217
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a fairly large performance issue here... Since case .body(let head, var body):
self.readState = .idle // removes the self.readState reference. Therefore no Cow necessary
body.writeBuffer(&bodyPart)
self.readState = .body(head, body) However since October of last year NIO has a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moving to |
||
default: | ||
preconditionFailure("invalid read state \(self.readState)") | ||
} | ||
case .end: | ||
switch self.readState { | ||
case .head(let head): | ||
context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: nil))) | ||
self.readState = .idle | ||
case .body(let head, let body): | ||
context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: body))) | ||
self.readState = .idle | ||
default: | ||
preconditionFailure("invalid read state \(self.readState)") | ||
} | ||
} | ||
} | ||
|
||
private enum ReadState { | ||
enum State { | ||
case idle | ||
case head(HTTPResponseHead) | ||
case body(HTTPResponseHead, ByteBuffer) | ||
case running(promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?) | ||
case waitForConnectionClose(HTTPClient.Response, EventLoopPromise<HTTPClient.Response>) | ||
} | ||
} | ||
|
||
// no need in locks since we validate only one request can run at a time | ||
private final class UnaryHandler: ChannelDuplexHandler { | ||
typealias OutboundIn = HTTPRequestWrapper | ||
typealias InboundIn = HTTPClient.Response | ||
typealias OutboundOut = HTTPClient.Request | ||
|
||
private let keepAlive: Bool | ||
|
||
private var pending: (promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)? | ||
private var state: State = .idle | ||
private var lastError: Error? | ||
|
||
init(keepAlive: Bool) { | ||
self.keepAlive = keepAlive | ||
} | ||
init() {} | ||
|
||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) { | ||
guard self.pending == nil else { | ||
guard case .idle = self.state else { | ||
preconditionFailure("invalid state, outstanding request") | ||
} | ||
let wrapper = unwrapOutboundIn(data) | ||
|
||
var head = HTTPRequestHead( | ||
version: .http1_1, | ||
method: wrapper.request.method, | ||
uri: wrapper.request.url, | ||
headers: wrapper.request.headers | ||
) | ||
head.headers.add(name: "host", value: wrapper.request.targetHost) | ||
switch head.method { | ||
case .POST, .PUT: | ||
head.headers.add(name: "content-length", value: String(wrapper.request.body?.readableBytes ?? 0)) | ||
default: | ||
break | ||
} | ||
|
||
let timeoutTask = wrapper.request.timeout.map { | ||
context.eventLoop.scheduleTask(in: $0) { | ||
if self.pending != nil { | ||
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout) | ||
guard case .running = self.state else { | ||
preconditionFailure("invalid state") | ||
} | ||
|
||
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout) | ||
} | ||
} | ||
self.pending = (promise: wrapper.promise, timeout: timeoutTask) | ||
context.writeAndFlush(wrapOutboundOut(wrapper.request), promise: promise) | ||
self.state = .running(promise: wrapper.promise, timeout: timeoutTask) | ||
|
||
context.write(wrapOutboundOut(.head(head)), promise: nil) | ||
if let body = wrapper.request.body { | ||
context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil) | ||
} | ||
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise) | ||
} | ||
|
||
func channelRead(context: ChannelHandlerContext, data: NIOAny) { | ||
let response = unwrapInboundIn(data) | ||
guard let pending = self.pending else { | ||
guard case .running(let promise, let timeout) = self.state else { | ||
preconditionFailure("invalid state, no pending request") | ||
} | ||
|
||
let response = unwrapInboundIn(data) | ||
|
||
let httpResponse = HTTPClient.Response( | ||
version: response.head.version, | ||
status: response.head.status, | ||
headers: response.head.headers, | ||
body: response.body | ||
) | ||
|
||
timeout?.cancel() | ||
|
||
// As defined in RFC 7230 Section 6.3: | ||
// HTTP/1.1 defaults to the use of "persistent connections", allowing | ||
// multiple requests and responses to be carried over a single | ||
|
@@ -285,14 +236,31 @@ private final class UnaryHandler: ChannelDuplexHandler { | |
// | ||
// That's why we only assume the connection shall be closed if we receive | ||
// a "connection = close" header. | ||
let serverCloseConnection = response.headers.first(name: "connection")?.lowercased() == "close" | ||
|
||
if !self.keepAlive || serverCloseConnection || response.version != .init(major: 1, minor: 1) { | ||
pending.promise.futureResult.whenComplete { _ in | ||
_ = context.channel.close() | ||
} | ||
let serverCloseConnection = | ||
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) | ||
|
||
let closeConnection = serverCloseConnection || response.head.version != .http1_1 | ||
|
||
if closeConnection { | ||
// If we were succeeding the request promise here directly and closing the connection | ||
// after succeeding the promise we may run into a race condition: | ||
// | ||
// The lambda runtime will ask for the next work item directly after a succeeded post | ||
// response request. The desire for the next work item might be faster than the attempt | ||
// to close the connection. This will lead to a situation where we try to the connection | ||
// but the next request has already been scheduled on the connection that we want to | ||
// close. For this reason we postpone succeeding the promise until the connection has | ||
// been closed. This codepath will only be hit in the very, very unlikely event of the | ||
// Lambda control plane demanding to close connection. (It's more or less only | ||
// implemented to support http1.1 correctly.) This behavior is ensured with the test | ||
// `LambdaTest.testNoKeepAliveServer`. | ||
self.state = .waitForConnectionClose(httpResponse, promise) | ||
_ = context.channel.close() | ||
return | ||
} else { | ||
self.state = .idle | ||
promise.succeed(httpResponse) | ||
} | ||
self.completeWith(.success(response)) | ||
} | ||
|
||
func errorCaught(context: ChannelHandlerContext, error: Error) { | ||
|
@@ -303,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler { | |
|
||
func channelInactive(context: ChannelHandlerContext) { | ||
// fail any pending responses with last error or assume peer disconnected | ||
if self.pending != nil { | ||
let error = self.lastError ?? HTTPClient.Errors.connectionResetByPeer | ||
self.completeWith(.failure(error)) | ||
} | ||
context.fireChannelInactive() | ||
|
||
switch self.state { | ||
case .idle: | ||
break | ||
case .running(let promise, let timeout): | ||
self.state = .idle | ||
timeout?.cancel() | ||
promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) | ||
|
||
case .waitForConnectionClose(let response, let promise): | ||
self.state = .idle | ||
promise.succeed(response) | ||
} | ||
} | ||
|
||
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) { | ||
switch event { | ||
case is RequestCancelEvent: | ||
if self.pending != nil { | ||
self.completeWith(.failure(HTTPClient.Errors.cancelled)) | ||
switch self.state { | ||
case .idle: | ||
break | ||
case .running(let promise, let timeout): | ||
self.state = .idle | ||
timeout?.cancel() | ||
promise.fail(HTTPClient.Errors.cancelled) | ||
|
||
// after the cancel error has been send, we want to close the connection so | ||
// that no more packets can be read on this connection. | ||
_ = context.channel.close() | ||
case .waitForConnectionClose(_, let promise): | ||
self.state = .idle | ||
promise.fail(HTTPClient.Errors.cancelled) | ||
} | ||
default: | ||
context.triggerUserOutboundEvent(event, promise: promise) | ||
} | ||
} | ||
|
||
private func completeWith(_ result: Result<HTTPClient.Response, Error>) { | ||
guard let pending = self.pending else { | ||
preconditionFailure("invalid state, no pending request") | ||
} | ||
self.pending = nil | ||
self.lastError = nil | ||
pending.timeout?.cancel() | ||
pending.promise.completeWith(result) | ||
} | ||
} | ||
|
||
private struct HTTPRequestWrapper { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
// | ||
// This source file is part of the SwiftAWSLambdaRuntime open source project | ||
// | ||
// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
|
@@ -64,7 +64,6 @@ extension Lambda { | |
struct RuntimeEngine: CustomStringConvertible { | ||
let ip: String | ||
let port: Int | ||
let keepAlive: Bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have removed the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the intent was to be able and force closed connection by sending that header from the client, but this is fine. |
||
let requestTimeout: TimeAmount? | ||
|
||
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) { | ||
|
@@ -74,12 +73,11 @@ extension Lambda { | |
} | ||
self.ip = String(ipPort[0]) | ||
self.port = port | ||
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true | ||
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) } | ||
} | ||
|
||
var description: String { | ||
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout))" | ||
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))" | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
// | ||
// This source file is part of the SwiftAWSLambdaRuntime open source project | ||
// | ||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
|
@@ -30,7 +30,7 @@ internal final class MockLambdaServer { | |
private var shutdown = false | ||
|
||
public init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) { | ||
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes Xcodes debug view much easier to deal with! And we only need a single core for the MockServer anyway. |
||
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
self.behavior = behavior | ||
self.host = host | ||
self.port = port | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a cleanup change.
public
access control in ainternal struct
doesn't make much sense.