Skip to content

Add utility type WakerSet to the sync module #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
9 commits merged into from Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Split unregister into complete and cancel
  • Loading branch information
Stjepan Glavina committed Oct 25, 2019
commit 7462e0bd841f68f2b1ef13afb865b5f43be0e597
15 changes: 5 additions & 10 deletions src/sync/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ impl<T> Sender<T> {
if poll.is_ready() {
// If the current task was registered, unregister now.
if let Some(key) = self.opt_key.take() {
// `true` means the send operation is completed.
self.sender.channel.sends.unregister(key, true);
self.sender.channel.sends.complete(key);
}
}

Expand All @@ -182,8 +181,7 @@ impl<T> Sender<T> {
fn drop(&mut self) {
// If the current task was registered, unregister now.
if let Some(key) = self.opt_key {
// `false` means the send operation is cancelled.
self.sender.channel.sends.unregister(key, false);
self.sender.channel.sends.cancel(key);
}
}
}
Expand Down Expand Up @@ -390,8 +388,7 @@ impl<T> Receiver<T> {
fn drop(&mut self) {
// If the current task was registered, unregister now.
if let Some(key) = self.opt_key {
// `false` means the receive operation is cancelled.
self.channel.recvs.unregister(key, false);
self.channel.recvs.cancel(key);
}
}
}
Expand Down Expand Up @@ -486,8 +483,7 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// If the current task was registered as blocked on this stream, unregister now.
if let Some(key) = self.opt_key {
// `false` means the last request for a stream item is cancelled.
self.channel.streams.unregister(key, false);
self.channel.streams.cancel(key);
}

// Decrement the receiver count and disconnect the channel if it drops down to zero.
Expand Down Expand Up @@ -561,8 +557,7 @@ fn poll_recv<T>(
if poll.is_ready() {
// If the current task was registered, unregister now.
if let Some(key) = opt_key.take() {
// `true` means the receive operation is completed.
registry.unregister(key, true);
registry.complete(key);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ impl<T> Mutex<T> {
if poll.is_ready() {
// If the current task was registered, unregister now.
if let Some(key) = self.opt_key.take() {
// `true` means the operation is completed.
self.mutex.registry.unregister(key, true);
self.mutex.registry.complete(key);
}
}

Expand All @@ -138,8 +137,7 @@ impl<T> Mutex<T> {
fn drop(&mut self) {
// If the current task was registered, unregister now.
if let Some(key) = self.opt_key {
// `false` means the operation is canceled.
self.mutex.registry.unregister(key, false);
self.mutex.registry.cancel(key);
}
}
}
Expand Down
28 changes: 14 additions & 14 deletions src/sync/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,25 @@ impl Registry {
}
}

/// Unregisters an operation.
///
/// If `completed` is `true`, the operation will be removed from the registry. If `completed`
/// is `false`, that means the operation was canceled so another one will be notified instead.
pub fn unregister(&self, key: usize, completed: bool) {
/// Unregisters a completed operation.
pub fn complete(&self, key: usize) {
let mut blocked = self.lock();
if blocked.entries.remove(key).is_none() {
blocked.none_count -= 1;
}
}

// Remove the operation and check if it has been notified.
/// Unregisters a cancelled operation.
pub fn cancel(&self, key: usize) {
let mut blocked = self.lock();
if blocked.entries.remove(key).is_none() {
blocked.none_count -= 1;

// If the operation was notified but also canceled...
if !completed {
// Notify another operation.
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
blocked.none_count += 1;
}
// The operation was cancelled and notified so notify another operation instead.
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
blocked.none_count += 1;
}
}
}
Expand Down