Skip to content

New runtime client #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Sep 4, 2024
Prev Previous commit
Next Next commit
Fixing soundness
  • Loading branch information
fabianfett committed Sep 4, 2024
commit dad7333a538deda1bba58c717299bbdfdeffd272
165 changes: 87 additions & 78 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
//
//===----------------------------------------------------------------------===//

import Logging
import NIOCore
import NIOHTTP1
import NIOPosix
import Logging
import _NIOBase64

final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
Expand All @@ -36,15 +36,15 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
func write(_ buffer: NIOCore.ByteBuffer) async throws {
try await self.runtimeClient.write(buffer)
}

func finish() async throws {
try await self.runtimeClient.finish()
}

func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws {
try await self.runtimeClient.writeAndFinish(buffer)
}

func reportError(_ error: any Error) async throws {
try await self.runtimeClient.reportError(error)
}
Expand Down Expand Up @@ -120,7 +120,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
case .connecting(var array):
// Since we do get sequential invocations this case normally should never be hit.
// We'll support it anyway.
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<LambdaChannelHandler, any Error>) in
return try await withCheckedThrowingContinuation {
(continuation: CheckedContinuation<LambdaChannelHandler, any Error>) in
array.append(continuation)
self.connectionState = .connecting(array)
}
Expand Down Expand Up @@ -211,16 +212,17 @@ private final class LambdaChannelHandler {
func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
switch self.state {
case .connected(let context, .idle):
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Invocation, any Error>) in
return try await withCheckedThrowingContinuation {
(continuation: CheckedContinuation<Invocation, any Error>) in
self.state = .connected(context, .waitingForNextInvocation(continuation))
self.sendNextRequest(context: context)
}

case .connected(_, .closing),
.connected(_, .sendingResponse),
.connected(_, .sentResponse),
.connected(_, .waitingForNextInvocation),
.connected(_, .waitingForResponse):
.connected(_, .sendingResponse),
.connected(_, .sentResponse),
.connected(_, .waitingForNextInvocation),
.connected(_, .waitingForResponse):
fatalError()

case .disconnected:
Expand All @@ -232,7 +234,7 @@ private final class LambdaChannelHandler {
func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error) async throws {
switch self.state {
case .connected(_, .idle(.none)),
.connected(_, .waitingForNextInvocation):
.connected(_, .waitingForNextInvocation):
fatalError("Invalid state: \(self.state)")

case .connected(let context, .waitingForResponse(let requestID)):
Expand All @@ -248,14 +250,17 @@ private final class LambdaChannelHandler {
}

case .connected(_, .idle(previousRequestID: .some(let requestID))),
.connected(_, .sentResponse(let requestID, _)):
.connected(_, .sentResponse(let requestID, _)):
// The final response has already been sent. The only way to report the unhandled error
// now is to log it. Normally this library never logs higher than debug, we make an
// exception here, as there is no other way of reporting the error otherwise.
self.logger.error("Unhandled error after stream has finished", metadata: [
"lambda_request_id": "\(requestID)",
"lambda_error": "\(String(describing: error))"
])
self.logger.error(
"Unhandled error after stream has finished",
metadata: [
"lambda_request_id": "\(requestID)",
"lambda_error": "\(String(describing: error))",
]
)

case .disconnected, .connected(_, .closing):
// TODO: throw error here
Expand All @@ -266,7 +271,7 @@ private final class LambdaChannelHandler {
func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer) async throws {
switch self.state {
case .connected(_, .idle(.none)),
.connected(_, .waitingForNextInvocation):
.connected(_, .waitingForNextInvocation):
fatalError("Invalid state: \(self.state)")

case .connected(let context, .waitingForResponse(let requestID)):
Expand All @@ -277,11 +282,10 @@ private final class LambdaChannelHandler {
try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context)

case .connected(_, .idle(previousRequestID: .some(let requestID))),
.connected(_, .sentResponse(let requestID, _)):
.connected(_, .sentResponse(let requestID, _)):
// TODO: throw error here – user tries to write after the stream has been finished
fatalError()


case .disconnected, .connected(_, .closing):
// TODO: throw error here
fatalError()
Expand All @@ -291,7 +295,7 @@ private final class LambdaChannelHandler {
func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?) async throws {
switch self.state {
case .connected(_, .idle(.none)),
.connected(_, .waitingForNextInvocation):
.connected(_, .waitingForNextInvocation):
fatalError("Invalid state: \(self.state)")

case .connected(let context, .waitingForResponse(let requestID)):
Expand All @@ -307,11 +311,10 @@ private final class LambdaChannelHandler {
}

case .connected(_, .idle(previousRequestID: .some(let requestID))),
.connected(_, .sentResponse(let requestID, _)):
.connected(_, .sentResponse(let requestID, _)):
// TODO: throw error here – user tries to write after the stream has been finished
fatalError()


case .disconnected, .connected(_, .closing):
// TODO: throw error here
fatalError()
Expand Down Expand Up @@ -355,14 +358,15 @@ private final class LambdaChannelHandler {
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix

let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
LambdaRuntimeClient.streamingHeaders
}
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
LambdaRuntimeClient.streamingHeaders
}

let httpRequest = HTTPRequestHead(
version: .http1_1,
Expand All @@ -383,7 +387,12 @@ private final class LambdaChannelHandler {
}

private func sendNextRequest(context: ChannelHandlerContext) {
let httpRequest = HTTPRequestHead(version: .http1_1, method: .GET, uri: self.nextInvocationPath, headers: LambdaRuntimeClient.defaultHeaders)
let httpRequest = HTTPRequestHead(
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: LambdaRuntimeClient.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
Expand Down Expand Up @@ -478,40 +487,40 @@ extension LambdaChannelHandler: ChannelInboundHandler {
break
}

// // As defined in RFC 7230 Section 6.3:
// // HTTP/1.1 defaults to the use of "persistent connections", allowing
// // multiple requests and responses to be carried over a single
// // connection. The "close" connection option is used to signal that a
// // connection will not persist after the current request/response. HTTP
// // implementations SHOULD support persistent connections.
// //
// // That's why we only assume the connection shall be closed if we receive
// // a "connection = close" header.
// let serverCloseConnection =
// response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
//
// let closeConnection = serverCloseConnection || response.head.version != .http1_1
//
// if closeConnection {
// // If we were succeeding the request promise here directly and closing the connection
// // after succeeding the promise we may run into a race condition:
// //
// // The lambda runtime will ask for the next work item directly after a succeeded post
// // response request. The desire for the next work item might be faster than the attempt
// // to close the connection. This will lead to a situation where we try to the connection
// // but the next request has already been scheduled on the connection that we want to
// // close. For this reason we postpone succeeding the promise until the connection has
// // been closed. This codepath will only be hit in the very, very unlikely event of the
// // Lambda control plane demanding to close connection. (It's more or less only
// // implemented to support http1.1 correctly.) This behavior is ensured with the test
// // `LambdaTest.testNoKeepAliveServer`.
// self.state = .waitForConnectionClose(httpResponse, promise)
// _ = context.channel.close()
// return
// } else {
// self.state = .idle
// promise.succeed(httpResponse)
// }
// // As defined in RFC 7230 Section 6.3:
// // HTTP/1.1 defaults to the use of "persistent connections", allowing
// // multiple requests and responses to be carried over a single
// // connection. The "close" connection option is used to signal that a
// // connection will not persist after the current request/response. HTTP
// // implementations SHOULD support persistent connections.
// //
// // That's why we only assume the connection shall be closed if we receive
// // a "connection = close" header.
// let serverCloseConnection =
// response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
//
// let closeConnection = serverCloseConnection || response.head.version != .http1_1
//
// if closeConnection {
// // If we were succeeding the request promise here directly and closing the connection
// // after succeeding the promise we may run into a race condition:
// //
// // The lambda runtime will ask for the next work item directly after a succeeded post
// // response request. The desire for the next work item might be faster than the attempt
// // to close the connection. This will lead to a situation where we try to the connection
// // but the next request has already been scheduled on the connection that we want to
// // close. For this reason we postpone succeeding the promise until the connection has
// // been closed. This codepath will only be hit in the very, very unlikely event of the
// // Lambda control plane demanding to close connection. (It's more or less only
// // implemented to support http1.1 correctly.) This behavior is ensured with the test
// // `LambdaTest.testNoKeepAliveServer`.
// self.state = .waitForConnectionClose(httpResponse, promise)
// _ = context.channel.close()
// return
// } else {
// self.state = .idle
// promise.succeed(httpResponse)
// }
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
Expand All @@ -524,19 +533,19 @@ extension LambdaChannelHandler: ChannelInboundHandler {
// fail any pending responses with last error or assume peer disconnected
context.fireChannelInactive()

// switch self.state {
// case .idle:
// break
//
// case .running(let promise, let timeout):
// self.state = .idle
// timeout?.cancel()
// promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
//
// case .waitForConnectionClose(let response, let promise):
// self.state = .idle
// promise.succeed(response)
// }
// switch self.state {
// case .idle:
// break
//
// case .running(let promise, let timeout):
// self.state = .idle
// timeout?.cancel()
// promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
//
// case .waitForConnectionClose(let response, let promise):
// self.state = .idle
// promise.succeed(response)
// }
}
}

Expand Down
3 changes: 1 addition & 2 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ struct NewLambdaRuntimeError: Error {
case finishAfterFinishHasBeenSent
case lostConnectionToControlPlane
case unexpectedStatusCodeForRequest

}

var code: Code


}
27 changes: 20 additions & 7 deletions Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
//===----------------------------------------------------------------------===//
//
// NewLambdaRuntimeClientTests.swift
// swift-aws-lambda-runtime
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Created by Fabian Fett on 28.08.24.
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Testing
import Logging
import NIOCore
import NIOPosix
import Logging
import Testing

import struct Foundation.UUID

@testable import AWSLambdaRuntimeCore

@Suite
Expand Down Expand Up @@ -52,7 +61,8 @@ struct NewLambdaRuntimeClientTests {
let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: 7000)

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: configuration, eventLoop: eventLoopGroup.next(),
configuration: configuration,
eventLoop: eventLoopGroup.next(),
logger: self.logger
) { runtimeClient in
do {
Expand All @@ -77,7 +87,10 @@ struct NewLambdaRuntimeClientTests {
}
}

func withMockServer<Result>(behaviour: some LambdaServerBehavior, _ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result) async throws -> Result {
func withMockServer<Result>(
behaviour: some LambdaServerBehavior,
_ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result
) async throws -> Result {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let server = MockLambdaServer(behavior: behaviour)
_ = try await server.start().get()
Expand Down