Skip to content

Remove static #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions lambda-runtime/examples/shared_resource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// This example requires the following input to succeed:
// { "command": "do something" }

use std::convert::{Infallible, TryFrom};

use lambda_runtime::{handler_fn, Context, Error};
use log::LevelFilter;
use serde::{Deserialize, Serialize};
use simple_logger::SimpleLogger;

/// This is also a made-up example. Requests come into the runtime as unicode
/// strings in json format, which can map to any structure that implements `serde::Deserialize`
/// The runtime pays no attention to the contents of the request payload.
#[derive(Deserialize)]
struct Request {
command: String,
}

/// This is a made-up example of what a response structure may look like.
/// There is no restriction on what it can be. The runtime requires responses
/// to be serialized into json. The runtime pays no attention
/// to the contents of the response payload.
#[derive(Serialize)]
struct Response {
req_id: String,
msg: String,
}

struct SharedClient {
name: &'static str,
}

impl TryFrom<&'static str> for SharedClient {
type Error = Infallible;
fn try_from(name: &'static str) -> Result<Self, Self::Error> {
Ok(Self { name })
}
}

impl SharedClient {
fn response(&self, req_id: String, command: String) -> Response {
Response {
req_id,
msg: format!("Command {} executed by {}.", command, self.name),
}
}
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
// can be replaced with any other method of initializing `log`
SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap();

let client = SharedClient::try_from("Shared Client 1 (perhaps a database)")?;
let client_ref = &client;
lambda_runtime::run(handler_fn(move |event: Request, ctx: Context| async move {
let command = event.command;
Ok::<Response, Error>(client_ref.response(ctx.request_id, command))
}))
.await?;
Ok(())
}
40 changes: 20 additions & 20 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
convert::{TryFrom, TryInto},
env, fmt,
future::Future,
sync::Arc,
panic,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -93,7 +93,7 @@ pub struct HandlerFn<F> {
impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
where
F: Fn(A, Context) -> Fut,
Fut: Future<Output = Result<B, Error>> + Send,
Fut: Future<Output = Result<B, Error>>,
Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
{
type Error = Error;
Expand Down Expand Up @@ -135,14 +135,13 @@ where
handler: F,
) -> Result<(), Error>
where
F: Handler<A, B> + Send + Sync + 'static,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>> + Send + 'static,
<F as Handler<A, B>>::Error: fmt::Display + Send + Sync + 'static,
A: for<'de> Deserialize<'de> + Send + Sync + 'static,
B: Serialize + Send + Sync + 'static,
F: Handler<A, B>,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
<F as Handler<A, B>>::Error: fmt::Display,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
let client = &self.client;
let handler = Arc::new(handler);
tokio::pin!(incoming);
while let Some(event) = incoming.next().await {
trace!("New event arrived (run loop)");
Expand All @@ -154,12 +153,10 @@ where
trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose
let body = serde_json::from_slice(&body)?;

let handler = Arc::clone(&handler);
let request_id = &ctx.request_id.clone();
#[allow(clippy::async_yields_async)]
let task = tokio::spawn(async move { handler.call(body, ctx) });
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx)));

let req = match task.await {
let req = match task {
Ok(response) => match response.await {
Ok(response) => {
trace!("Ok response from handler (run loop)");
Expand All @@ -181,18 +178,21 @@ where
.into_req()
}
},
Err(err) if err.is_panic() => {
Err(err) => {
error!("{:?}", err); // inconsistent with other log record formats - to be reviewed
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: format!("Lambda panicked: {}", err),
error_message: if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
},
},
}
.into_req()
}
Err(_) => unreachable!("tokio::task should not be canceled"),
};
let req = req?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
Expand Down Expand Up @@ -291,11 +291,11 @@ where
/// ```
pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
where
F: Handler<A, B> + Send + Sync + 'static,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>> + Send + 'static,
<F as Handler<A, B>>::Error: fmt::Display + Send + Sync + 'static,
A: for<'de> Deserialize<'de> + Send + Sync + 'static,
B: Serialize + Send + Sync + 'static,
F: Handler<A, B>,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
<F as Handler<A, B>>::Error: fmt::Display,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
trace!("Loading config from env");
let config = Config::from_env()?;
Expand Down