Skip to content

Commit 3f9584d

Browse files
committed
Fix TODO and implement collect_data op
1 parent 387b255 commit 3f9584d

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

httpbis/src/bin/client_server_loop.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use bytes::Bytes;
22
use futures::stream::StreamExt;
3-
use futures::stream::TryStreamExt;
43
use httpbis::Client;
54
use httpbis::ClientIntf;
65
use httpbis::Headers;
@@ -70,11 +69,8 @@ fn request() {
7069
.expect("headers");
7170
assert_eq!(200, header.status());
7271

73-
// TODO: check content
74-
body.into_stream()
75-
.try_collect::<Vec<_>>()
76-
.await
77-
.expect("body");
72+
let body = body.collect_data().await.expect("body");
73+
assert_eq!(&b"hello there"[..], &body[..]);
7874
})
7975
});
8076
}

httpbis/src/common/stream_after_headers.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ use std::pin::Pin;
77
use std::task::Context;
88
use std::task::Poll;
99

10+
use bytes::BufMut;
1011
use bytes::Bytes;
12+
use bytes::BytesMut;
1113
use futures::future;
1214
use futures::Future;
1315
use futures::Stream;
1416
use tokio::io::AsyncRead;
1517
use tokio::io::ReadBuf;
1618

19+
use crate::solicit_async::TryFutureBox;
1720
use crate::solicit_async::TryStreamBox;
1821
use crate::DataOrTrailers;
1922
use crate::Headers;
@@ -71,7 +74,7 @@ pub trait StreamAfterHeaders: fmt::Debug + Unpin + Send + 'static {
7174
/// Fetch next `DATA`.
7275
fn next_data<'a>(
7376
&'a mut self,
74-
) -> Pin<Box<dyn Future<Output = crate::Result<Option<Bytes>>> + 'a>> {
77+
) -> Pin<Box<dyn Future<Output = crate::Result<Option<Bytes>>> + Send + 'a>> {
7578
Box::pin(future::poll_fn(move |cx| self.poll_data(cx)?.map(Ok)))
7679
}
7780

@@ -105,6 +108,24 @@ pub trait StreamAfterHeaders: fmt::Debug + Unpin + Send + 'static {
105108
{
106109
Box::pin(DataStream(self))
107110
}
111+
112+
fn collect_data(mut self) -> TryFutureBox<Bytes>
113+
where
114+
Self: Sized,
115+
{
116+
Box::pin(async move {
117+
let mut r = BytesMut::new();
118+
loop {
119+
match self.next_data().await? {
120+
None => return Ok(r.freeze()),
121+
Some(b) => {
122+
// TODO: figure out how to efficiently extend from Bytes
123+
r.put_slice(&b)
124+
}
125+
}
126+
}
127+
})
128+
}
108129
}
109130

110131
struct AsStream<S: StreamAfterHeaders>(S);

0 commit comments

Comments
 (0)