Skip to content

Commit c74e9a0

Browse files
committed
FlattenUnordered: improve wakers behavior
1 parent 10a5dd7 commit c74e9a0

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,17 @@ impl SharedPollState {
9292
let value = self
9393
.state
9494
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
95-
let next_value = value | to_poll;
96-
95+
// Waking process for this waker already started
96+
if value & waking != NONE {
97+
return None;
98+
}
99+
let mut next_value = value | to_poll;
100+
// Only start the waking process if we're not in the polling phase and the stream isn't woken already
97101
if value & (WOKEN | POLLING) == NONE {
98-
Some(next_value | waking)
99-
} else if next_value != value {
100-
Some(next_value)
101-
} else {
102-
None
102+
next_value |= waking;
103103
}
104+
105+
(next_value != value).then(|| next_value)
104106
})
105107
.ok()?;
106108

@@ -141,15 +143,13 @@ impl SharedPollState {
141143
fn stop_waking(&self, waking: u8) -> u8 {
142144
self.state
143145
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
144-
let next_value = value & !waking;
145-
146+
let mut next_value = value & !waking;
147+
// Waker will be called only if the current waking state is the same as the specified waker state
146148
if value & WAKING_ALL == waking {
147-
Some(next_value | WOKEN)
148-
} else if next_value != value {
149-
Some(next_value)
150-
} else {
151-
None
149+
next_value |= WOKEN;
152150
}
151+
152+
(next_value != value).then(|| next_value)
153153
})
154154
.unwrap_or_else(identity)
155155
}

0 commit comments

Comments
 (0)