Skip to content

terminator handler #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 7 additions & 26 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ extension Lambda {
/// `ByteBufferAllocator` to allocate `ByteBuffer`
public let allocator: ByteBufferAllocator

init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) {
/// `Terminator` to register shutdown operations
public let terminator: LambdaTerminator

init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
self.eventLoop = eventLoop
self.logger = logger
self.allocator = allocator
self.terminator = terminator
}

/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
Expand All @@ -52,7 +56,8 @@ extension Lambda {
InitializationContext(
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator()
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
}
Expand Down Expand Up @@ -205,27 +210,3 @@ public struct LambdaContext: CustomDebugStringConvertible {
)
}
}

// MARK: - ShutdownContext

extension Lambda {
/// Lambda runtime shutdown context.
/// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument.
public final class ShutdownContext {
/// `Logger` to log with
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger

/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop

internal init(logger: Logger, eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.logger = logger
}
}
}
13 changes: 0 additions & 13 deletions Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,6 @@ public protocol ByteBufferLambdaHandler {
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
func handle(_ event: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?>

/// Clean up the Lambda resources asynchronously.
/// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections.
///
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
}

extension ByteBufferLambdaHandler {
public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}
}

extension ByteBufferLambdaHandler {
Expand Down
13 changes: 8 additions & 5 deletions Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ extension Lambda {
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occured
let context = InitializationContext(logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator)
// 2. report initialization error if one occurred
let context = InitializationContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
terminator: terminator
)
return Handler.makeHandler(context: context)
// Hopping back to "our" EventLoop is important in case the factory returns a future
// that originated from a foreign EventLoop/EventLoopGroup.
Expand Down
15 changes: 7 additions & 8 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,22 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {

var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration)

let startupFuture = runner.initialize(logger: logger, handlerType: Handler.self)
startupFuture.flatMap { handler -> EventLoopFuture<(Handler, Result<Int, Error>)> in
let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self)
startupFuture.flatMap { handler -> EventLoopFuture<Result<Int, Error>> in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
return finishedPromise.futureResult.mapResult { (handler, $0) }
}
.flatMap { handler, runnerResult -> EventLoopFuture<Int> in
return finishedPromise.futureResult.mapResult { $0 }
}.flatMap { runnerResult -> EventLoopFuture<Int> in
// after the lambda finishPromise has succeeded or failed we need to
// shutdown the handler
let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop)
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
// if, we had an error shuting down the lambda, we want to concatenate it with
terminator.terminate(eventLoop: self.eventLoop).flatMapErrorThrowing { error in
// if, we had an error shutting down the handler, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
Expand Down
139 changes: 139 additions & 0 deletions Sources/AWSLambdaRuntimeCore/Terminator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOConcurrencyHelpers
import NIOCore

/// Lambda terminator.
/// Utility to manage the lambda shutdown sequence.
public final class LambdaTerminator {
private typealias Handler = (EventLoop) -> EventLoopFuture<Void>

private var storage: Storage

init() {
self.storage = Storage()
}

/// Register a shutdown handler with the terminator
///
/// - parameters:
/// - name: Display name for logging purposes
/// - handler: The shutdown handler to call when terminating the Lambda.
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - Returns: A `RegistrationKey` that can be used to de-register the handler when its no longer needed.
@discardableResult
public func register(name: String, handler: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
let key = RegistrationKey()
self.storage.add(key: key, name: name, handler: handler)
return key
}

/// De-register a shutdown handler with the terminator
///
/// - parameters:
/// - key: A `RegistrationKey` obtained from calling the register API.
public func deregister(_ key: RegistrationKey) {
self.storage.remove(key)
}

/// Begin the termination cycle
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - parameters:
/// - eventLoop: The `EventLoop` to run the termination on.
///
/// - Returns: An `EventLoopFuture` with the result of the termination cycle.
internal func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
func terminate(_ iterator: IndexingIterator<[(name: String, handler: Handler)]>, errors: [Error], promise: EventLoopPromise<Void>) {
var iterator = iterator
guard let handler = iterator.next()?.handler else {
if errors.isEmpty {
return promise.succeed(())
} else {
return promise.fail(TerminationError(underlying: errors))
}
}
handler(eventLoop).whenComplete { result in
var errors = errors
if case .failure(let error) = result {
errors.append(error)
}
return terminate(iterator, errors: errors, promise: promise)
}
}

// terminate in cascading, reverse order
let promise = eventLoop.makePromise(of: Void.self)
terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise)
return promise.futureResult
}
}

