Skip to content

terminator handler #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup
  • Loading branch information
tomerd committed Apr 12, 2022
commit cd398157320fa2cacc2c3adc33e859f6ff87f2eb
6 changes: 3 additions & 3 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ extension Lambda {
public let allocator: ByteBufferAllocator

/// `Terminator` to register shutdown operations
public let terminator: Terminator
public let terminator: LambdaTerminator

init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: Terminator) {
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
self.eventLoop = eventLoop
self.logger = logger
self.allocator = allocator
Expand All @@ -57,7 +57,7 @@ extension Lambda {
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator(),
terminator: Lambda.Terminator()
terminator: LambdaTerminator()
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extension Lambda {
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: Terminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occurred
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {

var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let terminator = Lambda.Terminator()
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration)

let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self)
Expand Down
152 changes: 79 additions & 73 deletions Sources/AWSLambdaRuntimeCore/Terminator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,119 +15,125 @@
import NIOConcurrencyHelpers
import NIOCore

extension Lambda {
/// Lambda terminator.
/// Utility to manage the lambda shutdown sequence.
public final class Terminator {
public typealias Handler = (EventLoop) -> EventLoopFuture<Void>
public typealias RegistrationKey = String
/// Lambda terminator.
/// Utility to manage the lambda shutdown sequence.
public final class LambdaTerminator {
private typealias Handler = (EventLoop) -> EventLoopFuture<Void>

private var storage: Storage
private var storage: Storage

public init() {
self.storage = Storage()
}
init() {
self.storage = Storage()
}

/// Register a shutdown handler with the terminator
///
/// - parameters:
/// - name: Display name for logging purposes
/// - handler: The shutdown handler to call when terminating the Lambda.
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - Returns: A `RegistrationKey` that can be used to de-register the handler when its no longer needed.
@discardableResult
public func register(name: String, handler: @escaping Handler) -> RegistrationKey {
let key = LambdaRequestID().uuidString // UUID basically
self.storage.add(key: key, name: name, handler: handler)
return key
}
/// Register a shutdown handler with the terminator
///
/// - parameters:
/// - name: Display name for logging purposes
/// - handler: The shutdown handler to call when terminating the Lambda.
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - Returns: A `RegistrationKey` that can be used to de-register the handler when its no longer needed.
@discardableResult
public func register(name: String, handler: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
let key = RegistrationKey()
self.storage.add(key: key, name: name, handler: handler)
return key
}

/// De-register a shutdown handler with the terminator
///
/// - parameters:
/// - key: A `RegistrationKey` obtained from calling the register API.
public func deregister(_ key: RegistrationKey) {
self.storage.remove(key)
}
/// De-register a shutdown handler with the terminator
///
/// - parameters:
/// - key: A `RegistrationKey` obtained from calling the register API.
public func deregister(_ key: RegistrationKey) {
self.storage.remove(key)
}

/// Begin the termination cycle
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - parameters:
/// - eventLoop: The `EventLoop` to run the termination on.
///
/// - Returns: An `EventLoopFuture` with the result of the termination cycle.
internal func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
func terminate(_ iterator: IndexingIterator<[(name: String, handler: Handler)]>, errors: [Error], promise: EventLoopPromise<Void>) {
var iterator = iterator
guard let handler = iterator.next()?.handler else {
if errors.isEmpty {
return promise.succeed(())
} else {
return promise.fail(TerminationError(underlying: errors))
}
/// Begin the termination cycle
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - parameters:
/// - eventLoop: The `EventLoop` to run the termination on.
///
/// - Returns: An `EventLoopFuture` with the result of the termination cycle.
internal func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
func terminate(_ iterator: IndexingIterator<[(name: String, handler: Handler)]>, errors: [Error], promise: EventLoopPromise<Void>) {
var iterator = iterator
guard let handler = iterator.next()?.handler else {
if errors.isEmpty {
return promise.succeed(())
} else {
return promise.fail(TerminationError(underlying: errors))
}
handler(eventLoop).whenComplete { result in
var errors = errors
if case .failure(let error) = result {
errors.append(error)
}
return terminate(iterator, errors: errors, promise: promise)
}
handler(eventLoop).whenComplete { result in
var errors = errors
if case .failure(let error) = result {
errors.append(error)
}
return terminate(iterator, errors: errors, promise: promise)
}
}

// terminate in cascading, reverse order
let promise = eventLoop.makePromise(of: Void.self)
terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise)
return promise.futureResult
// terminate in cascading, reverse order
let promise = eventLoop.makePromise(of: Void.self)
terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise)
return promise.futureResult
}
}

extension LambdaTerminator {
/// Lambda terminator registration key.
public struct RegistrationKey: Hashable, CustomStringConvertible {
var value: String

init() {
// UUID basically
self.value = LambdaRequestID().uuidString
}

public var description: String {
self.value
}
}
}

extension LambdaTerminator {
private final class Storage {
private let lock: Lock
private var index: [String]
private var map: [String: (name: String, handler: Terminator.Handler)]
private var index: [RegistrationKey]
private var map: [RegistrationKey: (name: String, handler: Handler)]

public init() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public init() {
init() {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this missed?

self.lock = .init()
self.index = []
self.map = [:]
}

func add(key: String, name: String, handler: @escaping Terminator.Handler) {
func add(key: RegistrationKey, name: String, handler: @escaping Handler) {
self.lock.withLock {
self.index.append(key)
self.map[key] = (name: name, handler: handler)
}
}

func remove(_ key: String) {
func remove(_ key: RegistrationKey) {
self.lock.withLock {
self.index = self.index.filter { $0 != key }
self.map[key] = nil
}
}

var handlers: [(name: String, handler: Terminator.Handler)] {
var handlers: [(name: String, handler: Handler)] {
self.lock.withLock {
self.index.compactMap { self.map[$0] }
}
}
}
}

extension LambdaTerminator {
struct TerminationError: Error {
let underlying: [Error]
}
}

extension Result {
fileprivate var error: Error? {
switch self {
case .failure(let error):
return error
case .success:
return .none
}
}
}
2 changes: 1 addition & 1 deletion Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class LambdaRuntimeTest: XCTestCase {
XCTFail("Unexpected error: \(error)"); return
}

XCTAssertEqual(shutdownError as? Lambda.TerminationError, Lambda.TerminationError(underlying: [
XCTAssertEqual(shutdownError as? LambdaTerminator.TerminationError, LambdaTerminator.TerminationError(underlying: [
ShutdownError(description: "error 3"),
ShutdownError(description: "error 2"),
ShutdownError(description: "error 1"),
Expand Down
6 changes: 3 additions & 3 deletions Tests/AWSLambdaRuntimeCoreTests/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func runLambda<Handler: ByteBufferLambdaHandler>(behavior: LambdaServerBehavior,
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let logger = Logger(label: "TestLogger")
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let terminator = Lambda.Terminator()
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let server = try MockLambdaServer(behavior: behavior).start().wait()
defer { XCTAssertNoThrow(try server.stop().wait()) }
Expand Down Expand Up @@ -68,8 +68,8 @@ extension Lambda.RuntimeError: Equatable {
}
}

extension Lambda.TerminationError: Equatable {
public static func == (lhs: Lambda.TerminationError, rhs: Lambda.TerminationError) -> Bool {
extension LambdaTerminator.TerminationError: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
guard lhs.underlying.count == rhs.underlying.count else {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class CodableLambdaTest: XCTestCase {
logger: Logger(label: "test"),
eventLoop: self.eventLoopGroup.next(),
allocator: ByteBufferAllocator(),
terminator: Lambda.Terminator()
terminator: LambdaTerminator()
)
}
}
Expand Down