Skip to content

Commit 6e953eb

Browse files
authored
Support internal Lambda extensions (#744)
* Support internal Lambda extensions Internal Lambda extensions must be registered during the Lamba lifecycle Init phase, which ends when the runtime calls Next to await the first event. Since the `Extension::run` function both registers and executes the extension, there was previously no way for the runtime to determine that all internal extensions have been registered and that it's safe to proceed to the Invoke lifecycle phase. This change introduces an `Extension::register` method that registers the extension and begins any logs/telemetry handlers. It then returns a new `RegisteredExtension` abstraction that can be used to invoke the extension's run loop concurrently with the runtime's run loop. This change maintains backward compatibility by having the existing `Extension::run` method perform both steps. External Lambda extensions can use either API, and internal extensions should use the new API. Resolves #743. * Add example * Set extension name in example * Remove unnecessary Arc/Mutex
1 parent 1e4e203 commit 6e953eb

File tree

4 files changed

+204
-3
lines changed

4 files changed

+204
-3
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "extension-internal-flush"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
anyhow = "1"
8+
aws_lambda_events = { path = "../../lambda-events" }
9+
lambda-extension = { path = "../../lambda-extension" }
10+
lambda_runtime = { path = "../../lambda-runtime" }
11+
serde = "1.0.136"
12+
tokio = { version = "1", features = ["macros", "sync"] }
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# AWS Lambda runtime + internal extension example
2+
3+
This example demonstrates how to build an AWS Lambda function that includes a
4+
[Lambda internal extension](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html).
5+
Unlike external extensions that run as separate processes, an internal extension runs within the
6+
main runtime process.
7+
8+
One use case for internal extensions is to flush logs or telemetry data after the Lambda runtime
9+
handler has finished processing an event but before the execution environment is frozen awaiting the
10+
arrival of the next event. Without an explicit flush, telemetry data may never be sent since the
11+
execution environment will remain frozen and eventually be terminated if no additional events arrive.
12+
13+
Note that for
14+
[synchronous](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html) Lambda invocations
15+
(e.g., via
16+
[Amazon API Gateway](https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-integrations.html)),
17+
the Lambda service returns the response to the caller immediately. Extensions may continue to run
18+
without introducing an observable delay.
19+
20+
## Build & Deploy
21+
22+
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
23+
2. Build the extension with `cargo lambda build --release`
24+
3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE`
25+
26+
The last command will give you an ARN for the extension layer that you can use in your functions.
27+
28+
## Build for ARM 64
29+
30+
Build the extension with `cargo lambda build --release --arm64`
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use anyhow::anyhow;
2+
use aws_lambda_events::sqs::{SqsBatchResponse, SqsEventObj};
3+
use lambda_extension::{service_fn, Error, Extension, NextEvent};
4+
use serde::{Deserialize, Serialize};
5+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
6+
use tokio::sync::Mutex;
7+
8+
use std::sync::Arc;
9+
10+
/// Implements an internal Lambda extension to flush logs/telemetry after each request.
11+
struct FlushExtension {
12+
request_done_receiver: Mutex<UnboundedReceiver<()>>,
13+
}
14+
15+
impl FlushExtension {
16+
pub fn new(request_done_receiver: UnboundedReceiver<()>) -> Self {
17+
Self {
18+
request_done_receiver: Mutex::new(request_done_receiver),
19+
}
20+
}
21+
22+
pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
23+
match event.next {
24+
// NB: Internal extensions only support the INVOKE event.
25+
NextEvent::Shutdown(shutdown) => {
26+
return Err(anyhow!("extension received unexpected SHUTDOWN event: {:?}", shutdown).into());
27+
}
28+
NextEvent::Invoke(_e) => {}
29+
}
30+
31+
eprintln!("[extension] waiting for event to be processed");
32+
33+
// Wait for runtime to finish processing event.
34+
self.request_done_receiver
35+
.lock()
36+
.await
37+
.recv()
38+
.await
39+
.ok_or_else(|| anyhow!("channel is closed"))?;
40+
41+
eprintln!("[extension] flushing logs and telemetry");
42+
43+
// <flush logs and telemetry here>
44+
45+
Ok(())
46+
}
47+
}
48+
49+
/// Object that you send to SQS and plan to process with the function.
50+
#[derive(Debug, Deserialize, Serialize)]
51+
struct Data {
52+
a: String,
53+
b: i64,
54+
}
55+
56+
/// Implements the main event handler for processing events from an SQS queue.
57+
struct EventHandler {
58+
request_done_sender: UnboundedSender<()>,
59+
}
60+
61+
impl EventHandler {
62+
pub fn new(request_done_sender: UnboundedSender<()>) -> Self {
63+
Self { request_done_sender }
64+
}
65+
66+
pub async fn invoke(
67+
&self,
68+
event: lambda_runtime::LambdaEvent<SqsEventObj<Data>>,
69+
) -> Result<SqsBatchResponse, Error> {
70+
let data = &event.payload.records[0].body;
71+
eprintln!("[runtime] received event {data:?}");
72+
73+
// <process event here>
74+
75+
// Notify the extension to flush traces.
76+
self.request_done_sender.send(()).map_err(Box::new)?;
77+
78+
Ok(SqsBatchResponse::default())
79+
}
80+
}
81+
82+
#[tokio::main]
83+
async fn main() -> Result<(), Error> {
84+
let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
85+
86+
let flush_extension = Arc::new(FlushExtension::new(request_done_receiver));
87+
let extension = Extension::new()
88+
// Internal extensions only support INVOKE events.
89+
.with_events(&["INVOKE"])
90+
.with_events_processor(service_fn(|event| {
91+
let flush_extension = flush_extension.clone();
92+
async move { flush_extension.invoke(event).await }
93+
}))
94+
// Internal extension names MUST be unique within a given Lambda function.
95+
.with_extension_name("internal-flush")
96+
// Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init
97+
// phase and begins the Invoke phase.
98+
.register()
99+
.await?;
100+
101+
let handler = Arc::new(EventHandler::new(request_done_sender));
102+
103+
tokio::try_join!(
104+
lambda_runtime::run(service_fn(|event| {
105+
let handler = handler.clone();
106+
async move { handler.invoke(event).await }
107+
})),
108+
extension.run(),
109+
)?;
110+
111+
Ok(())
112+
}

lambda-extension/src/extension.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,21 @@ where
215215
}
216216
}
217217

218-
/// Execute the given extension
219-
pub async fn run(self) -> Result<(), Error> {
218+
/// Register the extension.
219+
///
220+
/// Performs the
221+
/// [init phase](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-ib)
222+
/// Lambda lifecycle operations to register the extension. When implementing an internal Lambda
223+
/// extension, it is safe to call `lambda_runtime::run` once the future returned by this
224+
/// function resolves.
225+
pub async fn register(self) -> Result<RegisteredExtension<E>, Error> {
220226
let client = &Client::builder().build()?;
221227

222228
let extension_id = register(client, self.extension_name, self.events).await?;
223229
let extension_id = extension_id.to_str()?;
224-
let mut ep = self.events_processor;
225230

231+
// Logs API subscriptions must be requested during the Lambda init phase (see
232+
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-subscribing).
226233
if let Some(mut log_processor) = self.logs_processor {
227234
trace!("Log processor found");
228235

@@ -262,6 +269,8 @@ where
262269
trace!("Registered extension with Logs API");
263270
}
264271

272+
// Telemetry API subscriptions must be requested during the Lambda init phase (see
273+
// https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html#telemetry-api-registration
265274
if let Some(mut telemetry_processor) = self.telemetry_processor {
266275
trace!("Telemetry processor found");
267276

@@ -301,6 +310,42 @@ where
301310
trace!("Registered extension with Telemetry API");
302311
}
303312

313+
Ok(RegisteredExtension {
314+
extension_id: extension_id.to_string(),
315+
events_processor: self.events_processor,
316+
})
317+
}
318+
319+
/// Execute the given extension.
320+
pub async fn run(self) -> Result<(), Error> {
321+
self.register().await?.run().await
322+
}
323+
}
324+
325+
/// An extension registered by calling [`Extension::register`].
326+
pub struct RegisteredExtension<E> {
327+
extension_id: String,
328+
events_processor: E,
329+
}
330+
331+
impl<E> RegisteredExtension<E>
332+
where
333+
E: Service<LambdaEvent>,
334+
E::Future: Future<Output = Result<(), E::Error>>,
335+
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display + fmt::Debug,
336+
{
337+
/// Execute the extension's run loop.
338+
///
339+
/// Performs the
340+
/// [invoke](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-invoke)
341+
/// and, for external Lambda extensions registered to receive the `SHUTDOWN` event, the
342+
/// [shutdown](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown)
343+
/// Lambda lifecycle phases.
344+
pub async fn run(self) -> Result<(), Error> {
345+
let client = &Client::builder().build()?;
346+
let mut ep = self.events_processor;
347+
let extension_id = &self.extension_id;
348+
304349
let incoming = async_stream::stream! {
305350
loop {
306351
trace!("Waiting for next event (incoming loop)");
@@ -351,6 +396,8 @@ where
351396
return Err(err.into());
352397
}
353398
}
399+
400+
// Unreachable.
354401
Ok(())
355402
}
356403
}

0 commit comments

Comments
 (0)