extension LambdaTerminator {
/// Lambda terminator registration key.
public struct RegistrationKey: Hashable, CustomStringConvertible {
var value: String

init() {
// UUID basically
self.value = LambdaRequestID().uuidString
}

public var description: String {
self.value
}
}
}

extension LambdaTerminator {
private final class Storage {
private let lock: Lock
private var index: [RegistrationKey]
private var map: [RegistrationKey: (name: String, handler: Handler)]

init() {
self.lock = .init()
self.index = []
self.map = [:]
}

func add(key: RegistrationKey, name: String, handler: @escaping Handler) {
self.lock.withLock {
self.index.append(key)
self.map[key] = (name: name, handler: handler)
}
}

func remove(_ key: RegistrationKey) {
self.lock.withLock {
self.index = self.index.filter { $0 != key }
self.map[key] = nil
}
}

var handlers: [(name: String, handler: Handler)] {
self.lock.withLock {
self.index.compactMap { self.map[$0] }
}
}
}
}

extension LambdaTerminator {
struct TerminationError: Error {
let underlying: [Error]
}
}
3 changes: 1 addition & 2 deletions Sources/AWSLambdaTesting/Lambda+Testing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ extension Lambda {
public init(requestID: String = "\(DispatchTime.now().uptimeNanoseconds)",
traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1",
invokedFunctionARN: String = "arn:aws:lambda:us-west-1:\(DispatchTime.now().uptimeNanoseconds):function:custom-runtime",
timeout: DispatchTimeInterval = .seconds(5))
{
timeout: DispatchTimeInterval = .seconds(5)) {
self.requestID = requestID
self.traceID = traceID
self.invokedFunctionARN = invokedFunctionARN
Expand Down
32 changes: 25 additions & 7 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,37 @@ class LambdaRuntimeTest: XCTestCase {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

struct ShutdownError: Error {}
struct ShutdownError: Error {
let description: String
}

struct ShutdownErrorHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = Void

static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
// register shutdown operation
context.terminator.register(name: "test 1", handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 1"))
})
context.terminator.register(name: "test 2", handler: { eventLoop in
eventLoop.makeSucceededVoidFuture()
})
context.terminator.register(name: "test 3", handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 2"))
})
context.terminator.register(name: "test 4", handler: { eventLoop in
eventLoop.makeSucceededVoidFuture()
})
context.terminator.register(name: "test 5", handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 3"))
})
return context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
}

func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededVoidFuture()
}

func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
context.eventLoop.makeFailedFuture(ShutdownError())
}
}

let eventLoop = eventLoopGroup.next()
Expand All @@ -95,7 +109,11 @@ class LambdaRuntimeTest: XCTestCase {
XCTFail("Unexpected error: \(error)"); return
}

XCTAssert(shutdownError is ShutdownError)
XCTAssertEqual(shutdownError as? LambdaTerminator.TerminationError, LambdaTerminator.TerminationError(underlying: [
ShutdownError(description: "error 3"),
ShutdownError(description: "error 2"),
ShutdownError(description: "error 1"),
]))
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
}
}
Expand Down
13 changes: 12 additions & 1 deletion Tests/AWSLambdaRuntimeCoreTests/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ func runLambda<Handler: ByteBufferLambdaHandler>(behavior: LambdaServerBehavior,
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let logger = Logger(label: "TestLogger")
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let server = try MockLambdaServer(behavior: behavior).start().wait()
defer { XCTAssertNoThrow(try server.stop().wait()) }
try runner.initialize(logger: logger, handlerType: handlerType).flatMap { handler in
try runner.initialize(logger: logger, terminator: terminator, handlerType: handlerType).flatMap { handler in
runner.run(logger: logger, handler: handler)
}.wait()
}
Expand Down Expand Up @@ -66,3 +67,13 @@ extension Lambda.RuntimeError: Equatable {
String(describing: lhs) == String(describing: rhs)
}
}

extension LambdaTerminator.TerminationError: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
guard lhs.underlying.count == rhs.underlying.count else {
return false
}
// technically incorrect, but good enough for our tests
return String(describing: lhs) == String(describing: rhs)
}
}
3 changes: 2 additions & 1 deletion Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class CodableLambdaTest: XCTestCase {
Lambda.InitializationContext(
logger: Logger(label: "test"),
eventLoop: self.eventLoopGroup.next(),
allocator: ByteBufferAllocator()
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
}
Expand Down