Skip to content

Commit 01362a0

Browse files
Implement http_body::Body and some useful conversions (#126)
* Implement http_body::Body and implement some wasip3-http <-> http type conversions Signed-off-by: Brian Hardock <brian.hardock@fermyon.com> * Add http conversions + example Signed-off-by: Brian Hardock <brian.hardock@fermyon.com> --------- Signed-off-by: Brian Hardock <brian.hardock@fermyon.com>
1 parent bb46ecc commit 01362a0

File tree

6 files changed

+873
-0
lines changed

6 files changed

+873
-0
lines changed

crates/wasip3/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,27 @@ edition.workspace = true
1111
repository.workspace = true
1212
rust-version.workspace = true
1313

14+
[features]
15+
http-compat = [
16+
"dep:bytes",
17+
"dep:http-body",
18+
"dep:http",
19+
"dep:thiserror",
20+
]
21+
1422
[dependencies]
1523
# NB: async bindings require `std` right now so this doesn't optionally disable
1624
# `std`, it's intentionally always enabled.
1725
wit-bindgen = { workspace = true, features = ['default', 'async'] }
26+
# Optional dependencies for extending http types.
27+
bytes = { version = "1.10.1", optional = true }
28+
http-body = { version = "1.0.1", optional = true }
29+
http = { version = "1.3.1", optional = true }
30+
thiserror = { version = "2.0.17", optional = true }
1831

1932
[dev-dependencies]
2033
futures = "0.3.31"
34+
http = "1.3.1"
2135

2236
[[example]]
2337
name = "cli-command"
@@ -26,3 +40,8 @@ crate-type = ["cdylib"]
2640
[[example]]
2741
name = "http-proxy"
2842
crate-type = ["cdylib"]
43+
44+
[[example]]
45+
name = "http-proxy-compat"
46+
crate-type = ["cdylib"]
47+
required-features = ["http-compat"]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use wasip3::http_compat::{IncomingRequestBody, http_from_wasi_request, http_into_wasi_response};
2+
use wasip3::http::types::{self, ErrorCode};
3+
4+
wasip3::http::proxy::export!(Example);
5+
6+
struct Example;
7+
8+
impl wasip3::exports::http::handler::Guest for Example {
9+
async fn handle(request: types::Request) -> Result<types::Response, ErrorCode> {
10+
let request = http_from_wasi_request(request)?;
11+
let response = serve(request).await?;
12+
http_into_wasi_response(response)
13+
}
14+
}
15+
16+
async fn serve(_request: http::Request<IncomingRequestBody>) -> Result<http::Response<String>, ErrorCode> {
17+
Ok(http::Response::new("Hello, WASI!".to_string()))
18+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use crate::{
2+
http::types::{ErrorCode, HeaderError, Trailers},
3+
wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamWriter},
4+
wit_future, wit_stream,
5+
};
6+
use http::HeaderMap;
7+
use http_body::{Body as _, Frame};
8+
use std::future::poll_fn;
9+
use std::{fmt::Debug, pin};
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
12+
13+
pub type BodyResult = Result<Option<Trailers>, ErrorCode>;
14+
15+
#[derive(Debug, thiserror::Error)]
16+
pub enum Error {
17+
/// The [`http_body::Body`] returned an error.
18+
#[error("body error: {0}")]
19+
HttpBody(#[source] BoxError),
20+
21+
/// Received trailers were rejected by [`Trailers::from_list`].
22+
#[error("invalid trailers: {0}")]
23+
InvalidTrailers(#[source] HeaderError),
24+
25+
/// The result future reader end was closed (dropped).
26+
///
27+
/// The result that couldn't be written is returned.
28+
#[error("result future reader closed")]
29+
ResultReaderClosed(BodyResult),
30+
31+
/// The stream reader end was closed (dropped).
32+
///
33+
/// The number of bytes written successfully is returned as `written` and
34+
/// the bytes that couldn't be written are returned as `unwritten`.
35+
#[error("stream reader closed")]
36+
StreamReaderClosed { written: usize, unwritten: Vec<u8> },
37+
}
38+
39+
/// BodyWriter coordinates a [`StreamWriter`] and [`FutureWriter`] associated
40+
/// with the write end of a `wasi:http` `Request` or `Response` body.
41+
pub struct BodyWriter {
42+
pub stream_writer: StreamWriter<u8>,
43+
pub result_writer: FutureWriter<BodyResult>,
44+
pub trailers: HeaderMap,
45+
}
46+
47+
impl BodyWriter {
48+
/// Returns a new writer and the matching stream and result future readers,
49+
/// which will typically be used to create a `wasi:http` `Request` or
50+
/// `Response`.
51+
pub fn new() -> (Self, StreamReader<u8>, FutureReader<BodyResult>) {
52+
let (stream_writer, stream_reader) = wit_stream::new();
53+
let (result_writer, result_reader) =
54+
// TODO: is there a more appropriate ErrorCode?
55+
wit_future::new(|| Err(ErrorCode::InternalError(Some("body writer dropped".into()))));
56+
(
57+
Self {
58+
stream_writer,
59+
result_writer,
60+
trailers: Default::default(),
61+
},
62+
stream_reader,
63+
result_reader,
64+
)
65+
}
66+
67+
/// Sends the given [`http_body::Body`] to this writer.
68+
///
69+
/// This copies all data frames from the body to this writer's stream and
70+
/// then writes any trailers from the body to the result future. On success
71+
/// the number of data bytes written to the stream (which does not including
72+
/// trailers) is returned.
73+
///
74+
/// If there is an error it is written to the result future.
75+
pub async fn send_http_body<T>(mut self, mut body: &mut T) -> Result<u64, Error>
76+
where
77+
T: http_body::Body + Unpin,
78+
T::Data: Into<Vec<u8>>,
79+
T::Error: Into<BoxError>,
80+
{
81+
let mut total_written = 0;
82+
83+
loop {
84+
let frame = poll_fn(|cx| pin::Pin::new(&mut body).poll_frame(cx)).await;
85+
86+
match frame {
87+
Some(Ok(frame)) => {
88+
let written = self.send_frame(frame).await?;
89+
total_written += written as u64;
90+
}
91+
Some(Err(err)) => {
92+
let err = err.into();
93+
// TODO: consider if there are better ErrorCode mappings
94+
let error_code = ErrorCode::InternalError(Some(err.to_string()));
95+
// TODO: log result_writer.write errors?
96+
_ = self.result_writer.write(Err(error_code)).await;
97+
return Err(Error::HttpBody(err));
98+
}
99+
None => break,
100+
}
101+
}
102+
let maybe_trailers = if self.trailers.is_empty() {
103+
None
104+
} else {
105+
Some(self.trailers.try_into().map_err(Error::InvalidTrailers)?)
106+
};
107+
match self.result_writer.write(Ok(maybe_trailers)).await {
108+
Ok(()) => Ok(total_written),
109+
Err(err) => Err(Error::ResultReaderClosed(err.value)),
110+
}
111+
}
112+
113+
/// Sends a [`http_body::Frame`].
114+
///
115+
/// - If the frame contains data, the data is written to this writer's
116+
/// stream and the size of the written data is returned.
117+
/// - If the frame contains trailers they are added to [`Self::trailers`]
118+
/// and `Ok(0)` is returned.
119+
pub async fn send_frame<T>(&mut self, frame: Frame<T>) -> Result<usize, Error>
120+
where
121+
T: Into<Vec<u8>>,
122+
{
123+
// Frame is a pseudo-enum which is either 'data' or 'trailers'
124+
if frame.is_data() {
125+
let data = frame.into_data().unwrap_or_else(|_| unreachable!()).into();
126+
let data_len = data.len();
127+
// write_all returns any unwritten data if the read end is dropped
128+
let unwritten = self.stream_writer.write_all(data).await;
129+
if !unwritten.is_empty() {
130+
return Err(Error::StreamReaderClosed {
131+
written: data_len - unwritten.len(),
132+
unwritten,
133+
});
134+
}
135+
Ok(data_len)
136+
} else if frame.is_trailers() {
137+
let trailers = frame.into_trailers().unwrap_or_else(|_| unreachable!());
138+
self.trailers.extend(trailers);
139+
Ok(0)
140+
} else {
141+
unreachable!("Frames are data or trailers");
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)