Skip to content

Commit 0a6f22a

Browse files
Rasmus Viitanenrasviitanen
Rasmus Viitanen
authored andcommitted
switch panic handling method
1 parent d3ff435 commit 0a6f22a

File tree

3 files changed

+87
-21
lines changed

3 files changed

+87
-21
lines changed

lambda-runtime/examples/basic.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
// This example requires the following input to succeed:
1+
// This example demonstrates use of shared resources such as DB connections
2+
// or local caches that can be initialized at the start of the runtime and
3+
// reused by subsequent lambda handler calls.
4+
// Run it with the following input:
25
// { "command": "do something" }
36

47
use lambda_runtime::{handler_fn, Context, Error};
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// This example requires the following input to succeed:
2+
// { "command": "do something" }
3+
4+
use std::convert::{Infallible, TryFrom};
5+
6+
use lambda_runtime::{handler_fn, Context, Error};
7+
use log::LevelFilter;
8+
use serde::{Deserialize, Serialize};
9+
use simple_logger::SimpleLogger;
10+
11+
/// This is also a made-up example. Requests come into the runtime as unicode
12+
/// strings in json format, which can map to any structure that implements `serde::Deserialize`
13+
/// The runtime pays no attention to the contents of the request payload.
14+
#[derive(Deserialize)]
15+
struct Request {
16+
command: String,
17+
}
18+
19+
/// This is a made-up example of what a response structure may look like.
20+
/// There is no restriction on what it can be. The runtime requires responses
21+
/// to be serialized into json. The runtime pays no attention
22+
/// to the contents of the response payload.
23+
#[derive(Serialize)]
24+
struct Response {
25+
req_id: String,
26+
msg: String,
27+
}
28+
29+
struct SharedClient {
30+
name: &'static str,
31+
}
32+
33+
impl TryFrom<&'static str> for SharedClient {
34+
type Error = Infallible;
35+
fn try_from(name: &'static str) -> Result<Self, Self::Error> {
36+
Ok(Self { name })
37+
}
38+
}
39+
40+
impl SharedClient {
41+
fn response(&self, req_id: String, command: String) -> Response {
42+
Response {
43+
req_id,
44+
msg: format!("Command {} executed by {}.", command, self.name),
45+
}
46+
}
47+
}
48+
49+
#[tokio::main]
50+
async fn main() -> Result<(), Error> {
51+
// required to enable CloudWatch error logging by the runtime
52+
// can be replaced with any other method of initializing `log`
53+
SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap();
54+
55+
let client = SharedClient::try_from("Shared Client 1 (perhaps a database)")?;
56+
let client_ref = &client;
57+
lambda_runtime::run(handler_fn(move |event: Request, ctx: Context| async move {
58+
let command = event.command;
59+
Ok::<Response, Error>(client_ref.response(ctx.request_id, command))
60+
}))
61+
.await?;
62+
Ok(())
63+
}

lambda-runtime/src/lib.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::{
1313
convert::{TryFrom, TryInto},
1414
env, fmt,
1515
future::Future,
16-
sync::Arc,
16+
panic,
1717
};
1818
use tokio::io::{AsyncRead, AsyncWrite};
1919
use tokio_stream::{Stream, StreamExt};
@@ -93,7 +93,7 @@ pub struct HandlerFn<F> {
9393
impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
9494
where
9595
F: Fn(A, Context) -> Fut,
96-
Fut: Future<Output = Result<B, Error>> + Send,
96+
Fut: Future<Output = Result<B, Error>>,
9797
Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
9898
{
9999
type Error = Error;
@@ -135,14 +135,13 @@ where
135135
handler: F,
136136
) -> Result<(), Error>
137137
where
138-
F: Handler<A, B> + Send + Sync + 'static,
139-
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>> + Send + 'static,
140-
<F as Handler<A, B>>::Error: fmt::Display + Send + Sync + 'static,
141-
A: for<'de> Deserialize<'de> + Send + Sync + 'static,
142-
B: Serialize + Send + Sync + 'static,
138+
F: Handler<A, B>,
139+
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
140+
<F as Handler<A, B>>::Error: fmt::Display,
141+
A: for<'de> Deserialize<'de>,
142+
B: Serialize,
143143
{
144144
let client = &self.client;
145-
let handler = Arc::new(handler);
146145
tokio::pin!(incoming);
147146
while let Some(event) = incoming.next().await {
148147
trace!("New event arrived (run loop)");
@@ -154,12 +153,10 @@ where
154153
trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose
155154
let body = serde_json::from_slice(&body)?;
156155

157-
let handler = Arc::clone(&handler);
158156
let request_id = &ctx.request_id.clone();
159-
#[allow(clippy::async_yields_async)]
160-
let task = tokio::spawn(async move { handler.call(body, ctx) });
157+
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx)));
161158

162-
let req = match task.await {
159+
let req = match task {
163160
Ok(response) => match response.await {
164161
Ok(response) => {
165162
trace!("Ok response from handler (run loop)");
@@ -181,18 +178,21 @@ where
181178
.into_req()
182179
}
183180
},
184-
Err(err) if err.is_panic() => {
181+
Err(err) => {
185182
error!("{:?}", err); // inconsistent with other log record formats - to be reviewed
186183
EventErrorRequest {
187184
request_id,
188185
diagnostic: Diagnostic {
189186
error_type: type_name_of_val(&err).to_owned(),
190-
error_message: format!("Lambda panicked: {}", err),
187+
error_message: if let Some(msg) = err.downcast_ref::<&str>() {
188+
format!("Lambda panicked: {}", msg)
189+
} else {
190+
"Lambda panicked".to_string()
191+
},
191192
},
192193
}
193194
.into_req()
194195
}
195-
Err(_) => unreachable!("tokio::task should not be canceled"),
196196
};
197197
let req = req?;
198198
client.call(req).await.expect("Unable to send response to Runtime APIs");
@@ -291,11 +291,11 @@ where
291291
/// ```
292292
pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
293293
where
294-
F: Handler<A, B> + Send + Sync + 'static,
295-
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>> + Send + 'static,
296-
<F as Handler<A, B>>::Error: fmt::Display + Send + Sync + 'static,
297-
A: for<'de> Deserialize<'de> + Send + Sync + 'static,
298-
B: Serialize + Send + Sync + 'static,
294+
F: Handler<A, B>,
295+
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
296+
<F as Handler<A, B>>::Error: fmt::Display,
297+
A: for<'de> Deserialize<'de>,
298+
B: Serialize,
299299
{
300300
trace!("Loading config from env");
301301
let config = Config::from_env()?;

0 commit comments

Comments
 (0)