Skip to content

async support #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lifetime hell
  • Loading branch information
Marco Napetti committed May 9, 2019
commit 404c7b161844dbc77c46c4417d079bc375f45d31
44 changes: 22 additions & 22 deletions lambda-runtime-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const MAX_RETRIES: i8 = 3;
///
/// # Panics
/// The function panics if the Lambda environment variables are not set.
pub fn start<EventError>(f: impl Handler<EventError>/*, runtime: Option<TokioRuntime>*/) -> impl Future<Item=(), Error=()>
pub fn start<EventError>(f: impl Handler<EventError> + 'static/*, runtime: Option<TokioRuntime>*/) -> impl Future<Item=(), Error=()>
where
EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
Expand Down Expand Up @@ -67,7 +67,7 @@ macro_rules! lambda {
/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
/// and spin up a new one for the next invocation.
pub fn start_with_config<Config, EventError>(
f: impl Handler<EventError>,
f: impl Handler<EventError> + 'static,
config: &Config,
// runtime: Option<TokioRuntime>,
) -> impl Future<Item=(), Error=()>
Expand Down Expand Up @@ -119,7 +119,7 @@ where
/// # Panics
/// The function panics if we cannot instantiate a new `RustRuntime` object.
pub(crate) fn start_with_runtime_client<EventError>(
f: impl Handler<EventError>,
f: impl Handler<EventError> + 'static,
func_settings: FunctionSettings,
client: RuntimeClient,
) -> impl Future<Item=(), Error=()>
Expand Down Expand Up @@ -187,20 +187,20 @@ where
/// Starts the main event loop and begin polling or new events. If one of the
/// Runtime APIs returns an unrecoverable error this method calls the init failed
/// API and then panics.
fn start(self) -> impl Future<Item=(), Error=()> {
fn start<'a>(&'a mut self) -> impl Future<Item=(), Error=()> + 'a {
debug!("Beginning main event loop");
loop_fn(self, |runtime| {
runtime.get_next_event(0, None).and_then(|(mut runtime, event, ctx)| {
loop_fn((), move |_| {
self.get_next_event(0, None).and_then(|(event, ctx)| {
let request_id = ctx.aws_request_id.clone();
info!("Received new event with AWS request id: {}", request_id);
let function_outcome = runtime.invoke(event, ctx);
let function_outcome = self.invoke(event, ctx);
match function_outcome {
Ok(response) => {
debug!(
"Function executed succesfully for {}, pushing response to Runtime API",
request_id
);
Either::A(runtime.runtime_client.event_response(&request_id, &response)
Either::A(self.runtime_client.event_response(&request_id, &response)
.then(move |r| {
match r {
Ok(_) => info!("Response for {} accepted by Runtime API", request_id),
Expand All @@ -213,18 +213,18 @@ where
"Error for {} is not recoverable, sending fail_init signal and panicking.",
request_id
);
runtime.runtime_client.fail_init(&ErrorResponse::from(e));
self.runtime_client.fail_init(&ErrorResponse::from(e));
panic!("Could not send response");
}
},
}
Ok(Loop::Continue(runtime))
Ok(Loop::Continue(()))
}))
}
Err(e) => {
error!("Handler returned an error for {}: {}", request_id, e);
debug!("Attempting to send error response to Runtime API for {}", request_id);
Either::B(runtime.runtime_client.event_error(&request_id, &ErrorResponse::from(e))
Either::B(self.runtime_client.event_error(&request_id, &ErrorResponse::from(e))
.then(move |r| {
match r {
Ok(_) => info!("Error response for {} accepted by Runtime API", request_id),
Expand All @@ -235,12 +235,12 @@ where
"Error for {} is not recoverable, sending fail_init signal and panicking",
request_id
);
runtime.runtime_client.fail_init(&ErrorResponse::from(e));
self.runtime_client.fail_init(&ErrorResponse::from(e));
panic!("Could not send error response");
}
},
}
Ok(Loop::Continue(runtime))
Ok(Loop::Continue(()))
}))
}
}
Expand All @@ -259,18 +259,18 @@ where
///
/// # Return
/// The next `Event` object to be processed.
pub(super) fn get_next_event(self, retries: i8, e: Option<RuntimeError>) -> impl Future<Item=(Self, Vec<u8>, Context), Error=()> {
loop_fn((self, retries, e), |(runtime, retries, e)| {
pub(super) fn get_next_event<'a>(&'a self, retries: i8, e: Option<RuntimeError>) -> impl Future<Item=(Vec<u8>, Context), Error=()> + 'a {
loop_fn((retries, e), move |(retries, e)| {
if let Some(err) = e {
if retries > runtime.max_retries {
if retries > self.max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
let future = match err.request_id.clone() {
Some(req_id) => {
Either::A(runtime.runtime_client.event_error(&req_id, &ErrorResponse::from(err))
Either::A(self.runtime_client.event_error(&req_id, &ErrorResponse::from(err))
.map_err(|_| panic!("Could not send event error response")))
}
None => {
Either::B(runtime.runtime_client.fail_init(&ErrorResponse::from(err)))
Either::B(self.runtime_client.fail_init(&ErrorResponse::from(err)))
}
};
// to avoid unreachable code
Expand All @@ -282,21 +282,21 @@ where
}
}

Either::B(runtime.runtime_client.next_event()
Either::B(self.runtime_client.next_event()
.then(move |r| {
match r {
Ok((ev_data, invocation_ctx)) => {
let mut handler_ctx = Context::new(runtime.settings.clone());
let mut handler_ctx = Context::new(self.settings.clone());
handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
handler_ctx.client_context = invocation_ctx.client_context;
handler_ctx.identity = invocation_ctx.identity;
handler_ctx.deadline = invocation_ctx.deadline;

Ok(Loop::Break((runtime, ev_data, handler_ctx)))
Ok(Loop::Break((ev_data, handler_ctx)))
},
Err(e) => Ok(Loop::Continue((runtime, retries + 1, Option::from(RuntimeError::from(e))))),
Err(e) => Ok(Loop::Continue((retries + 1, Option::from(RuntimeError::from(e))))),
}
}))
})
Expand Down