Skip to content

Commit d26886d

Browse files
committed
feat(p3-http): begin outgoing HTTP implementation
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 66d87ce commit d26886d

File tree

8 files changed

+570
-120
lines changed

8 files changed

+570
-120
lines changed

crates/wasi-http/src/p3/body.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ use http_body_util::combinators::BoxBody;
99
use std::sync::Arc;
1010
use tokio::sync::{mpsc, oneshot};
1111
use tokio_util::sync::PollSender;
12-
use wasmtime::StoreContextMut;
1312
use wasmtime::component::{
1413
Accessor, FutureConsumer, FutureReader, Resource, Source, StreamConsumer, StreamReader,
1514
StreamResult,
1615
};
16+
use wasmtime::{AsContextMut, StoreContextMut};
1717

1818
/// The concrete type behind a `wasi:http/types/body` resource.
1919
pub(crate) enum Body {
@@ -24,10 +24,14 @@ pub(crate) enum Body {
2424
/// Future, on which guest will write result and optional trailers
2525
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
2626
/// Channel, on which transmission result will be written
27-
result_tx: oneshot::Sender<Result<(), ErrorCode>>,
27+
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
2828
},
2929
/// Body constructed by the host.
30-
Host(BoxBody<Bytes, ErrorCode>),
30+
Host {
31+
body: BoxBody<Bytes, ErrorCode>,
32+
/// Channel, on which transmission result will be written
33+
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
34+
},
3135
/// Body is consumed.
3236
Consumed,
3337
}
@@ -72,6 +76,38 @@ pub(crate) struct GuestBody {
7276
Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>,
7377
}
7478

79+
impl GuestBody {
80+
pub fn new<T: 'static>(
81+
mut store: impl AsContextMut<Data = T>,
82+
contents_rx: Option<StreamReader<u8>>,
83+
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
84+
getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>,
85+
) -> Self {
86+
let (trailers_http_tx, trailers_http_rx) = oneshot::channel();
87+
trailers_rx.pipe(
88+
&mut store,
89+
GuestTrailerConsumer {
90+
tx: trailers_http_tx,
91+
getter,
92+
},
93+
);
94+
let contents_rx = contents_rx.map(|rx| {
95+
let (http_tx, http_rx) = mpsc::channel(1);
96+
rx.pipe(
97+
store,
98+
GuestBodyConsumer {
99+
tx: PollSender::new(http_tx),
100+
},
101+
);
102+
http_rx
103+
});
104+
Self {
105+
trailers_rx: Some(trailers_http_rx),
106+
contents_rx,
107+
}
108+
}
109+
}
110+
75111
impl http_body::Body for GuestBody {
76112
type Data = Bytes;
77113
type Error = ErrorCode;
@@ -181,3 +217,41 @@ where
181217
}
182218
}
183219
}
220+
221+
pub(crate) struct IncomingResponseBody {
222+
pub incoming: hyper::body::Incoming,
223+
pub timeout: tokio::time::Interval,
224+
}
225+
226+
impl http_body::Body for IncomingResponseBody {
227+
type Data = <hyper::body::Incoming as http_body::Body>::Data;
228+
type Error = ErrorCode;
229+
230+
fn poll_frame(
231+
mut self: Pin<&mut Self>,
232+
cx: &mut Context<'_>,
233+
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
234+
match Pin::new(&mut self.as_mut().incoming).poll_frame(cx) {
235+
Poll::Ready(None) => Poll::Ready(None),
236+
Poll::Ready(Some(Err(err))) => {
237+
Poll::Ready(Some(Err(ErrorCode::from_hyper_response_error(err))))
238+
}
239+
Poll::Ready(Some(Ok(frame))) => {
240+
self.timeout.reset();
241+
Poll::Ready(Some(Ok(frame)))
242+
}
243+
Poll::Pending => {
244+
ready!(self.timeout.poll_tick(cx));
245+
Poll::Ready(Some(Err(ErrorCode::ConnectionReadTimeout)))
246+
}
247+
}
248+
}
249+
250+
fn is_end_stream(&self) -> bool {
251+
self.incoming.is_end_stream()
252+
}
253+
254+
fn size_hint(&self) -> http_body::SizeHint {
255+
self.incoming.size_hint()
256+
}
257+
}

crates/wasi-http/src/p3/conv.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,48 @@
11
use crate::p3::bindings::http::types::{ErrorCode, Method, Scheme};
22
use core::convert::Infallible;
3+
use core::error::Error as _;
4+
use tracing::warn;
35

46
impl From<Infallible> for ErrorCode {
57
fn from(x: Infallible) -> Self {
68
match x {}
79
}
810
}
911

