Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ let package = Package(
]),
.testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [
.byName(name: "AWSLambdaRuntimeCore"),
.product(name: "NIOTestUtils", package: "swift-nio"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
]),
.testTarget(name: "AWSLambdaRuntimeTests", dependencies: [
.byName(name: "AWSLambdaRuntimeCore"),
Expand Down
10 changes: 4 additions & 6 deletions Sources/AWSLambdaRuntimeCore/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,25 @@ internal final class HTTPClient {
private var state = State.disconnected
private var executing = false

private static let headers = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])

init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
self.eventLoop = eventLoop
self.configuration = configuration
self.targetHost = "\(self.configuration.ip):\(self.configuration.port)"
}

func get(url: String, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
func get(url: String, headers: HTTPHeaders, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
self.execute(Request(targetHost: self.targetHost,
url: url,
method: .GET,
headers: HTTPClient.headers,
headers: headers,
timeout: timeout ?? self.configuration.requestTimeout))
}

func post(url: String, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
func post(url: String, headers: HTTPHeaders, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
self.execute(Request(targetHost: self.targetHost,
url: url,
method: .POST,
headers: HTTPClient.headers,
headers: headers,
body: body,
timeout: timeout ?? self.configuration.requestTimeout))
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ extension Lambda {
let keepAlive: Bool
let requestTimeout: TimeAmount?

init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
let ipPort = env("AWS_LAMBDA_RUNTIME_API")?.split(separator: ":") ?? ["127.0.0.1", "7000"]
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
let ipPort = (address ?? env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
Expand Down
20 changes: 17 additions & 3 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extension Lambda {
func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url).flatMapThrowing { response in
return self.httpClient.get(url: url, headers: RuntimeClient.defaultHeaders).flatMapThrowing { response in
guard response.status == .ok else {
throw RuntimeError.badStatusCode(response.status)
}
Expand All @@ -61,19 +61,23 @@ extension Lambda {
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer?, Error>) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestID
var body: ByteBuffer?
let headers: HTTPHeaders

switch result {
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = buffer
headers = RuntimeClient.defaultHeaders
case .failure(let error):
url += Consts.postErrorURLSuffix
let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
body = self.allocator.buffer(capacity: bytes.count)
body!.writeBytes(bytes)
headers = RuntimeClient.errorHeaders
}
logger.debug("reporting results to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw RuntimeError.badStatusCode(response.status)
}
Expand All @@ -98,7 +102,7 @@ extension Lambda {
var body = self.allocator.buffer(capacity: bytes.count)
body.writeBytes(bytes)
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
return self.httpClient.post(url: url, headers: RuntimeClient.errorHeaders, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw RuntimeError.badStatusCode(response.status)
}
Expand Down Expand Up @@ -186,3 +190,13 @@ extension Lambda {
}
}
}

extension Lambda.RuntimeClient {
internal static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])

/// These headers must be sent along an invocation or initialization error report
internal static let errorHeaders = HTTPHeaders([
("user-agent", "Swift-Lambda/Unknown"),
("lambda-runtime-function-error-type", "Unhandled"),
])
}
109 changes: 109 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import Logging
import NIO
import NIOFoundationCompat
import NIOHTTP1
import NIOTestUtils
import XCTest

class LambdaRuntimeClientTest: XCTestCase {
Expand Down Expand Up @@ -209,6 +214,110 @@ class LambdaRuntimeClientTest: XCTestCase {
XCTAssertEqual(#"{"errorType":"error","errorMessage":"🥑👨‍👩‍👧‍👧👩‍👩‍👧‍👧👨‍👨‍👧"}"#, String(decoding: emojiBytes, as: Unicode.UTF8.self))
}

func testInitializationErrorReport() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }

let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let result = client.reportInitializationError(logger: logger, error: TestError("boom"))

var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])

var inboundBody: HTTPServerRequestPart?
XCTAssertNoThrow(inboundBody = try server.readInbound())
guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return }
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")

XCTAssertEqual(try server.readInbound(), .end(nil))

XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}

func testInvocationErrorReport() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }

let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))

let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
])
var inv: Lambda.Invocation?
XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header))
guard let invocation = inv else { return }

let result = client.reportResults(logger: logger, invocation: invocation, result: Result.failure(TestError("boom")))

var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])

var inboundBody: HTTPServerRequestPart?
XCTAssertNoThrow(inboundBody = try server.readInbound())
guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return }
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")

XCTAssertEqual(try server.readInbound(), .end(nil))

XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}

func testInvocationSuccessResponse() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }

let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))

let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
])
var inv: Lambda.Invocation?
XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header))
guard let invocation = inv else { return }

let result = client.reportResults(logger: logger, invocation: invocation, result: Result.success(nil))

var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
XCTAssertFalse(head.headers.contains(name: "lambda-runtime-function-error-type"))
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])

XCTAssertEqual(try server.readInbound(), .end(nil))

XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}

class Behavior: LambdaServerBehavior {
var state = 0

Expand Down