Skip to content

Commit 78c49f9

Browse files
Tyler Neelyyoshuawuyts
authored andcommitted
Add initial Fuse implementation for Stream
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
1 parent e6880e1 commit 78c49f9

File tree

3 files changed

+89
-1
lines changed

3 files changed

+89
-1
lines changed

src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub use from_stream::FromStream;
2727
pub use into_stream::IntoStream;
2828
pub use once::{once, Once};
2929
pub use repeat::{repeat, Repeat};
30-
pub use stream::{Scan, Stream, Take, Zip};
30+
pub use stream::{Fuse, Scan, Stream, Take, Zip};
3131

3232
mod double_ended_stream;
3333
mod empty;

src/stream/stream/fuse.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::pin::Pin;
2+
3+
use crate::task::{Context, Poll};
4+
5+
/// A `Stream` that is permanently closed
6+
/// once a single call to `poll` results in
7+
/// `Poll::Ready(None)`, returning `Poll::Ready(None)`
8+
/// for all future calls to `poll`.
9+
#[derive(Clone, Debug)]
10+
pub struct Fuse<S> {
11+
stream: S,
12+
done: bool,
13+
}
14+
15+
impl<S: Unpin> Unpin for Fuse<S> {}
16+
17+
impl<S: futures::Stream> Fuse<S> {
18+
pin_utils::unsafe_pinned!(stream: S);
19+
pin_utils::unsafe_unpinned!(done: bool);
20+
21+
/// Returns `true` if the underlying stream is fused.
22+
///
23+
/// If this `Stream` is fused, all future calls to
24+
/// `poll` will return `Poll::Ready(None)`.
25+
pub fn is_done(&self) -> bool {
26+
self.done
27+
}
28+
29+
/// Consumes this `Fuse` and returns the inner
30+
/// `Stream`, unfusing it if it had become
31+
/// fused.
32+
pub fn into_inner(self) -> S
33+
where
34+
S: Sized,
35+
{
36+
self.stream
37+
}
38+
}
39+
40+
impl<S: futures::Stream> futures::Stream for Fuse<S> {
41+
type Item = S::Item;
42+
43+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
44+
if self.done {
45+
Poll::Ready(None)
46+
} else {
47+
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
48+
if next.is_none() {
49+
*self.as_mut().done() = true;
50+
}
51+
Poll::Ready(next)
52+
}
53+
}
54+
}

src/stream/stream/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ mod filter_map;
2828
mod find;
2929
mod find_map;
3030
mod fold;
31+
mod fuse;
3132
mod min_by;
3233
mod next;
3334
mod nth;
3435
mod scan;
3536
mod take;
3637
mod zip;
3738

39+
pub use fuse::Fuse;
3840
pub use scan::Scan;
3941
pub use take::Take;
4042
pub use zip::Zip;
@@ -246,6 +248,38 @@ pub trait Stream {
246248
Enumerate::new(self)
247249
}
248250

251+
/// Transforms this `Stream` into a "fused" `Stream`
252+
/// such that after the first time `poll` returns
253+
/// `Poll::Ready(None)`, all future calls to
254+
/// `poll` will also return `Poll::Ready(None)`.
255+
///
256+
/// # Examples
257+
///
258+
/// ```
259+
/// # #![feature(async_await)]
260+
/// # fn main() { async_std::task::block_on(async {
261+
/// #
262+
/// use async_std::prelude::*;
263+
/// use async_std::stream;
264+
///
265+
/// let mut s = stream::repeat(9).take(3);
266+
///
267+
/// while let Some(v) = s.next().await {
268+
/// assert_eq!(v, 9);
269+
/// }
270+
/// #
271+
/// # }) }
272+
/// ```
273+
fn fuse(self) -> Fuse<Self>
274+
where
275+
Self: Sized,
276+
{
277+
Fuse {
278+
stream: self,
279+
done: false,
280+
}
281+
}
282+
249283
/// Both filters and maps a stream.
250284
///
251285
/// # Examples

0 commit comments

Comments
 (0)