12+
impl ErrorCode {
13+
/// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a request.
14+
pub fn from_hyper_request_error(err: hyper::Error) -> Self {
15+
// If there's a source, we might be able to extract a wasi-http error from it.
16+
if let Some(cause) = err.source() {
17+
if let Some(err) = cause.downcast_ref::<Self>() {
18+
return err.clone();
19+
}
20+
}
21+
22+
warn!("hyper request error: {err:?}");
23+
24+
Self::HttpProtocolError
25+
}
26+
27+
/// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a response.
28+
pub fn from_hyper_response_error(err: hyper::Error) -> Self {
29+
if err.is_timeout() {
30+
return ErrorCode::HttpResponseTimeout;
31+
}
32+
33+
// If there's a source, we might be able to extract a wasi-http error from it.
34+
if let Some(cause) = err.source() {
35+
if let Some(err) = cause.downcast_ref::<Self>() {
36+
return err.clone();
37+
}
38+
}
39+
40+
warn!("hyper response error: {err:?}");
41+
42+
ErrorCode::HttpProtocolError
43+
}
44+
}
45+
1046
impl From<http::Method> for Method {
1147
fn from(method: http::Method) -> Self {
1248
Self::from(&method)

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,116 @@
11
use crate::p3::bindings::http::handler::{Host, HostWithStore};
2-
use crate::p3::bindings::http::types::{Request, Response};
3-
use crate::p3::{HttpResult, WasiHttp, WasiHttpCtxView};
2+
use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
3+
use crate::p3::body::{Body, ConsumedBody, GuestBody};
4+
use crate::p3::host::{delete_request, push_response};
5+
use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
6+
use http::header::HOST;
7+
use http::{HeaderValue, Uri};
8+
use http_body_util::BodyExt as _;
9+
use std::sync::Arc;
10+
use tokio::sync::oneshot;
11+
use tracing::debug;
412
use wasmtime::component::{Accessor, Resource};
513

614
impl HostWithStore for WasiHttp {
7-
#[expect(unused, reason = "work in progress")] // TODO: implement
815
async fn handle<T>(
916
store: &Accessor<T, Self>,
1017
req: Resource<Request>,
1118
) -> HttpResult<Resource<Response>> {
12-
todo!()
19+
let getter = store.getter();
20+
let (res_result_tx, res_result_rx) = oneshot::channel();
21+
let (fut, req_result_tx) = store.with(|mut store| {
22+
let WasiHttpCtxView { ctx, table } = store.get();
23+
let Request {
24+
method,
25+
scheme,
26+
authority,
27+
path_with_query,
28+
headers,
29+
options,
30+
body,
31+
} = delete_request(table, req).map_err(HttpError::trap)?;
32+
let mut headers = Arc::unwrap_or_clone(headers);
33+
if ctx.set_host_header() {
34+
let host = if let Some(authority) = authority.as_ref() {
35+
HeaderValue::try_from(authority.as_str())
36+
.map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?
37+
} else {
38+
HeaderValue::from_static("")
39+
};
40+
headers.insert(HOST, host);
41+
}
42+
43+
let scheme = match scheme {
44+
None => ctx.default_scheme().ok_or(ErrorCode::HttpProtocolError)?,
45+
Some(scheme) if ctx.is_supported_scheme(&scheme) => scheme,
46+
Some(..) => return Err(ErrorCode::HttpProtocolError.into()),
47+
};
48+
let mut uri = Uri::builder().scheme(scheme);
49+
if let Some(authority) = authority {
50+
uri = uri.authority(authority)
51+
};
52+
if let Some(path_with_query) = path_with_query {
53+
uri = uri.path_and_query(path_with_query)
54+
};
55+
let uri = uri.build().map_err(|err| {
56+
debug!(?err, "failed to build request URI");
57+
ErrorCode::HttpRequestUriInvalid
58+
})?;
59+
60+
let mut req = http::Request::builder();
61+
*req.headers_mut().unwrap() = headers;
62+
let (body, result_tx) = match body {
63+
Body::Guest {
64+
contents_rx,
65+
trailers_rx,
66+
result_tx,
67+
} => (
68+
GuestBody::new(&mut store, contents_rx, trailers_rx, getter).boxed(),
69+
Some(result_tx),
70+
),
71+
Body::Host { body, result_tx } => (body, Some(result_tx)),
72+
Body::Consumed => (ConsumedBody.boxed(), None),
73+
};
74+
let req = req
75+
.method(method)
76+
.uri(uri)
77+
.body(body)
78+
.map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?;
79+
HttpResult::Ok((
80+
store.get().ctx.send_request(
81+
req,
82+
options.as_deref().copied(),
83+
Box::new(async {
84+
let Ok(fut) = res_result_rx.await else {
85+
return Ok(());
86+
};
87+
Box::into_pin(fut).await
88+
}),
89+
),
90+
result_tx,
91+
))
92+
})?;
93+
let (res, io) = Box::into_pin(fut).await?;
94+
if let Some(req_result_tx) = req_result_tx {
95+
_ = req_result_tx.send(io);
96+
} else {
97+
Box::into_pin(io).await?;
98+
}
99+
let (
100+
http::response::Parts {
101+
status, headers, ..
102+
},
103+
body,
104+
) = res.into_parts();
105+
let res = Response {
106+
status,
107+
headers: Arc::new(headers),
108+
body: Body::Host {
109+
body,
110+
result_tx: res_result_tx,
111+
},
112+
};
113+
store.with(|mut store| push_response(store.get().table, res).map_err(HttpError::trap))
13114
}
14115
}
15116

0 commit comments

Comments
 (0)