6
6
use crate :: task:: AtomicWaker ;
7
7
use alloc:: sync:: { Arc , Weak } ;
8
8
use core:: cell:: UnsafeCell ;
9
- use core:: cmp;
10
9
use core:: fmt:: { self , Debug } ;
11
10
use core:: iter:: FromIterator ;
12
11
use core:: marker:: PhantomData ;
@@ -31,33 +30,6 @@ use self::task::Task;
31
30
mod ready_to_run_queue;
32
31
use self :: ready_to_run_queue:: { Dequeue , ReadyToRunQueue } ;
33
32
34
- /// Constant used for a `FuturesUnordered` to determine how many times it is
35
- /// allowed to poll underlying futures without yielding.
36
- ///
37
- /// A single call to `poll_next` may potentially do a lot of work before
38
- /// yielding. This happens in particular if the underlying futures are awoken
39
- /// frequently but continue to return `Pending`. This is problematic if other
40
- /// tasks are waiting on the executor, since they do not get to run. This value
41
- /// caps the number of calls to `poll` on underlying futures a single call to
42
- /// `poll_next` is allowed to make.
43
- ///
44
- /// The value itself is chosen somewhat arbitrarily. It needs to be high enough
45
- /// that amortize wakeup and scheduling costs, but low enough that we do not
46
- /// starve other tasks for long.
47
- ///
48
- /// See also https://github.com/rust-lang/futures-rs/issues/2047.
49
- ///
50
- /// Note that using the length of the `FuturesUnordered` instead of this value
51
- /// may cause problems if the number of futures is large.
52
- /// See also https://github.com/rust-lang/futures-rs/pull/2527.
53
- ///
54
- /// Additionally, polling the same future twice per iteration may cause another
55
- /// problem. So, when using this value, it is necessary to limit the max value
56
- /// based on the length of the `FuturesUnordered`.
57
- /// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
58
- /// See also https://github.com/rust-lang/futures-rs/pull/2333.
59
- const YIELD_EVERY : usize = 32 ;
60
-
61
33
/// A set of futures which may complete in any order.
62
34
///
63
35
/// This structure is optimized to manage a large number of futures.
@@ -149,6 +121,7 @@ impl<Fut> FuturesUnordered<Fut> {
149
121
next_ready_to_run : AtomicPtr :: new ( ptr:: null_mut ( ) ) ,
150
122
queued : AtomicBool :: new ( true ) ,
151
123
ready_to_run_queue : Weak :: new ( ) ,
124
+ woken : AtomicBool :: new ( false ) ,
152
125
} ) ;
153
126
let stub_ptr = Arc :: as_ptr ( & stub) ;
154
127
let ready_to_run_queue = Arc :: new ( ReadyToRunQueue {
@@ -195,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
195
168
next_ready_to_run : AtomicPtr :: new ( ptr:: null_mut ( ) ) ,
196
169
queued : AtomicBool :: new ( true ) ,
197
170
ready_to_run_queue : Arc :: downgrade ( & self . ready_to_run_queue ) ,
171
+ woken : AtomicBool :: new ( false ) ,
198
172
} ) ;
199
173
200
174
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -411,12 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
411
385
type Item = Fut :: Output ;
412
386
413
387
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
414
- // See YIELD_EVERY docs for more.
415
- let yield_every = cmp:: min ( self . len ( ) , YIELD_EVERY ) ;
388
+ let len = self . len ( ) ;
416
389
417
390
// Keep track of how many child futures we have polled,
418
391
// in case we want to forcibly yield.
419
392
let mut polled = 0 ;
393
+ let mut yielded = 0 ;
420
394
421
395
// Ensure `parent` is correctly set.
422
396
self . ready_to_run_queue . waker . register ( cx. waker ( ) ) ;
@@ -527,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
527
501
// the internal allocation, appropriately accessing fields and
528
502
// deallocating the task if need be.
529
503
let res = {
530
- let waker = Task :: waker_ref ( bomb. task . as_ref ( ) . unwrap ( ) ) ;
504
+ let task = bomb. task . as_ref ( ) . unwrap ( ) ;
505
+ // We are only interested in whether the future is awoken before it
506
+ // finishes polling, so reset the flag here.
507
+ task. woken . store ( false , Relaxed ) ;
508
+ let waker = Task :: waker_ref ( task) ;
531
509
let mut cx = Context :: from_waker ( & waker) ;
532
510
533
511
// Safety: We won't move the future ever again
@@ -540,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
540
518
match res {
541
519
Poll :: Pending => {
542
520
let task = bomb. task . take ( ) . unwrap ( ) ;
521
+ // If the future was awoken during polling, we assume
522
+ // the future wanted to explicitly yield.
523
+ yielded += task. woken . load ( Relaxed ) as usize ;
543
524
bomb. queue . link ( task) ;
544
525
545
- if polled == yield_every {
546
- // We have polled a large number of futures in a row without yielding.
547
- // To ensure we do not starve other tasks waiting on the executor,
548
- // we yield here, but immediately wake ourselves up to continue.
526
+ // If a future yields, we respect it and yield here.
527
+ // If all futures have been polled, we also yield here to
528
+ // avoid starving other tasks waiting on the executor.
529
+ // (polling the same future twice per iteration may cause
530
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
531
+ if yielded >= 2 || polled == len {
549
532
cx. waker ( ) . wake_by_ref ( ) ;
550
533
return Poll :: Pending ;
551
534
}
0 commit comments