Skip to content

Commit 07e7597

Browse files
committed
feat(p3-http): implement incoming HTTP
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 68f6d61 commit 07e7597

File tree

14 files changed

+1109
-537
lines changed

14 files changed

+1109
-537
lines changed

crates/wasi-http/src/p3/bindings.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ mod generated {
77
world: "wasi:http/proxy",
88
imports: {
99
"wasi:http/handler/[async]handle": async | store | trappable | tracing,
10+
"wasi:http/types/[method]request.consume-body": async | store | trappable | tracing,
11+
"wasi:http/types/[method]response.consume-body": async | store | trappable | tracing,
1012
"wasi:http/types/[static]request.new": async | store | trappable | tracing,
1113
"wasi:http/types/[static]response.new": async | store | trappable | tracing,
1214
default: trappable | tracing,

crates/wasi-http/src/p3/body.rs

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
use crate::p3::WasiHttpView;
2+
use crate::p3::bindings::http::types::{ErrorCode, Trailers};
3+
use anyhow::Context as _;
4+
use bytes::{Bytes, BytesMut};
5+
use core::future::poll_fn;
6+
use core::pin::{Pin, pin};
7+
use core::task::{Context, Poll, ready};
8+
use http_body_util::combinators::BoxBody;
9+
use std::sync::Arc;
10+
use tokio::sync::{mpsc, oneshot};
11+
use wasmtime::component::{
12+
Accessor, AccessorTask, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter,
13+
GuardedStreamReader, HasData, Resource, StreamReader,
14+
};
15+
16+
/// The concrete type behind a `wasi:http/types/body` resource.
17+
pub(crate) enum Body {
18+
/// Body constructed by the guest
19+
Guest(GuestBodyContext),
20+
/// Body constructed by the host.
21+
Host(BoxBody<Bytes, ErrorCode>),
22+
/// Body is consumed.
23+
Consumed,
24+
}
25+
26+
/// Context of a body constructed by the guest
27+
pub struct GuestBodyContext {
28+
/// The body stream
29+
pub(crate) contents_rx: Option<StreamReader<u8>>,
30+
/// Future, on which guest will write result and optional trailers
31+
pub(crate) trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
32+
/// Future, on which transmission result will be written
33+
pub(crate) result_tx: FutureWriter<Result<(), ErrorCode>>,
34+
}
35+
36+
pub struct GuestBodyTaskContext {
37+
pub(crate) cx: GuestBodyContext,
38+
pub(crate) contents_tx: mpsc::Sender<Bytes>,
39+
pub(crate) trailers_tx: oneshot::Sender<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>,
40+
}
41+
42+
impl GuestBodyTaskContext {
43+
/// Consume the body given an I/O operation `io`.
44+
///
45+
/// This function returns a [GuestBodyTask], which implements a [AccessorTask] and
46+
/// must be run using the engine's event loop.
47+
pub fn consume<Fut>(self, io: Fut) -> GuestBodyTask<Fut>
48+
where
49+
Fut: Future<Output = Result<(), ErrorCode>>,
50+
{
51+
GuestBodyTask { cx: self, io }
52+
}
53+
}
54+
55+
pub struct GuestBodyTask<T> {
56+
cx: GuestBodyTaskContext,
57+
io: T,
58+
}
59+
60+
impl<T, U, Fut> AccessorTask<T, U, wasmtime::Result<()>> for GuestBodyTask<Fut>
61+
where
62+
T: WasiHttpView,
63+
U: HasData,
64+
Fut: Future<Output = Result<(), ErrorCode>> + Send + 'static,
65+
{
66+
async fn run(self, store: &Accessor<T, U>) -> wasmtime::Result<()> {
67+
let Self {
68+
cx:
69+
GuestBodyTaskContext {
70+
cx:
71+
GuestBodyContext {
72+
contents_rx,
73+
trailers_rx,
74+
result_tx,
75+
},
76+
contents_tx,
77+
mut trailers_tx,
78+
},
79+
io,
80+
} = self;
81+
let trailers_rx = GuardedFutureReader::new(store, trailers_rx);
82+
let mut result_tx = GuardedFutureWriter::new(store, result_tx);
83+
if let Some(contents_rx) = contents_rx {
84+
let mut contents_rx = GuardedStreamReader::new(store, contents_rx);
85+
// TODO: use content-length
86+
let mut buf = BytesMut::with_capacity(8192);
87+
while !contents_rx.is_closed() {
88+
let mut tx = pin!(contents_tx.reserve());
89+
let Some(Ok(tx)) = ({
90+
let mut contents_tx_dropped = pin!(contents_rx.watch_writer());
91+
poll_fn(|cx| match contents_tx_dropped.as_mut().poll(cx) {
92+
Poll::Ready(()) => return Poll::Ready(None),
93+
Poll::Pending => tx.as_mut().poll(cx).map(Some),
94+
})
95+
.await
96+
}) else {
97+
// Either:
98+
// - body receiver has been closed
99+
// - guest writer has been closed
100+
break;
101+
};
102+
buf = contents_rx.read(buf).await;
103+
if !buf.is_empty() {
104+
tx.send(buf.split().freeze());
105+
}
106+
}
107+
}
108+
drop(contents_tx);
109+
110+
let mut rx = pin!(trailers_rx.read());
111+
match poll_fn(|cx| match trailers_tx.poll_closed(cx) {
112+
Poll::Ready(()) => return Poll::Ready(None),
113+
Poll::Pending => rx.as_mut().poll(cx).map(Some),
114+
})
115+
.await
116+
{
117+
Some(Some(Ok(Some(trailers)))) => {
118+
let trailers = store.with(|mut store| {
119+
store
120+
.data_mut()
121+
.http()
122+
.table
123+
.delete(trailers)
124+
.context("failed to delete trailers")
125+
})?;
126+
_ = trailers_tx.send(Ok(Some(trailers.into())));
127+
}
128+
Some(Some(Ok(None))) => {
129+
_ = trailers_tx.send(Ok(None));
130+
}
131+
Some(Some(Err(err))) => {
132+
_ = trailers_tx.send(Err(err));
133+
}
134+
Some(None) | None => {
135+
// Either:
136+
// - trailer receiver has been closed
137+
// - guest writer has been closed
138+
drop(trailers_tx);
139+
}
140+
}
141+
142+
let mut io = pin!(io);
143+
if let Some(res) = {
144+
let mut result_rx_dropped = pin!(result_tx.watch_reader());
145+
poll_fn(|cx| match result_rx_dropped.as_mut().poll(cx) {
146+
Poll::Ready(()) => return Poll::Ready(None),
147+
Poll::Pending => io.as_mut().poll(cx).map(Some),
148+
})
149+
.await
150+
} {
151+
result_tx.write(res).await;
152+
}
153+
Ok(())
154+
}
155+
}
156+
157+
pub(crate) struct GuestBody {
158+
pub(crate) contents_rx: Option<mpsc::Receiver<Bytes>>,
159+
pub(crate) trailers_rx:
160+
Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>,
161+
}
162+
163+
impl http_body::Body for GuestBody {
164+
type Data = Bytes;
165+
type Error = ErrorCode;
166+
167+
fn poll_frame(
168+
mut self: Pin<&mut Self>,
169+
cx: &mut Context<'_>,
170+
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
171+
if let Some(contents_rx) = self.contents_rx.as_mut() {
172+
while let Some(buf) = ready!(contents_rx.poll_recv(cx)) {
173+
return Poll::Ready(Some(Ok(http_body::Frame::data(buf))));
174+
}
175+
self.contents_rx = None;
176+
}
177+
178+
let Some(trailers_rx) = self.trailers_rx.as_mut() else {
179+
return Poll::Ready(None);
180+
};
181+
182+
let res = ready!(Pin::new(trailers_rx).poll(cx));
183+
self.trailers_rx = None;
184+
match res {
185+
Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(
186+
Arc::unwrap_or_clone(trailers),
187+
)))),
188+
Ok(Ok(None)) => Poll::Ready(None),
189+
Ok(Err(err)) => Poll::Ready(Some(Err(err))),
190+
Err(..) => Poll::Ready(None),
191+
}
192+
}
193+
194+
fn is_end_stream(&self) -> bool {
195+
if let Some(contents_rx) = self.contents_rx.as_ref() {
196+
if !contents_rx.is_empty() || !contents_rx.is_closed() {
197+
return false;
198+
}
199+
}
200+
if let Some(trailers_rx) = self.trailers_rx.as_ref() {
201+
if !trailers_rx.is_terminated() {
202+
return false;
203+
}
204+
}
205+
return true;
206+
}
207+
208+
fn size_hint(&self) -> http_body::SizeHint {
209+
// TODO: use content-length
210+
http_body::SizeHint::default()
211+
}
212+
}
213+
214+
pub(crate) struct ConsumedBody;
215+
216+
impl http_body::Body for ConsumedBody {
217+
type Data = Bytes;
218+
type Error = ErrorCode;
219+
220+
fn poll_frame(
221+
self: Pin<&mut Self>,
222+
_cx: &mut Context<'_>,
223+
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
224+
Poll::Ready(Some(Err(ErrorCode::InternalError(Some(
225+
"body consumed".into(),
226+
)))))
227+
}
228+
229+
fn is_end_stream(&self) -> bool {
230+
true
231+
}
232+
233+
fn size_hint(&self) -> http_body::SizeHint {
234+
http_body::SizeHint::with_exact(0)
235+
}
236+
}

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use anyhow::bail;
55
use wasmtime::component::{Accessor, Resource};
66

77
impl HostWithStore for WasiHttp {
8-
async fn handle<U>(
9-
store: &Accessor<U, Self>,
8+
#[expect(unused, reason = "work in progress")] // TODO: implement
9+
async fn handle<T>(
10+
store: &Accessor<T, Self>,
1011
request: Resource<Request>,
1112
) -> wasmtime::Result<Result<Resource<Response>, ErrorCode>> {
1213
bail!("TODO")

crates/wasi-http/src/p3/host/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::p3::bindings::http::types::{Fields, Request, Response};
22
use anyhow::Context as _;
3-
use core::ops::Deref;
43
use wasmtime::component::{Resource, ResourceTable};
54

65
mod handler;

0 commit comments

Comments
 (0)