Skip to content

Commit 9935f30

Browse files
committed
Implement http_body::Body and implement some wasip3-http <-> http type conversions
Signed-off-by: Brian Hardock <brian.hardock@fermyon.com>
1 parent bb46ecc commit 9935f30

File tree

5 files changed

+551
-0
lines changed

5 files changed

+551
-0
lines changed

crates/wasip3/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,23 @@ edition.workspace = true
1111
repository.workspace = true
1212
rust-version.workspace = true
1313

14+
[features]
15+
http-compat = [
16+
"dep:bytes",
17+
"dep:http-body",
18+
"dep:http",
19+
"dep:thiserror",
20+
]
21+
1422
[dependencies]
1523
# NB: async bindings require `std` right now so this doesn't optionally disable
1624
# `std`, it's intentionally always enabled.
1725
wit-bindgen = { workspace = true, features = ['default', 'async'] }
26+
# Optional dependencies for extending http types.
27+
bytes = { version = "1.10.1", optional = true }
28+
http-body = { version = "1.0.1", optional = true }
29+
http = { version = "1.3.1", optional = true }
30+
thiserror = { version = "2.0.17", optional = true }
1831

1932
[dev-dependencies]
2033
futures = "0.3.31"
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use crate::{
2+
http::types::{ErrorCode, HeaderError, Trailers},
3+
wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamWriter},
4+
wit_future, wit_stream,
5+
};
6+
use http::HeaderMap;
7+
use http_body::{Body as _, Frame};
8+
use std::future::poll_fn;
9+
use std::{fmt::Debug, pin};
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
12+
13+
pub type BodyResult = Result<Option<Trailers>, ErrorCode>;
14+
15+
#[derive(Debug, thiserror::Error)]
16+
pub enum Error {
17+
/// The [`http_body::Body`] returned an error.
18+
#[error("body error: {0}")]
19+
HttpBody(#[source] BoxError),
20+
21+
/// Received trailers were rejected by [`Trailers::from_list`].
22+
#[error("invalid trailers: {0}")]
23+
InvalidTrailers(#[source] HeaderError),
24+
25+
/// The result future reader end was closed (dropped).
26+
///
27+
/// The result that couldn't be written is returned.
28+
#[error("result future reader closed")]
29+
ResultReaderClosed(BodyResult),
30+
31+
/// The stream reader end was closed (dropped).
32+
///
33+
/// The number of bytes written successfully is returned as `written` and
34+
/// the bytes that couldn't be written are returned as `unwritten`.
35+
#[error("stream reader closed")]
36+
StreamReaderClosed { written: usize, unwritten: Vec<u8> },
37+
}
38+
39+
/// BodyWriter coordinates a [`StreamWriter`] and [`FutureWriter`] associated
40+
/// with the write end of a `wasi:http` `Request` or `Response` body.
41+
pub struct BodyWriter {
42+
pub stream_writer: StreamWriter<u8>,
43+
pub result_writer: FutureWriter<BodyResult>,
44+
pub trailers: HeaderMap,
45+
}
46+
47+
impl BodyWriter {
48+
/// Returns a new writer and the matching stream and result future readers,
49+
/// which will typically be used to create a `wasi:http` `Request` or
50+
/// `Response`.
51+
pub fn new() -> (Self, StreamReader<u8>, FutureReader<BodyResult>) {
52+
let (stream_writer, stream_reader) = wit_stream::new();
53+
let (result_writer, result_reader) =
54+
// TODO: is there a more appropriate ErrorCode?
55+
wit_future::new(|| Err(ErrorCode::InternalError(Some("body writer dropped".into()))));
56+
(
57+
Self {
58+
stream_writer,
59+
result_writer,
60+
trailers: Default::default(),
61+
},
62+
stream_reader,
63+
result_reader,
64+
)
65+
}
66+
67+
/// Sends the given [`http_body::Body`] to this writer.
68+
///
69+
/// This copies all data frames from the body to this writer's stream and
70+
/// then writes any trailers from the body to the result future. On success
71+
/// the number of data bytes written to the stream (which does not including
72+
/// trailers) is returned.
73+
///
74+
/// If there is an error it is written to the result future.
75+
pub async fn send_http_body<T>(mut self, mut body: &mut T) -> Result<u64, Error>
76+
where
77+
T: http_body::Body + Unpin,
78+
T::Data: Into<Vec<u8>>,
79+
T::Error: Into<BoxError>,
80+
{
81+
let mut total_written = 0;
82+
83+
loop {
84+
let frame = poll_fn(|cx| pin::Pin::new(&mut body).poll_frame(cx)).await;
85+
86+
match frame {
87+
Some(Ok(frame)) => {
88+
let written = self.send_frame(frame).await?;
89+
total_written += written as u64;
90+
}
91+
Some(Err(err)) => {
92+
let err = err.into();
93+
// TODO: consider if there are better ErrorCode mappings
94+
let error_code = ErrorCode::InternalError(Some(err.to_string()));
95+
// TODO: log result_writer.write errors?
96+
_ = self.result_writer.write(Err(error_code)).await;
97+
return Err(Error::HttpBody(err));
98+
}
99+
None => break,
100+
}
101+
}
102+
let maybe_trailers = if self.trailers.is_empty() {
103+
None
104+
} else {
105+
Some(self.trailers.try_into().map_err(Error::InvalidTrailers)?)
106+
};
107+
match self.result_writer.write(Ok(maybe_trailers)).await {
108+
Ok(()) => Ok(total_written),
109+
Err(err) => Err(Error::ResultReaderClosed(err.value)),
110+
}
111+
}
112+
113+
/// Sends a [`http_body::Frame`].
114+
///
115+
/// - If the frame contains data, the data is written to this writer's
116+
/// stream and the size of the written data is returned.
117+
/// - If the frame contains trailers they are added to [`Self::trailers`]
118+
/// and `Ok(0)` is returned.
119+
pub async fn send_frame<T>(&mut self, frame: Frame<T>) -> Result<usize, Error>
120+
where
121+
T: Into<Vec<u8>>,
122+
{
123+
// Frame is a pseudo-enum which is either 'data' or 'trailers'
124+
if frame.is_data() {
125+
let data = frame.into_data().unwrap_or_else(|_| unreachable!()).into();
126+
let data_len = data.len();
127+
// write_all returns any unwritten data if the read end is dropped
128+
let unwritten = self.stream_writer.write_all(data).await;
129+
if !unwritten.is_empty() {
130+
return Err(Error::StreamReaderClosed {
131+
written: data_len - unwritten.len(),
132+
unwritten,
133+
});
134+
}
135+
Ok(data_len)
136+
} else if frame.is_trailers() {
137+
let trailers = frame.into_trailers().unwrap_or_else(|_| unreachable!());
138+
self.trailers.extend(trailers);
139+
Ok(0)
140+
} else {
141+
unreachable!("Frames are data or trailers");
142+
}
143+
}
144+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use super::to_internal_error_code;
2+
use crate::http::types::{ErrorCode, Fields, HeaderError, Headers, Method, Scheme};
3+
use std::convert::TryFrom;
4+
5+
impl TryFrom<Scheme> for http::uri::Scheme {
6+
type Error = http::uri::InvalidUri;
7+
8+
fn try_from(scheme: Scheme) -> Result<Self, Self::Error> {
9+
match scheme {
10+
Scheme::Http => Ok(http::uri::Scheme::HTTP),
11+
Scheme::Https => Ok(http::uri::Scheme::HTTPS),
12+
Scheme::Other(s) => s.parse(),
13+
}
14+
}
15+
}
16+
17+
impl From<&http::uri::Scheme> for Scheme {
18+
fn from(scheme: &http::uri::Scheme) -> Self {
19+
match scheme {
20+
s if s == &http::uri::Scheme::HTTP => Scheme::Http,
21+
s if s == &http::uri::Scheme::HTTPS => Scheme::Https,
22+
other => Scheme::Other(other.to_string()),
23+
}
24+
}
25+
}
26+
27+
impl TryFrom<Method> for http::Method {
28+
type Error = http::method::InvalidMethod;
29+
30+
fn try_from(method: Method) -> Result<Self, Self::Error> {
31+
match method {
32+
Method::Get => Ok(http::Method::GET),
33+
Method::Post => Ok(http::Method::POST),
34+
Method::Put => Ok(http::Method::PUT),
35+
Method::Delete => Ok(http::Method::DELETE),
36+
Method::Patch => Ok(http::Method::PATCH),
37+
Method::Head => Ok(http::Method::HEAD),
38+
Method::Options => Ok(http::Method::OPTIONS),
39+
Method::Connect => Ok(http::Method::CONNECT),
40+
Method::Trace => Ok(http::Method::TRACE),
41+
Method::Other(o) => http::Method::from_bytes(o.as_bytes()),
42+
}
43+
}
44+
}
45+
46+
impl From<&http::Method> for Method {
47+
fn from(method: &http::Method) -> Self {
48+
match method {
49+
&http::Method::GET => Method::Get,
50+
&http::Method::POST => Method::Post,
51+
&http::Method::PUT => Method::Put,
52+
&http::Method::DELETE => Method::Delete,
53+
&http::Method::PATCH => Method::Patch,
54+
&http::Method::HEAD => Method::Head,
55+
&http::Method::OPTIONS => Method::Options,
56+
&http::Method::CONNECT => Method::Connect,
57+
&http::Method::TRACE => Method::Trace,
58+
other => Method::Other(other.to_string()),
59+
}
60+
}
61+
}
62+
63+
impl TryFrom<Headers> for http::HeaderMap {
64+
type Error = ErrorCode;
65+
66+
fn try_from(headers: Headers) -> Result<Self, Self::Error> {
67+
headers
68+
.copy_all()
69+
.into_iter()
70+
.try_fold(http::HeaderMap::new(), |mut map, (k, v)| {
71+
let v = http::HeaderValue::from_bytes(&v).map_err(to_internal_error_code)?;
72+
let k: http::HeaderName = k.parse().map_err(to_internal_error_code)?;
73+
map.append(k, v);
74+
Ok(map)
75+
})
76+
}
77+
}
78+
79+
impl TryFrom<http::HeaderMap> for Fields {
80+
type Error = HeaderError;
81+
82+
fn try_from(map: http::HeaderMap) -> Result<Self, Self::Error> {
83+
// https://docs.rs/http/1.3.1/http/header/struct.HeaderMap.html#method.into_iter-2
84+
// For each yielded item that has None provided for the HeaderName, then
85+
// the associated header name is the same as that of the previously
86+
// yielded item. The first yielded item will have HeaderName set.
87+
let mut last_name = None;
88+
let iter = map.into_iter().map(move |(name, value)| {
89+
if name.is_some() {
90+
last_name = name;
91+
}
92+
let name = last_name
93+
.as_ref()
94+
.expect("HeaderMap::into_iter always returns Some(name) before None");
95+
let value = bytes::Bytes::from_owner(value).to_vec();
96+
(name.as_str().into(), value)
97+
});
98+
let entries = Vec::from_iter(iter);
99+
Fields::from_list(&entries)
100+
}
101+
}

0 commit comments

Comments
 (0)