Skip to content

Commit 8bc9f1e

Browse files
taiki-ejonhoo
andauthored
FuturesUnordered: Respect yielding from future (#2551)
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
1 parent 6464371 commit 8bc9f1e

File tree

3 files changed

+27
-37
lines changed

3 files changed

+27
-37
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use crate::task::AtomicWaker;
77
use alloc::sync::{Arc, Weak};
88
use core::cell::UnsafeCell;
9-
use core::cmp;
109
use core::fmt::{self, Debug};
1110
use core::iter::FromIterator;
1211
use core::marker::PhantomData;
@@ -31,33 +30,6 @@ use self::task::Task;
3130
mod ready_to_run_queue;
3231
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
3332

34-
/// Constant used for a `FuturesUnordered` to determine how many times it is
35-
/// allowed to poll underlying futures without yielding.
36-
///
37-
/// A single call to `poll_next` may potentially do a lot of work before
38-
/// yielding. This happens in particular if the underlying futures are awoken
39-
/// frequently but continue to return `Pending`. This is problematic if other
40-
/// tasks are waiting on the executor, since they do not get to run. This value
41-
/// caps the number of calls to `poll` on underlying futures a single call to
42-
/// `poll_next` is allowed to make.
43-
///
44-
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
45-
/// that amortize wakeup and scheduling costs, but low enough that we do not
46-
/// starve other tasks for long.
47-
///
48-
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
49-
///
50-
/// Note that using the length of the `FuturesUnordered` instead of this value
51-
/// may cause problems if the number of futures is large.
52-
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
53-
///
54-
/// Additionally, polling the same future twice per iteration may cause another
55-
/// problem. So, when using this value, it is necessary to limit the max value
56-
/// based on the length of the `FuturesUnordered`.
57-
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
58-
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
59-
const YIELD_EVERY: usize = 32;
60-
6133
/// A set of futures which may complete in any order.
6234
///
6335
/// This structure is optimized to manage a large number of futures.
@@ -149,6 +121,7 @@ impl<Fut> FuturesUnordered<Fut> {
149121
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
150122
queued: AtomicBool::new(true),
151123
ready_to_run_queue: Weak::new(),
124+
woken: AtomicBool::new(false),
152125
});
153126
let stub_ptr = Arc::as_ptr(&stub);
154127
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
@@ -195,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
195168
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
196169
queued: AtomicBool::new(true),
197170
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
171+
woken: AtomicBool::new(false),
198172
});
199173

200174
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -411,12 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
411385
type Item = Fut::Output;
412386

413387
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
414-
// See YIELD_EVERY docs for more.
415-
let yield_every = cmp::min(self.len(), YIELD_EVERY);
388+
let len = self.len();
416389

417390
// Keep track of how many child futures we have polled,
418391
// in case we want to forcibly yield.
419392
let mut polled = 0;
393+
let mut yielded = 0;
420394

421395
// Ensure `parent` is correctly set.
422396
self.ready_to_run_queue.waker.register(cx.waker());
@@ -527,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
527501
// the internal allocation, appropriately accessing fields and
528502
// deallocating the task if need be.
529503
let res = {
530-
let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
504+
let task = bomb.task.as_ref().unwrap();
505+
// We are only interested in whether the future is awoken before it
506+
// finishes polling, so reset the flag here.
507+
task.woken.store(false, Relaxed);
508+
let waker = Task::waker_ref(task);
531509
let mut cx = Context::from_waker(&waker);
532510

533511
// Safety: We won't move the future ever again
@@ -540,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
540518
match res {
541519
Poll::Pending => {
542520
let task = bomb.task.take().unwrap();
521+
// If the future was awoken during polling, we assume
522+
// the future wanted to explicitly yield.
523+
yielded += task.woken.load(Relaxed) as usize;
543524
bomb.queue.link(task);
544525

545-
if polled == yield_every {
546-
// We have polled a large number of futures in a row without yielding.
547-
// To ensure we do not starve other tasks waiting on the executor,
548-
// we yield here, but immediately wake ourselves up to continue.
526+
// If a future yields, we respect it and yield here.
527+
// If all futures have been polled, we also yield here to
528+
// avoid starving other tasks waiting on the executor.
529+
// (polling the same future twice per iteration may cause
530+
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
531+
if yielded >= 2 || polled == len {
549532
cx.waker().wake_by_ref();
550533
return Poll::Pending;
551534
}

futures-util/src/stream/futures_unordered/task.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use alloc::sync::{Arc, Weak};
22
use core::cell::UnsafeCell;
3-
use core::sync::atomic::Ordering::{self, SeqCst};
3+
use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
44
use core::sync::atomic::{AtomicBool, AtomicPtr};
55

66
use super::abort::abort;
@@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {
3131

3232
// Whether or not this task is currently in the ready to run queue
3333
pub(super) queued: AtomicBool,
34+
35+
// Whether the future was awoken during polling
36+
// It is possible for this flag to be set to true after the polling,
37+
// but it will be ignored.
38+
pub(super) woken: AtomicBool,
3439
}
3540

3641
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
4853
None => return,
4954
};
5055

56+
arc_self.woken.store(true, Relaxed);
57+
5158
// It's our job to enqueue this task it into the ready to run queue. To
5259
// do this we set the `queued` flag, and if successful we then do the
5360
// actual queueing operation, ensuring that we're only queued once.

futures/tests/stream_futures_unordered.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ fn polled_only_once_at_most_per_iteration() {
342342

343343
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
344344
assert!(tasks.poll_next_unpin(cx).is_pending());
345-
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
345+
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
346346

347347
let mut tasks = FuturesUnordered::<F>::new();
348348
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));

0 commit comments

Comments
 (0)