diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index 126b8f7161..e39d1e518d 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -144,7 +144,7 @@ jobs:
- name: Test
# Can't enable tcp feature since Miri does not support the tokio runtime
- run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,stream,nightly
+ run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,nightly
features:
name: features
diff --git a/Cargo.toml b/Cargo.toml
index 554d092034..be09230f0c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,8 @@ futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
-http-body = "0.4"
+http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
+http-body-util = { git = "https://github.com/hyperium/http-body", branch = "master" }
httpdate = "1.0"
httparse = "1.6"
h2 = { version = "0.3.9", optional = true }
@@ -80,7 +81,6 @@ full = [
"http1",
"http2",
"server",
- "stream",
"runtime",
]
@@ -92,9 +92,6 @@ http2 = ["h2"]
client = []
server = []
-# `impl Stream` for things
-stream = []
-
# Tokio support
runtime = [
"tcp",
diff --git a/benches/body.rs b/benches/body.rs
index 255914d7a8..f716314dc1 100644
--- a/benches/body.rs
+++ b/benches/body.rs
@@ -6,7 +6,7 @@ extern crate test;
use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
-use hyper::body::Body;
+use http_body_util::StreamBody;
macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
@@ -20,9 +20,10 @@ macro_rules! bench_stream {
$bencher.iter(|| {
rt.block_on(async {
- let $body_pat = Body::wrap_stream(
+ let $body_pat = StreamBody::new(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);
+
$block;
});
});
diff --git a/benches/server.rs b/benches/server.rs
index 7ca0d0896a..fed50c0710 100644
--- a/benches/server.rs
+++ b/benches/server.rs
@@ -9,10 +9,11 @@ use std::sync::mpsc;
use std::time::Duration;
use futures_util::{stream, StreamExt};
+use http_body_util::StreamBody;
use tokio::sync::oneshot;
use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Response, Server};
+use hyper::{Response, Server};
macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => {{
@@ -101,7 +102,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
- Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
+ StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}
@@ -123,7 +124,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
- Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
+ StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}
diff --git a/examples/echo.rs b/examples/echo.rs
index ff7573049e..42404b5f73 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -1,6 +1,5 @@
#![deny(warnings)]
-use futures_util::TryStreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
@@ -16,16 +15,17 @@ async fn echo(req: Request
) -> Result, hyper::Error> {
// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
+ // TODO: Fix this, broken in PR #2896
// Convert to uppercase before sending back to client using a stream.
- (&Method::POST, "/echo/uppercase") => {
- let chunk_stream = req.into_body().map_ok(|chunk| {
- chunk
- .iter()
- .map(|byte| byte.to_ascii_uppercase())
- .collect::>()
- });
- Ok(Response::new(Body::wrap_stream(chunk_stream)))
- }
+ // (&Method::POST, "/echo/uppercase") => {
+ // let chunk_stream = req.into_body().map_ok(|chunk| {
+ // chunk
+ // .iter()
+ // .map(|byte| byte.to_ascii_uppercase())
+ // .collect::>()
+ // });
+ // Ok(Response::new(Body::wrap_stream(chunk_stream)))
+ // }
// Reverse the entire body before sending back to the client.
//
diff --git a/examples/send_file.rs b/examples/send_file.rs
index 3f660abf72..8456268755 100644
--- a/examples/send_file.rs
+++ b/examples/send_file.rs
@@ -1,9 +1,5 @@
#![deny(warnings)]
-use tokio::fs::File;
-
-use tokio_util::codec::{BytesCodec, FramedRead};
-
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};
@@ -48,11 +44,8 @@ fn not_found() -> Response {
}
async fn simple_file_send(filename: &str) -> Result> {
- // Serve a file by asynchronously reading it by chunks using tokio-util crate.
-
- if let Ok(file) = File::open(filename).await {
- let stream = FramedRead::new(file, BytesCodec::new());
- let body = Body::wrap_stream(stream);
+ if let Ok(contents) = tokio::fs::read(filename).await {
+ let body = contents.into();
return Ok(Response::new(body));
}
diff --git a/examples/web_api.rs b/examples/web_api.rs
index 5226249b35..855ce5bc77 100644
--- a/examples/web_api.rs
+++ b/examples/web_api.rs
@@ -1,7 +1,6 @@
#![deny(warnings)]
use bytes::Buf;
-use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
@@ -24,18 +23,10 @@ async fn client_request_response(client: &Client) -> ResultPOST request body: {}
Response: ",
- POST_DATA,
- )
- .into())
- });
- let after = web_res.into_body();
- let body = Body::wrap_stream(before.chain(after));
- Ok(Response::new(body))
+ let res_body = web_res.into_body();
+
+ Ok(Response::new(res_body))
}
async fn api_post_response(req: Request) -> Result> {
diff --git a/src/body/body.rs b/src/body/body.rs
index 9dc1a034f9..0ba63a4b68 100644
--- a/src/body/body.rs
+++ b/src/body/body.rs
@@ -1,20 +1,14 @@
use std::borrow::Cow;
-#[cfg(feature = "stream")]
-use std::error::Error as StdError;
use std::fmt;
use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
-#[cfg(feature = "stream")]
-use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};
use super::DecodedLength;
-#[cfg(feature = "stream")]
-use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
@@ -56,12 +50,6 @@ enum Kind {
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
- #[cfg(feature = "stream")]
- Wrapped(
- SyncWrapper<
- Pin>> + Send>>,
- >,
- ),
}
struct Extra {
@@ -164,39 +152,6 @@ impl Body {
(tx, rx)
}
- /// Wrap a futures `Stream` in a box inside `Body`.
- ///
- /// # Example
- ///
- /// ```
- /// # use hyper::Body;
- /// let chunks: Vec> = vec![
- /// Ok("hello"),
- /// Ok(" "),
- /// Ok("world"),
- /// ];
- ///
- /// let stream = futures_util::stream::iter(chunks);
- ///
- /// let body = Body::wrap_stream(stream);
- /// ```
- ///
- /// # Optional
- ///
- /// This function requires enabling the `stream` feature in your
- /// `Cargo.toml`.
- #[cfg(feature = "stream")]
- #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
- pub fn wrap_stream(stream: S) -> Body
- where
- S: Stream- > + Send + 'static,
- O: Into + 'static,
- E: Into> + 'static,
- {
- let mapped = stream.map_ok(Into::into).map_err(Into::into);
- Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
- }
-
fn new(kind: Kind) -> Body {
Body { kind, extra: None }
}
@@ -329,12 +284,6 @@ impl Body {
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),
-
- #[cfg(feature = "stream")]
- Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
- Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
- None => Poll::Ready(None),
- },
}
}
@@ -405,8 +354,6 @@ impl HttpBody for Body {
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
- #[cfg(feature = "stream")]
- Kind::Wrapped(..) => false,
}
}
@@ -426,8 +373,6 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
- #[cfg(feature = "stream")]
- Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
@@ -457,33 +402,6 @@ impl fmt::Debug for Body {
}
}
-/// # Optional
-///
-/// This function requires enabling the `stream` feature in your
-/// `Cargo.toml`.
-#[cfg(feature = "stream")]
-impl Stream for Body {
- type Item = crate::Result;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll