Skip to content

Commit

Permalink
sync: free chan Blocks when Chan is dropped (tokio-rs#978)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored and carllerche committed Mar 13, 2019
1 parent a1871b1 commit 27148d6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
3 changes: 3 additions & 0 deletions tokio-sync/src/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,13 @@ impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;

// Safety: the only owner of the rx fields is Chan, and eing
// inside its own Drop means we're the last ones to touch it.
self.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };

while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
unsafe { rx_fields.list.free_blocks() };
});
}
}
Expand Down
37 changes: 31 additions & 6 deletions tokio-sync/src/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T> Tx<T> {
/// Push a value into the list.
pub(crate) fn push(&self, value: T) {
// First, claim a slot for the value. `Acquire` is used here to
// synchronize with the `fetch_add` in `free_blocks`.
// synchronize with the `fetch_add` in `reclaim_blocks`.
let slot_index = self.tail_position.fetch_add(1, Acquire);

// Load the current block and write the value
Expand Down Expand Up @@ -170,6 +170,7 @@ impl<T> Tx<T> {
}

pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
debug!("+ reclaim_block({:p})", block);
// The block has been removed from the linked list and ownership
// is reclaimed.
//
Expand Down Expand Up @@ -206,6 +207,7 @@ impl<T> Tx<T> {
}

if !reused {
debug!(" + block freed {:p}", block);
let _ = Box::from_raw(block.as_ptr());
}
}
Expand All @@ -231,7 +233,7 @@ impl<T> Rx<T> {
return None;
}

self.free_blocks(tx);
self.reclaim_blocks(tx);

unsafe {
let block = self.head.as_ref();
Expand Down Expand Up @@ -276,8 +278,8 @@ impl<T> Rx<T> {
}
}

fn free_blocks(&mut self, tx: &Tx<T>) {
debug!("+ free_blocks()");
fn reclaim_blocks(&mut self, tx: &Tx<T>) {
debug!("+ reclaim_blocks()");

while self.free_head != self.head {
unsafe {
Expand All @@ -297,8 +299,8 @@ impl<T> Rx<T> {
}

// We may read the next pointer with `Relaxed` ordering as it is
// guaranteed that the `free_blocks` routine trails the `recv`
// routine. Any memory accessed by `free_blocks` has already
// guaranteed that the `reclaim_blocks` routine trails the `recv`
// routine. Any memory accessed by `reclaim_blocks` has already
// been acquired by `recv`.
let next_block = block.as_ref().load_next(Relaxed);

Expand All @@ -313,6 +315,29 @@ impl<T> Rx<T> {
loom::yield_now();
}
}

/// Effectively `Drop` all the blocks. Should only be called once, when
/// the list is dropping.
pub(super) unsafe fn free_blocks(&mut self) {
debug!("+ free_blocks()");
debug_assert_ne!(self.free_head, NonNull::dangling());

let mut cur = Some(self.free_head);

#[cfg(debug_assertions)]
{
// to trigger the debug assert above so as to catch that we
// don't call `free_blocks` more than once.
self.free_head = NonNull::dangling();
self.head = NonNull::dangling();
}

while let Some(block) = cur {
cur = block.as_ref().load_next(Relaxed);
debug!(" + free: block = {:p}", block);
drop(Box::from_raw(block.as_ptr()));
}
}
}

impl<T> fmt::Debug for Rx<T> {
Expand Down
2 changes: 1 addition & 1 deletion tokio-sync/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ fn dropping_rx_closes_channel_for_try() {
}

#[test]
fn unconsumed_messagers_are_dropped() {
fn unconsumed_messages_are_dropped() {
let msg = Arc::new(());

let (mut tx, rx) = mpsc::channel(100);
Expand Down

0 comments on commit 27148d6

Please sign in to comment.