From e364d151fd5ae41be6e4c97825d66cd1ccdd8ea9 Mon Sep 17 00:00:00 2001 From: Martin Hafskjold Thoresen Date: Mon, 20 Nov 2017 15:25:18 +0100 Subject: [PATCH] Remove `hp::queue::{push,pop}_node` functions. They were buggy, and not really needed. Also make a new function that takes a closure which is passed the HP before returning. This makes it possible to either push it to the hp queue or spin, such that we don't need to chunk up the HPs in blocks, as we do with EBR. Chunking would be possible, but tiresome, as we would need to mark all HPs in a block or something, so that we don't `scan` all of them every check for when the block is safe to free. `hp::queue::test::stress_test` runs fine. --- src/hp/atomic.rs | 15 ++++- src/hp/mod.rs | 28 +++++---- src/hp/queue.rs | 158 ++++++++++------------------------------------- 3 files changed, 61 insertions(+), 140 deletions(-) diff --git a/src/hp/atomic.rs b/src/hp/atomic.rs index 2cfaef8..be71436 100644 --- a/src/hp/atomic.rs +++ b/src/hp/atomic.rs @@ -992,14 +992,19 @@ impl HazardPtr { /// Check if the pointer is marked as hazardous by any other thread. This should only be called /// after deregistering, or else we will report itself. pub fn scan(&self) -> bool { + HazardPtr::<()>::scan_addr(self.data) + } + + pub fn scan_addr(addr: usize) -> bool { for e in ::hp::ENTRIES.iter() { for p in e.hazard_pointers.iter() { - if self.data == p.load(Ordering::SeqCst) { + if addr == p.load(Ordering::SeqCst) { return true; } } } false + } // TODO: name @@ -1016,6 +1021,7 @@ impl HazardPtr { #[cfg(not(feature = "hp-wait"))] pub fn wait(&self) {} + /// Block until no other thread has this HP registered. Do not drop the pointer. pub fn spin(&self) { assert!(self.deregister().is_ok()); while self.scan() { @@ -1039,6 +1045,13 @@ impl HazardPtr { } } + pub fn from_data(data: usize) -> Self { + Self { + data, + _marker: PhantomData, + } + } + pub fn from_ptr(ptr: Ptr) -> Self { Self::from_raw(ptr.as_raw() as usize) } diff --git a/src/hp/mod.rs b/src/hp/mod.rs index 714cef3..0c772c1 100644 --- a/src/hp/mod.rs +++ b/src/hp/mod.rs @@ -67,7 +67,7 @@ lazy_static! { } -struct Garbage(Box); +struct Garbage(Box, usize); unsafe impl Send for Garbage {} unsafe impl Sync for Garbage {} @@ -79,7 +79,11 @@ impl Garbage { T: 'static, { let d = t.data; - Garbage(Box::new(move || { ::std::mem::forget(t); })) + Garbage(Box::new(move || { ::std::mem::forget(t); }), d) + } + + fn address(&self) -> usize { + self.1 } } @@ -104,18 +108,18 @@ where fn free_from_queue() { const N: usize = 3; for _ in 0..N { - if let Some(mut owned_ptr) = HAZARD_QUEUE.pop_node() { - let hp: HazardPtr = HazardPtr::fake(owned_ptr.data); - if hp.scan() { + if let Some(garbage) = HAZARD_QUEUE.pop_hp_fn(|h| { + h.spin(); + unsafe { + h.into_owned(); + } + }) + { + if HazardPtr::<()>::scan_addr(garbage.address()) { // used - HAZARD_QUEUE.push_node(owned_ptr); + HAZARD_QUEUE.push(garbage); } else { - forget(hp); - { - let mut node: &mut queue::Node = &mut *owned_ptr; - unsafe { ManuallyDrop::drop(&mut node.data) }; - } - drop(owned_ptr); + drop(garbage); } } else { return; diff --git a/src/hp/queue.rs b/src/hp/queue.rs index a2f34e1..531cf5e 100644 --- a/src/hp/queue.rs +++ b/src/hp/queue.rs @@ -61,11 +61,10 @@ where let new_node = node.into_ptr(); loop { // TODO: what's up with orderings here? - let tail: Ptr> = self.tail.load(Acquire); + let tail: Ptr> = self.tail.load(SeqCst); let tail_hp = tail.hazard(); { - if self.tail.load(Acquire) != tail { - println!("[push] validate failed"); + if self.tail.load(SeqCst) != tail { continue; } } @@ -77,8 +76,6 @@ where // We try to help out by moving the tail pointer // on queue to the real tail we've seen, which is `next`. let _ = self.tail.compare_and_set(tail, next, Release); - println!("[push] tail wasn't tail"); - println!("{:x} != {:x}", self.tail.load(SeqCst).data, tail.data); } else { let succ = t.next .compare_and_set(Ptr::null(), new_node, Release) @@ -89,108 +86,19 @@ where // thread could have helped by moving the tail pointer. let _ = self.tail.compare_and_set(tail, new_node, Release); break; - } else { - println!("[push] failed to set Q.tail"); } } } } pub fn pop(&self) -> Option { - let head: Ptr> = self.head.load(Acquire); - let head_hp = head.hazard(); - // validate: - { - let new_head: Ptr> = self.head.load(Acquire); - // If head changed after registering, restart. - if head != new_head { - drop(head_hp); - return self.pop(); - } - } - let h: &Node = unsafe { head.deref() }; - let next: Ptr> = h.next.load(Acquire); - if next.is_null() { - return None; - } - let next_hp = next.hazard(); - { - if h.next.load(Acquire) != next { - drop(head_hp); - drop(next_hp); - return self.pop(); - } - } - // Register the `next` pointer as hazardous - match unsafe { next.as_ref() } { - Some(node) => unsafe { - // NOTE(martin): We don't really return the correct node here: - // we CAS the old sentinel node out, and make the first data - // node the new sentinel node, but return the data of `node`, - // instead of `head`. In other words, the data we return - // belongs on the node that is the new sentinel node. - let res = self.head.compare_and_set(head, next, SeqCst); - match res { - Ok(()) => { - let ret = Some(ManuallyDrop::into_inner(::std::ptr::read(&node.data))); - drop(next_hp); - // While someone is using the head pointer, keep it here. - head_hp.free(); - ret - } - // TODO: we would rather want to loop here, instead of - // giving up if there is contention? - Err(e) => None, - } - }, - None => None, - } - } - - pub fn push_node(&self, node: Owned>) { - let new_node = node.into_ptr(); - loop { - let tail: Ptr> = self.tail.load(Acquire); - let tail_hp = tail.hazard(); - { - if self.tail.load(Acquire) != tail { - drop(tail_hp); - println!("hp validate fail"); - continue; - } - } - let t = unsafe { tail.deref() }; - let next = t.next.load(Acquire); - assert!(next != tail); - if unsafe { next.as_ref().is_some() } { - // tail wasnt't tail after all. - // We try to help out by moving the tail pointer - // on queue to the real tail we've seen, which is `next`. - let _res = self.tail.compare_and_set(tail, next, Release); - println!("tail was changed. move tail ptr"); - if let Err(r) = _res { - println!("-> {:?} ==> {:?}", tail.data, next.data); - println!(" Q.tail = {:?}", self.tail.load(SeqCst).data); - } - } else { - let succ = t.next - .compare_and_set(Ptr::null(), new_node, Release) - .is_ok(); - if succ { - // the CAS succeded, and the new node is linked into the list. - // Update `queue.tail`. If we fail here it's OK, since another - // thread could have helped by moving the tail pointer. - let _ = self.tail.compare_and_set(tail, new_node, Release); - break; - } else { - println!("failed to set tail.next.") - } - } - } + self.pop_hp_fn(|hp| unsafe { hp.free() }) } - - pub fn pop_node(&self) -> Option>> { + pub fn pop_hp_fn(&self, f: F) -> Option + where + F: FnOnce(super::atomic::HazardPtr>), + { 'outer: loop { let head: Ptr> = self.head.load(Acquire); let head_hp = head.hazard(); @@ -199,7 +107,8 @@ where let new_head: Ptr> = self.head.load(Acquire); // If head changed after registering, restart. if head != new_head { - continue 'outer; + drop(head_hp); + return self.pop(); } } let h: &Node = unsafe { head.deref() }; @@ -210,29 +119,35 @@ where let next_hp = next.hazard(); { if h.next.load(Acquire) != next { - continue 'outer; + drop(head_hp); + drop(next_hp); + return self.pop(); } } - return match unsafe { next.as_ref() } { + // Register the `next` pointer as hazardous + match unsafe { next.as_ref() } { Some(node) => unsafe { - // TODO: do we need this to be a HP as well? - let new_next = node.next.load(SeqCst); - let res = h.next.compare_and_set(next, new_next, SeqCst); + // NOTE(martin): We don't really return the correct node here: + // we CAS the old sentinel node out, and make the first data + // node the new sentinel node, but return the data of `node`, + // instead of `head`. In other words, the data we return + // belongs on the node that is the new sentinel node. + let res = self.head.compare_and_set(head, next, SeqCst); match res { Ok(()) => { + let ret = Some(ManuallyDrop::into_inner(::std::ptr::read(&node.data))); drop(next_hp); - head_hp.spin(); - // Reset the next field on the node we are returning. - node.next.store(Ptr::null(), SeqCst); - Some(next.into_owned()) + // While someone is using the head pointer, keep it here. + f(head_hp); + return ret; } - // TODO: we would rather want to loop here, instead of giving up if there - // is contention? - Err(e) => None, + // TODO: we would rather want to loop here, instead of + // giving up if there is contention? + Err(e) => continue 'outer, } }, - None => None, - }; + None => return None, + } } } @@ -460,22 +375,10 @@ mod test { } } - #[test] - fn pop_node() { - const N: usize = 4; - let q = Queue::new(); - for i in 0..N { - q.push(i); - } - - let node = q.pop_node().expect("pop_node returned None"); - assert_eq!(*node.data(), 0); - } - #[test] fn stress_test() { const N_THREADS: usize = 16; - const N: usize = 1024 * 1024; + const N: usize = 1024 * 32; // NOTE: we can replace the arc problems by using crossbeams's `scope`, // instead of `thread::spawn`. @@ -499,6 +402,7 @@ mod test { while let Some(i) = source.pop() { sink.push(i); } + println!("thread {} is done", thread_id); }) }) .collect::>();