Skip to content

Lambda with Async/Await #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f914c3f
Remove synchronous lambda
Jun 28, 2019
5e66648
use custom proptest attribute; Err -> Error
Jun 28, 2019
96ea124
version bump
Jul 5, 2019
ae003fd
Fix failing ui tests
Jul 5, 2019
5869b52
remove better-panic dev dependency
Jul 5, 2019
2d5b168
remove publicly exported `Error` type alias.
Jul 5, 2019
abdf54a
fix silly type error
Jul 8, 2019
a564dac
Language/ecosystem updates.
Aug 14, 2019
b7eb844
Remove #![feature(async_await)]
Aug 22, 2019
024730f
update to latest syn + remove proptest dependency
Oct 1, 2019
12a1050
rename generics
Oct 1, 2019
8233a68
errors + external proptest dep
Oct 11, 2019
d31200d
dedicated request types
Nov 26, 2019
044caf5
simplify a bit
davidbarsky Nov 28, 2019
4b507c9
Update deps; remove flaky UI tests.
Dec 9, 2019
4d21c00
WIP
Dec 9, 2019
2447e4e
more WIP changes
Dec 10, 2019
a57f2d1
test expected endpoints
Dec 20, 2019
7a34cc2
it "works"
davidbarsky Dec 21, 2019
4b9a2e8
task-local context, baby
davidbarsky Dec 21, 2019
ee837cb
simulated endpoints
davidbarsky Dec 22, 2019
4a08c58
fix doc tests; handle paths correctly; run everything in tasks
davidbarsky Dec 22, 2019
b1e2570
remove unneeded dep
davidbarsky Dec 22, 2019
9528a7b
re-introduce @AnderEnder ci changes
davidbarsky Dec 22, 2019
5eb6954
remove double-serialization
davidbarsky Dec 23, 2019
b890cf4
remove travis
davidbarsky Dec 23, 2019
77c0b95
Use default HTTP scheme for endpoint like "127.0.0.1:9001" (#188)
skorobkov Dec 25, 2019
1b95ac1
test lambda function end-to-end
davidbarsky Dec 25, 2019
8fed758
lol, bye vscode
davidbarsky Jan 1, 2020
2d37c34
fix lambda-http deps (#189)
skorobkov Jan 1, 2020
603315a
fix error bounds
davidbarsky Jan 1, 2020
e691c0e
update dep; use tokio's task-local for ctx.
Jan 28, 2020
6536257
wip; added `race` macro ; started `Incoming` stream macro.
Jan 28, 2020
006a275
handle routing better
davidbarsky Jan 29, 2020
d252d04
Merge branch 'master' into async-await
davidbarsky Mar 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
dedicated request types
  • Loading branch information
David Barsky committed Nov 26, 2019
commit d31200d0da34b0de9d29113b68ae0b22fb9c8b50
5 changes: 3 additions & 2 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
edition = "2018"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't know this was a thing but heads up, rustfmt picks up the edition from your crate if invoked via cargo so you might not need this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it normally should, but I think rust-analyzer didn't pick it up, but I think that was fixed.

# https://github.com/rust-lang/rustfmt/blob/master/Configurations.md#merge_imports
merge_imports = true
# merge_imports = true
# https://github.com/rust-lang/rustfmt/blob/master/Configurations.md#max_width
max_width = 120
# max_width = 120
388 changes: 168 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions lambda-attributes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2018"
proc-macro = true

[dependencies]
proc-macro2 = { version = "0.4.30", features = ["nightly"] }
proc-macro2 = { version = "0.4.30" }
syn = { version = "1.0.5", features = ["full"] }
quote = "1"
proptest = "0.9.3"
9 changes: 5 additions & 4 deletions lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ authors = ["David Barsky <dbarsky@amazon.com>"]
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.18", features = ["async-await", "nightly"] }
tokio = { version = "0.2.0-alpha.5" }
hyper = { version = "0.13.0-alpha.2", features = ["unstable-stream"] }
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
tokio = { version = "0.2.0-alpha.6" }
hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] }
async-stream = "0.1"
serde = { version = "1", features = ["derive"] }
thiserror = "1.0"
anyhow = "1.0"
serde_json = "1.0.39"
fehler = "1.0.0-alpha.1"
bytes = "0.4"
http = "0.1.17"
headers = "0.2.1"
Expand Down
70 changes: 37 additions & 33 deletions lambda/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::err_fmt;
use crate::requests::{IntoRequest, NextEventRequest};
use crate::{err_fmt, Error};
use async_stream::try_stream;
use fehler::Exception;
use bytes::Bytes;
use futures::prelude::*;
use http::{Method, Request, Response, Uri, uri::PathAndQuery};
use http::{uri::PathAndQuery, Method, Request, Response, Uri};
use hyper::Body;
use serde::{Serialize, Deserialize};
use bytes::Bytes;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone)]
pub struct Client {
Expand All @@ -23,7 +23,7 @@ struct LambdaError {
#[derive(Debug, PartialEq)]
struct InitializationErrorRequest {
path: PathAndQuery,
error: LambdaError
error: LambdaError,
}

impl InitializationErrorRequest {
Expand All @@ -44,38 +44,34 @@ struct InvocationOkRequest {
impl InvocationOkRequest {
fn new(request_id: String, body: Vec<u8>) -> Self {
let path = format!("/runtime/invocation/{}/response", request_id);
let path = PathAndQuery::from_shared(Bytes::from(path)).expect("Unable to construct PathAndQuery");
Self {
path,
body
}
let path =
PathAndQuery::from_shared(Bytes::from(path)).expect("Unable to construct PathAndQuery");
Self { path, body }
}
}
#[derive(Debug, PartialEq)]
struct InvocationErrorRequest {
path: PathAndQuery,
error: LambdaError
error: LambdaError,
}

impl InvocationErrorRequest {
fn new(request_id: String, error: LambdaError) -> Self {
let path = format!("/runtime/invocation/{}/error", request_id);
let path = PathAndQuery::from_shared(Bytes::from(path)).expect("Unable to construct PathAndQuery");
Self {
path,
error
}
let path =
PathAndQuery::from_shared(Bytes::from(path)).expect("Unable to construct PathAndQuery");
Self { path, error }
}
}

#[test]
fn round_trip_lambda_error() -> Result<(), Exception> {
use serde_json::{json, from_value, to_value, Value};
fn round_trip_lambda_error() -> Result<(), anyhow::Error> {
use serde_json::{from_value, json, to_value, Value};
let expected = json!({
"errorMessage" : "Error parsing event data.",
"errorType" : "InvalidEventDataException"
});

let actual: LambdaError = from_value(expected.clone())?;
let actual: Value = to_value(actual)?;
assert_eq!(expected, actual);
Expand All @@ -91,43 +87,51 @@ impl Client {
}
}

fn set_origin(&self, req: Request<Vec<u8>>) -> Result<Request<Vec<u8>>, Exception> {
fn set_origin(&self, req: Request<Vec<u8>>) -> Result<Request<Vec<u8>>, anyhow::Error> {
let (mut parts, body) = req.into_parts();
let (scheme, authority) = {
let scheme = self.base.scheme_part().ok_or(err_fmt!("PathAndQuery not found"))?;
let authority = self.base.authority_part().ok_or(err_fmt!("Authority not found"))?;
let scheme = self
.base
.scheme_part()
.ok_or(err_fmt!("PathAndQuery not found"))?;
let authority = self
.base
.authority_part()
.ok_or(err_fmt!("Authority not found"))?;
(scheme, authority)
};
let path = parts.uri.path_and_query().ok_or(err_fmt!("PathAndQuery not found"))?;
let path = parts
.uri
.path_and_query()
.ok_or(err_fmt!("PathAndQuery not found"))?;

let uri = Uri::builder()
.scheme(scheme.clone())
.authority(authority.clone())
.path_and_query(path.clone())
.build()?;
.build()
.expect("Unable to build URI");

parts.uri = uri;
Ok(Request::from_parts(parts, body))
}

pub async fn call(&mut self, req: Request<Vec<u8>>) -> Result<Response<Body>, Exception> {
pub async fn call(&mut self, req: Request<Vec<u8>>) -> Result<Response<Body>, anyhow::Error> {
let req = self.set_origin(req)?;
let (parts, body) = req.into_parts();
let body = Body::from(body);
let req = Request::from_parts(parts, body);
self.client.request(req).await.map_err(Into::into)
let response = self.client.request(req).await.map_err(Error::Hyper)?;
Ok(response)
}
}

pub fn events(client: Client) -> impl Stream<Item = Result<Response<Body>, Exception>> {
pub fn events(client: Client) -> impl Stream<Item = Result<Response<Body>, anyhow::Error>> {
try_stream! {
let mut client = client;
loop {
let req = Request::builder()
.method(Method::GET)
.uri(Uri::from_static("/runtime/invocation/next"))
.body(Vec::new())
.expect("Unable to construct request.");
let req = NextEventRequest;
let req = req.into_req()?;
let event = client.call(req).await?;
yield event;
}
Expand Down
88 changes: 69 additions & 19 deletions lambda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
//! ```
pub use crate::types::LambdaCtx;
use client::{events, Client};
use fehler::{Error, Exception};
use futures::{pin_mut, prelude::*};
use http::{Method, Request, Response, Uri};
pub use lambda_attributes::lambda;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{convert::TryFrom, env, error, fmt};
use thiserror::Error;

mod client;
mod requests;
/// Types availible to a Lambda function.
mod types;

Expand All @@ -57,16 +59,43 @@ impl std::fmt::Display for StringError {
}
}

#[derive(Error, Debug)]
pub enum Error {
#[error("error parsing environment variable: {var}")]
VarError {
var: &'static str,
#[source]
source: std::env::VarError,
},
#[error("error making an http request")]
Hyper(#[source] hyper::error::Error),
#[error("invalid URI: {uri}")]
InvalidUri {
uri: String,
#[source]
source: http::uri::InvalidUri,
},
#[error("invalid http request")]
InvalidRequest(#[source] http::Error),
#[error("serialization error")]
Json {
#[source]
source: serde_json::error::Error,
},
}

macro_rules! null_display {
($t:ty) => { impl fmt::Display for $t {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { Ok(()) }
} };
($t:ty) => {
impl fmt::Display for $t {
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}
};
}

#[derive(PartialEq, Debug, Error)]
pub(crate) enum RuntimeError {

}
pub(crate) enum RuntimeError {}
null_display!(RuntimeError);

#[doc(hidden)]
Expand Down Expand Up @@ -96,7 +125,7 @@ pub struct Config {

impl Config {
/// Attempts to read configuration from environment variables.
pub fn from_env() -> Result<Self, Exception> {
pub fn from_env() -> Result<Self, anyhow::Error> {
let conf = Config {
endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
Expand Down Expand Up @@ -187,14 +216,17 @@ where
/// Ok(event)
/// }
/// ```
pub async fn run<Function, Event, Output>(mut handler: Function) -> Result<(), Exception>
pub async fn run<Function, Event, Output>(mut handler: Function) -> Result<(), anyhow::Error>
where
Function: Handler<Event, Output>,
Event: for<'de> Deserialize<'de>,
Output: Serialize,
{
let uri = env::var("AWS_LAMBDA_RUNTIME_API")?.into();
let uri = Uri::from_shared(uri)?;
let uri = env::var("AWS_LAMBDA_RUNTIME_API").map_err(|e| Error::VarError {
var: "AWS_LAMBDA_RUNTIME_API",
source: e,
})?;
let uri = Uri::from_str(&uri).map_err(|e| Error::InvalidUri { uri, source: e })?;
let mut client = Client::new(uri);
let stream = events(client.clone());
pin_mut!(stream);
Expand All @@ -203,21 +235,39 @@ where
let (parts, body) = event?.into_parts();
let mut ctx: LambdaCtx = LambdaCtx::try_from(parts.headers)?;
ctx.env_config = Config::from_env()?;
let body = body.try_concat().await?;
let body = serde_json::from_slice(&body)?;
let body = body.try_concat().await.map_err(Error::Hyper)?;
let body = serde_json::from_slice(&body).map_err(|e| Error::Json { source: e })?;

match handler.call(body, Some(ctx.clone())).await {
Ok(res) => {
let body = serde_json::to_vec(&res)?;
let uri = format!("/runtime/invocation/{}/response", &ctx.id).parse::<Uri>()?;
let req = Request::builder().uri(uri).method(Method::POST).body(body)?;
let body = serde_json::to_vec(&res).map_err(|e| Error::Json { source: e })?;
let uri = format!("/runtime/invocation/{}/response", &ctx.id)
.parse::<Uri>()
.map_err(|e| Error::InvalidUri {
uri: format!("/runtime/invocation/{}/response", &ctx.id),
source: e,
})?;
let req = Request::builder()
.uri(uri)
.method(Method::POST)
.body(body)
.expect("Invalid request");

client.call(req).await?;
}
Err(err) => {
let err = type_name_of_val(err);
let uri = format!("/runtime/invocation/{}/error", &ctx.id).parse::<Uri>()?;
let req = Request::builder().uri(uri).method(Method::POST).body(Vec::from(err))?;
let uri = format!("/runtime/invocation/{}/error", &ctx.id)
.parse::<Uri>()
.map_err(|e| Error::InvalidUri {
uri: format!("/runtime/invocation/{}/error", &ctx.id),
source: e,
})?;
let req = Request::builder()
.uri(uri)
.method(Method::POST)
.body(Vec::from(err))
.expect("Invalid request");

client.call(req).await?;
}
Expand Down Expand Up @@ -249,7 +299,7 @@ impl error::Error for MyError {
}

#[tokio::test]
async fn get_next() -> Result<(), Exception> {
async fn get_next() -> Result<(), Error> {
fn test_fn() -> Result<String, MyError> {
Err(MyError)
}
Expand Down
Loading