Skip to content

Commit e789b61

Browse files
authored
stream: add StreamExt::timeout_repeating (#5577)
1 parent 2cd4f4a commit e789b61

File tree

3 files changed

+171
-3
lines changed

3 files changed

+171
-3
lines changed

tokio-stream/src/stream_ext.rs

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ use try_next::TryNext;
5757

5858
cfg_time! {
5959
pub(crate) mod timeout;
60+
pub(crate) mod timeout_repeating;
6061
use timeout::Timeout;
61-
use tokio::time::Duration;
62+
use timeout_repeating::TimeoutRepeating;
63+
use tokio::time::{Duration, Interval};
6264
mod throttle;
6365
use throttle::{throttle, Throttle};
6466
mod chunks_timeout;
@@ -924,7 +926,9 @@ pub trait StreamExt: Stream {
924926
/// If the wrapped stream yields a value before the deadline is reached, the
925927
/// value is returned. Otherwise, an error is returned. The caller may decide
926928
/// to continue consuming the stream and will eventually get the next source
927-
/// stream value once it becomes available.
929+
/// stream value once it becomes available. See
930+
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
931+
/// where the timeouts will repeat.
928932
///
929933
/// # Notes
930934
///
@@ -971,6 +975,25 @@ pub trait StreamExt: Stream {
971975
/// assert_eq!(int_stream.try_next().await, Ok(None));
972976
/// # }
973977
/// ```
978+
///
979+
/// Once a timeout error is received, no further events will be received
980+
/// unless the wrapped stream yields a value (timeouts do not repeat).
981+
///
982+
/// ```
983+
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
984+
/// # async fn main() {
985+
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
986+
/// use std::time::Duration;
987+
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
988+
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
989+
/// tokio::pin!(timeout_stream);
990+
///
991+
/// // Only one timeout will be received between values in the source stream.
992+
/// assert!(timeout_stream.try_next().await.is_ok());
993+
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
994+
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
995+
/// # }
996+
/// ```
974997
#[cfg(all(feature = "time"))]
975998
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
976999
fn timeout(self, duration: Duration) -> Timeout<Self>
@@ -980,6 +1003,95 @@ pub trait StreamExt: Stream {
9801003
Timeout::new(self, duration)
9811004
}
9821005

1006+
/// Applies a per-item timeout to the passed stream.
1007+
///
1008+
/// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
1009+
/// controls the time each element of the stream has to complete before
1010+
/// timing out.
1011+
///
1012+
/// If the wrapped stream yields a value before the deadline is reached, the
1013+
/// value is returned. Otherwise, an error is returned. The caller may decide
1014+
/// to continue consuming the stream and will eventually get the next source
1015+
/// stream value once it becomes available. Unlike `timeout()`, if no value
1016+
/// becomes available before the deadline is reached, additional errors are
1017+
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1018+
/// for an alternative where the timeouts do not repeat.
1019+
///
1020+
/// # Notes
1021+
///
1022+
/// This function consumes the stream passed into it and returns a
1023+
/// wrapped version of it.
1024+
///
1025+
/// Polling the returned stream will continue to poll the inner stream even
1026+
/// if one or more items time out.
1027+
///
1028+
/// # Examples
1029+
///
1030+
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1031+
///
1032+
/// ```
1033+
/// # #[tokio::main]
1034+
/// # async fn main() {
1035+
/// use tokio_stream::{self as stream, StreamExt};
1036+
/// use std::time::Duration;
1037+
/// # let int_stream = stream::iter(1..=3);
1038+
///
1039+
/// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1040+
/// tokio::pin!(int_stream);
1041+
///
1042+
/// // When no items time out, we get the 3 elements in succession:
1043+
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1044+
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1045+
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1046+
/// assert_eq!(int_stream.try_next().await, Ok(None));
1047+
///
1048+
/// // If the second item times out, we get an error and continue polling the stream:
1049+
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1050+
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1051+
/// assert!(int_stream.try_next().await.is_err());
1052+
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1053+
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1054+
/// assert_eq!(int_stream.try_next().await, Ok(None));
1055+
///
1056+
/// // If we want to stop consuming the source stream the first time an
1057+
/// // element times out, we can use the `take_while` operator:
1058+
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1059+
/// let mut int_stream = int_stream.take_while(Result::is_ok);
1060+
///
1061+
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1062+
/// assert_eq!(int_stream.try_next().await, Ok(None));
1063+
/// # }
1064+
/// ```
1065+
///
1066+
/// Timeout errors will be continuously produced at the specified interval
1067+
/// until the wrapped stream yields a value.
1068+
///
1069+
/// ```
1070+
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1071+
/// # async fn main() {
1072+
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1073+
/// use std::time::Duration;
1074+
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1075+
/// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1076+
/// tokio::pin!(timeout_stream);
1077+
///
1078+
/// // Multiple timeouts will be received between values in the source stream.
1079+
/// assert!(timeout_stream.try_next().await.is_ok());
1080+
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1081+
/// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1082+
/// // Will eventually receive another value from the source stream...
1083+
/// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1084+
/// # }
1085+
/// ```
1086+
#[cfg(all(feature = "time"))]
1087+
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1088+
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1089+
where
1090+
Self: Sized,
1091+
{
1092+
TimeoutRepeating::new(self, interval)
1093+
}
1094+
9831095
/// Slows down a stream by enforcing a delay between items.
9841096
///
9851097
/// The underlying timer behind this utility has a granularity of one millisecond.

tokio-stream/src/stream_ext/timeout.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pin_project! {
2323
}
2424
}
2525

26-
/// Error returned by `Timeout`.
26+
/// Error returned by `Timeout` and `TimeoutRepeating`.
2727
#[derive(Debug, PartialEq, Eq)]
2828
pub struct Elapsed(());
2929

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use crate::stream_ext::Fuse;
2+
use crate::{Elapsed, Stream};
3+
use tokio::time::Interval;
4+
5+
use core::pin::Pin;
6+
use core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
11+
#[must_use = "streams do nothing unless polled"]
12+
#[derive(Debug)]
13+
pub struct TimeoutRepeating<S> {
14+
#[pin]
15+
stream: Fuse<S>,
16+
#[pin]
17+
interval: Interval,
18+
}
19+
}
20+
21+
impl<S: Stream> TimeoutRepeating<S> {
22+
pub(super) fn new(stream: S, interval: Interval) -> Self {
23+
TimeoutRepeating {
24+
stream: Fuse::new(stream),
25+
interval,
26+
}
27+
}
28+
}
29+
30+
impl<S: Stream> Stream for TimeoutRepeating<S> {
31+
type Item = Result<S::Item, Elapsed>;
32+
33+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34+
let mut me = self.project();
35+
36+
match me.stream.poll_next(cx) {
37+
Poll::Ready(v) => {
38+
if v.is_some() {
39+
me.interval.reset();
40+
}
41+
return Poll::Ready(v.map(Ok));
42+
}
43+
Poll::Pending => {}
44+
};
45+
46+
ready!(me.interval.poll_tick(cx));
47+
Poll::Ready(Some(Err(Elapsed::new())))
48+
}
49+
50+
fn size_hint(&self) -> (usize, Option<usize>) {
51+
let (lower, _) = self.stream.size_hint();
52+
53+
// The timeout stream may insert an error an infinite number of times.
54+
(lower, None)
55+
}
56+
}

0 commit comments

Comments
 (0)