File tree 3 files changed +4
-5
lines changed
futures-util/src/stream/futures_unordered 3 files changed +4
-5
lines changed Original file line number Diff line number Diff line change @@ -390,6 +390,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
390
390
// Keep track of how many child futures we have polled,
391
391
// in case we want to forcibly yield.
392
392
let mut polled = 0 ;
393
+ let mut yielded = 0 ;
393
394
394
395
// Ensure `parent` is correctly set.
395
396
self . ready_to_run_queue . waker . register ( cx. waker ( ) ) ;
@@ -519,15 +520,15 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
519
520
let task = bomb. task . take ( ) . unwrap ( ) ;
520
521
// If the future was awoken during polling, we assume
521
522
// the future wanted to explicitly yield.
522
- let yielded = task. woken . load ( Relaxed ) ;
523
+ yielded + = task. woken . load ( Relaxed ) as usize ;
523
524
bomb. queue . link ( task) ;
524
525
525
526
// If a future yields, we respect it and yield here.
526
527
// If all futures have been polled, we also yield here to
527
528
// avoid starving other tasks waiting on the executor.
528
529
// (polling the same future twice per iteration may cause
529
530
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
530
- if yielded || polled == len {
531
+ if yielded >= 2 || polled == len {
531
532
cx. waker ( ) . wake_by_ref ( ) ;
532
533
return Poll :: Pending ;
533
534
}
Original file line number Diff line number Diff line change @@ -32,7 +32,7 @@ pub(super) struct Task<Fut> {
32
32
// Whether or not this task is currently in the ready to run queue
33
33
pub ( super ) queued : AtomicBool ,
34
34
35
- // Whether the future waken before it finishes polling
35
+ // Whether the future was awoken during polling
36
36
// It is possible for this flag to be set to true after the polling,
37
37
// but it will be ignored.
38
38
pub ( super ) woken : AtomicBool ,
Original file line number Diff line number Diff line change @@ -268,8 +268,6 @@ fn futures_not_moved_after_poll() {
268
268
let fut = future:: ready ( ( ) ) . pending_once ( ) . assert_unmoved ( ) ;
269
269
let mut stream = vec ! [ fut; 3 ] . into_iter ( ) . collect :: < FuturesUnordered < _ > > ( ) ;
270
270
assert_stream_pending ! ( stream) ;
271
- assert_stream_pending ! ( stream) ;
272
- assert_stream_pending ! ( stream) ;
273
271
assert_stream_next ! ( stream, ( ) ) ;
274
272
assert_stream_next ! ( stream, ( ) ) ;
275
273
assert_stream_next ! ( stream, ( ) ) ;
You can’t perform that action at this time.
0 commit comments