Skip to content

Commit

Permalink
Remove hp::queue::{push,pop}_node functions.
Browse files Browse the repository at this point in the history
They were buggy, and not really needed. Also make a new function that
takes a closure which is passed the HP before returning. This makes it
possible to either push it to the hp queue or spin, such that we don't
need to chunk up the HPs in blocks, as we do with EBR. Chunking would be
possible, but tiresome, as we would need to mark all HPs in a block or
something, so that we don't `scan` all of them every check for when the
block is safe to free.

`hp::queue::test::stress_test` runs fine.
  • Loading branch information
Martin Hafskjold Thoresen committed Nov 20, 2017
1 parent 821eb7e commit e364d15
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 140 deletions.
15 changes: 14 additions & 1 deletion src/hp/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,14 +992,19 @@ impl<T> HazardPtr<T> {
/// Check if the pointer is marked as hazardous by any other thread. This should only be called
/// after deregistering, or else we will report itself.
pub fn scan(&self) -> bool {
HazardPtr::<()>::scan_addr(self.data)
}

pub fn scan_addr(addr: usize) -> bool {
for e in ::hp::ENTRIES.iter() {
for p in e.hazard_pointers.iter() {
if self.data == p.load(Ordering::SeqCst) {
if addr == p.load(Ordering::SeqCst) {
return true;
}
}
}
false

}

// TODO: name
Expand All @@ -1016,6 +1021,7 @@ impl<T> HazardPtr<T> {
#[cfg(not(feature = "hp-wait"))]
pub fn wait(&self) {}

/// Block until no other thread has this HP registered. Do not drop the pointer.
pub fn spin(&self) {
assert!(self.deregister().is_ok());
while self.scan() {
Expand All @@ -1039,6 +1045,13 @@ impl<T> HazardPtr<T> {
}
}

pub fn from_data(data: usize) -> Self {
Self {
data,
_marker: PhantomData,
}
}

pub fn from_ptr(ptr: Ptr<T>) -> Self {
Self::from_raw(ptr.as_raw() as usize)
}
Expand Down
28 changes: 16 additions & 12 deletions src/hp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ lazy_static! {

}

struct Garbage(Box<FnOnce()>);
struct Garbage(Box<FnOnce()>, usize);

unsafe impl Send for Garbage {}
unsafe impl Sync for Garbage {}
Expand All @@ -79,7 +79,11 @@ impl Garbage {
T: 'static,
{
let d = t.data;
Garbage(Box::new(move || { ::std::mem::forget(t); }))
Garbage(Box::new(move || { ::std::mem::forget(t); }), d)
}

fn address(&self) -> usize {
self.1
}
}

Expand All @@ -104,18 +108,18 @@ where
fn free_from_queue() {
const N: usize = 3;
for _ in 0..N {
if let Some(mut owned_ptr) = HAZARD_QUEUE.pop_node() {
let hp: HazardPtr<Garbage> = HazardPtr::fake(owned_ptr.data);
if hp.scan() {
if let Some(garbage) = HAZARD_QUEUE.pop_hp_fn(|h| {
h.spin();
unsafe {
h.into_owned();
}
})
{
if HazardPtr::<()>::scan_addr(garbage.address()) {
// used
HAZARD_QUEUE.push_node(owned_ptr);
HAZARD_QUEUE.push(garbage);
} else {
forget(hp);
{
let mut node: &mut queue::Node<Garbage> = &mut *owned_ptr;
unsafe { ManuallyDrop::drop(&mut node.data) };
}
drop(owned_ptr);
drop(garbage);
}
} else {
return;
Expand Down
158 changes: 31 additions & 127 deletions src/hp/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ where
let new_node = node.into_ptr();
loop {
// TODO: what's up with orderings here?
let tail: Ptr<Node<T>> = self.tail.load(Acquire);
let tail: Ptr<Node<T>> = self.tail.load(SeqCst);
let tail_hp = tail.hazard();
{
if self.tail.load(Acquire) != tail {
println!("[push] validate failed");
if self.tail.load(SeqCst) != tail {
continue;
}
}
Expand All @@ -77,8 +76,6 @@ where
// We try to help out by moving the tail pointer
// on queue to the real tail we've seen, which is `next`.
let _ = self.tail.compare_and_set(tail, next, Release);
println!("[push] tail wasn't tail");
println!("{:x} != {:x}", self.tail.load(SeqCst).data, tail.data);
} else {
let succ = t.next
.compare_and_set(Ptr::null(), new_node, Release)
Expand All @@ -89,108 +86,19 @@ where
// thread could have helped by moving the tail pointer.
let _ = self.tail.compare_and_set(tail, new_node, Release);
break;
} else {
println!("[push] failed to set Q.tail");
}
}
}
}

pub fn pop(&self) -> Option<T> {
let head: Ptr<Node<T>> = self.head.load(Acquire);
let head_hp = head.hazard();
// validate:
{
let new_head: Ptr<Node<T>> = self.head.load(Acquire);
// If head changed after registering, restart.
if head != new_head {
drop(head_hp);
return self.pop();
}
}
let h: &Node<T> = unsafe { head.deref() };
let next: Ptr<Node<T>> = h.next.load(Acquire);
if next.is_null() {
return None;
}
let next_hp = next.hazard();
{
if h.next.load(Acquire) != next {
drop(head_hp);
drop(next_hp);
return self.pop();
}
}
// Register the `next` pointer as hazardous
match unsafe { next.as_ref() } {
Some(node) => unsafe {
// NOTE(martin): We don't really return the correct node here:
// we CAS the old sentinel node out, and make the first data
// node the new sentinel node, but return the data of `node`,
// instead of `head`. In other words, the data we return
// belongs on the node that is the new sentinel node.
let res = self.head.compare_and_set(head, next, SeqCst);
match res {
Ok(()) => {
let ret = Some(ManuallyDrop::into_inner(::std::ptr::read(&node.data)));
drop(next_hp);
// While someone is using the head pointer, keep it here.
head_hp.free();
ret
}
// TODO: we would rather want to loop here, instead of
// giving up if there is contention?
Err(e) => None,
}
},
None => None,
}
}

pub fn push_node(&self, node: Owned<Node<T>>) {
let new_node = node.into_ptr();
loop {
let tail: Ptr<Node<T>> = self.tail.load(Acquire);
let tail_hp = tail.hazard();
{
if self.tail.load(Acquire) != tail {
drop(tail_hp);
println!("hp validate fail");
continue;
}
}
let t = unsafe { tail.deref() };
let next = t.next.load(Acquire);
assert!(next != tail);
if unsafe { next.as_ref().is_some() } {
// tail wasnt't tail after all.
// We try to help out by moving the tail pointer
// on queue to the real tail we've seen, which is `next`.
let _res = self.tail.compare_and_set(tail, next, Release);
println!("tail was changed. move tail ptr");
if let Err(r) = _res {
println!("-> {:?} ==> {:?}", tail.data, next.data);
println!(" Q.tail = {:?}", self.tail.load(SeqCst).data);
}
} else {
let succ = t.next
.compare_and_set(Ptr::null(), new_node, Release)
.is_ok();
if succ {
// the CAS succeded, and the new node is linked into the list.
// Update `queue.tail`. If we fail here it's OK, since another
// thread could have helped by moving the tail pointer.
let _ = self.tail.compare_and_set(tail, new_node, Release);
break;
} else {
println!("failed to set tail.next.")
}
}
}
self.pop_hp_fn(|hp| unsafe { hp.free() })
}


pub fn pop_node(&self) -> Option<Owned<Node<T>>> {
pub fn pop_hp_fn<F>(&self, f: F) -> Option<T>
where
F: FnOnce(super::atomic::HazardPtr<Node<T>>),
{
'outer: loop {
let head: Ptr<Node<T>> = self.head.load(Acquire);
let head_hp = head.hazard();
Expand All @@ -199,7 +107,8 @@ where
let new_head: Ptr<Node<T>> = self.head.load(Acquire);
// If head changed after registering, restart.
if head != new_head {
continue 'outer;
drop(head_hp);
return self.pop();
}
}
let h: &Node<T> = unsafe { head.deref() };
Expand All @@ -210,29 +119,35 @@ where
let next_hp = next.hazard();
{
if h.next.load(Acquire) != next {
continue 'outer;
drop(head_hp);
drop(next_hp);
return self.pop();
}
}
return match unsafe { next.as_ref() } {
// Register the `next` pointer as hazardous
match unsafe { next.as_ref() } {
Some(node) => unsafe {
// TODO: do we need this to be a HP as well?
let new_next = node.next.load(SeqCst);
let res = h.next.compare_and_set(next, new_next, SeqCst);
// NOTE(martin): We don't really return the correct node here:
// we CAS the old sentinel node out, and make the first data
// node the new sentinel node, but return the data of `node`,
// instead of `head`. In other words, the data we return
// belongs on the node that is the new sentinel node.
let res = self.head.compare_and_set(head, next, SeqCst);
match res {
Ok(()) => {
let ret = Some(ManuallyDrop::into_inner(::std::ptr::read(&node.data)));
drop(next_hp);
head_hp.spin();
// Reset the next field on the node we are returning.
node.next.store(Ptr::null(), SeqCst);
Some(next.into_owned())
// While someone is using the head pointer, keep it here.
f(head_hp);
return ret;
}
// TODO: we would rather want to loop here, instead of giving up if there
// is contention?
Err(e) => None,
// TODO: we would rather want to loop here, instead of
// giving up if there is contention?
Err(e) => continue 'outer,
}
},
None => None,
};
None => return None,
}
}
}

Expand Down Expand Up @@ -460,22 +375,10 @@ mod test {
}
}

#[test]
fn pop_node() {
const N: usize = 4;
let q = Queue::new();
for i in 0..N {
q.push(i);
}

let node = q.pop_node().expect("pop_node returned None");
assert_eq!(*node.data(), 0);
}

#[test]
fn stress_test() {
const N_THREADS: usize = 16;
const N: usize = 1024 * 1024;
const N: usize = 1024 * 32;

// NOTE: we can replace the arc problems by using crossbeams's `scope`,
// instead of `thread::spawn`.
Expand All @@ -499,6 +402,7 @@ mod test {
while let Some(i) = source.pop() {
sink.push(i);
}
println!("thread {} is done", thread_id);
})
})
.collect::<Vec<_>>();
Expand Down

0 comments on commit e364d15

Please sign in to comment.