Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 114 additions & 2 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ use try_next::TryNext;

cfg_time! {
pub(crate) mod timeout;
pub(crate) mod timeout_repeating;
use timeout::Timeout;
use tokio::time::Duration;
use timeout_repeating::TimeoutRepeating;
use tokio::time::{Duration, Interval};
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
Expand Down Expand Up @@ -924,7 +926,9 @@ pub trait StreamExt: Stream {
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
/// stream value once it becomes available. See
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
/// where the timeouts will repeat.
///
/// # Notes
///
Expand Down Expand Up @@ -971,6 +975,25 @@ pub trait StreamExt: Stream {
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
///
/// Once a timeout error is received, no further events will be received
/// unless the wrapped stream yields a value (timeouts do not repeat).
///
/// ```
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
/// tokio::pin!(timeout_stream);
///
/// // Only one timeout will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
Expand All @@ -980,6 +1003,95 @@ pub trait StreamExt: Stream {
Timeout::new(self, duration)
}

/// Applies a per-item timeout to the passed stream.
///
/// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
/// controls the time each element of the stream has to complete before
/// timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. Unlike `timeout()`, if no value
/// becomes available before the deadline is reached, additional errors are
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
/// for an alternative where the timeouts do not repeat.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
///
/// Timeout errors will be continuously produced at the specified interval
/// until the wrapped stream yields a value.
///
/// ```
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
/// tokio::pin!(timeout_stream);
///
/// // Multiple timeouts will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
/// // Will eventually receive another value from the source stream...
/// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
where
Self: Sized,
{
TimeoutRepeating::new(self, interval)
}

/// Slows down a stream by enforcing a delay between items.
///
/// The underlying timer behind this utility has a granularity of one millisecond.
Expand Down
2 changes: 1 addition & 1 deletion tokio-stream/src/stream_ext/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pin_project! {
}
}

/// Error returned by `Timeout`.
/// Error returned by `Timeout` and `TimeoutRepeating`.
#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());

Expand Down
56 changes: 56 additions & 0 deletions tokio-stream/src/stream_ext/timeout_repeating.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::stream_ext::Fuse;
use crate::{Elapsed, Stream};
use tokio::time::Interval;

use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct TimeoutRepeating<S> {
#[pin]
stream: Fuse<S>,
#[pin]
interval: Interval,
}
}

impl<S: Stream> TimeoutRepeating<S> {
pub(super) fn new(stream: S, interval: Interval) -> Self {
TimeoutRepeating {
stream: Fuse::new(stream),
interval,
}
}
}

impl<S: Stream> Stream for TimeoutRepeating<S> {
type Item = Result<S::Item, Elapsed>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();

match me.stream.poll_next(cx) {
Poll::Ready(v) => {
if v.is_some() {
me.interval.reset();
}
return Poll::Ready(v.map(Ok));
}
Poll::Pending => {}
};

ready!(me.interval.poll_tick(cx));
Poll::Ready(Some(Err(Elapsed::new())))
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, _) = self.stream.size_hint();

// The timeout stream may insert an error an infinite number of times.
(lower, None)
}
}