-
Notifications
You must be signed in to change notification settings - Fork 13.4k
Add core::stream::Stream
#79023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add core::stream::Stream
#79023
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
//! Composable asynchronous iteration. | ||
//! | ||
//! If futures are asynchronous values, then streams are asynchronous | ||
//! iterators. If you've found yourself with an asynchronous collection of some kind, | ||
//! and needed to perform an operation on the elements of said collection, | ||
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic | ||
//! asynchronous Rust code, so it's worth becoming familiar with them. | ||
//! | ||
//! Before explaining more, let's talk about how this module is structured: | ||
//! | ||
//! # Organization | ||
//! | ||
//! This module is largely organized by type: | ||
//! | ||
//! * [Traits] are the core portion: these traits define what kind of streams | ||
//! exist and what you can do with them. The methods of these traits are worth | ||
//! putting some extra study time into. | ||
//! * Functions provide some helpful ways to create some basic streams. | ||
//! * Structs are often the return types of the various methods on this | ||
//! module's traits. You'll usually want to look at the method that creates | ||
//! the `struct`, rather than the `struct` itself. For more detail about why, | ||
//! see '[Implementing Stream](#implementing-stream)'. | ||
//! | ||
//! [Traits]: #traits | ||
//! | ||
//! That's it! Let's dig into streams. | ||
//! | ||
//! # Stream | ||
//! | ||
//! The heart and soul of this module is the [`Stream`] trait. The core of | ||
//! [`Stream`] looks like this: | ||
//! | ||
//! ``` | ||
//! # use core::task::{Context, Poll}; | ||
//! # use core::pin::Pin; | ||
//! trait Stream { | ||
//! type Item; | ||
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; | ||
//! } | ||
//! ``` | ||
//! | ||
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] | ||
//! method which is used when implementing a `Stream`, and a (to-be-implemented) | ||
//! `next` method which is used when consuming a stream. Consumers of `Stream` | ||
//! only need to consider `next`, which when called, returns a future which | ||
//! yields `Option<Stream::Item>`. | ||
//! | ||
//! The future returned by `next` will yield `Some(Item)` as long as there are | ||
//! elements, and once they've all been exhausted, will yield `None` to indicate | ||
//! that iteration is finished. If we're waiting on something asynchronous to | ||
//! resolve, the future will wait until the stream is ready to yield again. | ||
//! | ||
//! Individual streams may choose to resume iteration, and so calling `next` | ||
//! again may or may not eventually yield `Some(Item)` again at some point. | ||
//! | ||
//! [`Stream`]'s full definition includes a number of other methods as well, | ||
//! but they are default methods, built on top of [`poll_next`], and so you get | ||
//! them for free. | ||
//! | ||
//! [`Poll`]: super::task::Poll | ||
//! [`poll_next`]: Stream::poll_next | ||
//! | ||
//! # Implementing Stream | ||
//! | ||
//! Creating a stream of your own involves two steps: creating a `struct` to | ||
//! hold the stream's state, and then implementing [`Stream`] for that | ||
//! `struct`. | ||
//! | ||
//! Let's make a stream named `Counter` which counts from `1` to `5`: | ||
//! | ||
//! ```no_run | ||
//! #![feature(async_stream)] | ||
//! # use core::stream::Stream; | ||
//! # use core::task::{Context, Poll}; | ||
//! # use core::pin::Pin; | ||
//! | ||
//! // First, the struct: | ||
//! | ||
//! /// A stream which counts from one to five | ||
//! struct Counter { | ||
//! count: usize, | ||
//! } | ||
//! | ||
//! // we want our count to start at one, so let's add a new() method to help. | ||
//! // This isn't strictly necessary, but is convenient. Note that we start | ||
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below. | ||
//! impl Counter { | ||
//! fn new() -> Counter { | ||
//! Counter { count: 0 } | ||
//! } | ||
//! } | ||
//! | ||
//! // Then, we implement `Stream` for our `Counter`: | ||
//! | ||
//! impl Stream for Counter { | ||
//! // we will be counting with usize | ||
//! type Item = usize; | ||
//! | ||
//! // poll_next() is the only required method | ||
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
//! // Increment our count. This is why we started at zero. | ||
//! self.count += 1; | ||
//! | ||
//! // Check to see if we've finished counting or not. | ||
//! if self.count < 6 { | ||
//! Poll::Ready(Some(self.count)) | ||
//! } else { | ||
//! Poll::Ready(None) | ||
//! } | ||
//! } | ||
//! } | ||
//! ``` | ||
//! | ||
//! # Laziness | ||
//! | ||
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a | ||
//! whole lot. Nothing really happens until you call `next`. This is sometimes a | ||
//! source of confusion when creating a stream solely for its side effects. The | ||
//! compiler will warn us about this kind of behavior: | ||
//! | ||
//! ```text | ||
//! warning: unused result that must be used: streams do nothing unless polled | ||
//! ``` | ||
|
||
mod stream; | ||
|
||
pub use stream::Stream; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
use crate::ops::DerefMut; | ||
use crate::pin::Pin; | ||
use crate::task::{Context, Poll}; | ||
|
||
/// An interface for dealing with asynchronous iterators. | ||
/// | ||
/// This is the main stream trait. For more about the concept of streams | ||
/// generally, please see the [module-level documentation]. In particular, you | ||
/// may want to know how to [implement `Stream`][impl]. | ||
/// | ||
/// [module-level documentation]: index.html | ||
/// [impl]: index.html#implementing-stream | ||
#[unstable(feature = "async_stream", issue = "79024")] | ||
#[must_use = "streams do nothing unless polled"] | ||
pub trait Stream { | ||
yoshuawuyts marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// The type of items yielded by the stream. | ||
type Item; | ||
|
||
/// Attempt to pull out the next value of this stream, registering the | ||
/// current task for wakeup if the value is not yet available, and returning | ||
/// `None` if the stream is exhausted. | ||
/// | ||
/// # Return value | ||
/// | ||
/// There are several possible return values, each indicating a distinct | ||
/// stream state: | ||
/// | ||
/// - `Poll::Pending` means that this stream's next value is not ready | ||
/// yet. Implementations will ensure that the current task will be notified | ||
/// when the next value may be ready. | ||
/// | ||
/// - `Poll::Ready(Some(val))` means that the stream has successfully | ||
/// produced a value, `val`, and may produce further values on subsequent | ||
/// `poll_next` calls. | ||
/// | ||
/// - `Poll::Ready(None)` means that the stream has terminated, and | ||
/// `poll_next` should not be invoked again. | ||
yoshuawuyts marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// | ||
/// # Panics | ||
/// | ||
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its | ||
/// `poll_next` method again may panic, block forever, or cause other kinds of | ||
/// problems; the `Stream` trait places no requirements on the effects of | ||
/// such a call. However, as the `poll_next` method is not marked `unsafe`, | ||
/// Rust's usual rules apply: calls must never cause undefined behavior | ||
/// (memory corruption, incorrect use of `unsafe` functions, or the like), | ||
/// regardless of the stream's state. | ||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; | ||
|
||
/// Returns the bounds on the remaining length of the stream. | ||
/// | ||
/// Specifically, `size_hint()` returns a tuple where the first element | ||
/// is the lower bound, and the second element is the upper bound. | ||
/// | ||
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. | ||
/// A [`None`] here means that either there is no known upper bound, or the | ||
/// upper bound is larger than [`usize`]. | ||
/// | ||
/// # Implementation notes | ||
/// | ||
/// It is not enforced that a stream implementation yields the declared | ||
/// number of elements. A buggy stream may yield less than the lower bound | ||
/// or more than the upper bound of elements. | ||
/// | ||
/// `size_hint()` is primarily intended to be used for optimizations such as | ||
/// reserving space for the elements of the stream, but must not be | ||
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect | ||
/// implementation of `size_hint()` should not lead to memory safety | ||
/// violations. | ||
/// | ||
/// That said, the implementation should provide a correct estimation, | ||
/// because otherwise it would be a violation of the trait's protocol. | ||
/// | ||
/// The default implementation returns `(0, `[`None`]`)` which is correct for any | ||
/// stream. | ||
#[inline] | ||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
(0, None) | ||
} | ||
} | ||
yoshuawuyts marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
#[unstable(feature = "async_stream", issue = "79024")] | ||
impl<S: ?Sized + Stream + Unpin> Stream for &mut S { | ||
type Item = S::Item; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
S::poll_next(Pin::new(&mut **self), cx) | ||
} | ||
|
||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
(**self).size_hint() | ||
} | ||
} | ||
|
||
#[unstable(feature = "async_stream", issue = "79024")] | ||
impl<P> Stream for Pin<P> | ||
where | ||
P: DerefMut + Unpin, | ||
P::Target: Stream, | ||
yoshuawuyts marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
type Item = <P::Target as Stream>::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
self.get_mut().as_mut().poll_next(cx) | ||
} | ||
|
||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
(**self).size_hint() | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.