Skip to content

Commit

Permalink
feat(util): add BodyDataStream (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Jun 10, 2024
1 parent fe8aa7e commit 23212f1
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 6 deletions.
3 changes: 1 addition & 2 deletions http-body-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ rust-version = "1.49"

[dependencies]
bytes = "1"
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "1"
http-body = { version = "1", path = "../http-body" }
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt", "sync", "rt-multi-thread"] }
futures-util = { version = "0.3.14", default-features = false }
2 changes: 1 addition & 1 deletion http-body-util/src/combinators/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<T: Body + ?Sized> Future for Collect<T> {
let mut me = self.project();

loop {
let frame = futures_core::ready!(me.body.as_mut().poll_frame(cx));
let frame = futures_util::ready!(me.body.as_mut().poll_frame(cx));

let frame = if let Some(frame) = frame {
frame?
Expand Down
2 changes: 1 addition & 1 deletion http-body-util/src/combinators/with_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
task::{Context, Poll},
};

use futures_core::ready;
use futures_util::ready;
use http::HeaderMap;
use http_body::{Body, Frame};
use pin_project_lite::pin_project;
Expand Down
10 changes: 9 additions & 1 deletion http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use self::either::Either;
pub use self::empty::Empty;
pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
pub use self::stream::{BodyStream, StreamBody};
pub use self::stream::{BodyDataStream, BodyStream, StreamBody};

/// An extension trait for [`http_body::Body`] adding various combinators and adapters
pub trait BodyExt: http_body::Body {
Expand Down Expand Up @@ -128,6 +128,14 @@ pub trait BodyExt: http_body::Body {
{
combinators::WithTrailers::new(self, trailers)
}

/// Turn this body into [`BodyDataStream`].
fn into_data_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
}
}

impl<T: ?Sized> BodyExt for T where T: http_body::Body {}
38 changes: 37 additions & 1 deletion http-body-util/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Buf;
use futures_core::Stream;
use futures_util::{ready, stream::Stream};
use http_body::{Body, Frame};
use pin_project_lite::pin_project;
use std::{
Expand Down Expand Up @@ -101,6 +101,42 @@ where
}
}

pin_project! {
/// A data stream created from a [`Body`].
#[derive(Clone, Copy, Debug)]
pub struct BodyDataStream<B> {
#[pin]
body: B,
}
}

impl<B> BodyDataStream<B> {
/// Create a new `BodyDataStream`
pub fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for BodyDataStream<B>
where
B: Body,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => match frame.into_data() {
Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
Err(_) => continue,
},
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
}
}
}

#[cfg(test)]
mod tests {
use crate::{BodyExt, BodyStream, StreamBody};
Expand Down

0 comments on commit 23212f1

Please sign in to comment.