Skip to content

Implement lock-free queue for scheduler message queue #9710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Prev Previous commit
Next Next commit
clean up
  • Loading branch information
toffaletti committed Oct 7, 2013
commit d0064f94ea9381b9c9fcddabdb0b6ec5e2ea324e
74 changes: 38 additions & 36 deletions src/libstd/rt/mpsc_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,28 @@ impl<T: Send> fmt::Default for Queue<T> {
}
}

impl<T: Send> Queue<T> {
pub fn new() -> Queue<T> {
let mut q = Queue{state: UnsafeArc::new(State {
impl<T: Send> State<T> {
pub fn new() -> State<T> {
let mut state = State {
pad0: [0, ..64],
head: AtomicPtr::new(mut_null()),
pad1: [0, ..64],
stub: Default::default(),
pad2: [0, ..64],
tail: mut_null(),
pad3: [0, ..64],
})};
let stub = q.get_stub_unsafe();
q.get_head().store(stub, Relaxed);
q.set_tail(stub);
q
};
let stub = state.get_stub_unsafe();
state.head.store(stub, Relaxed);
state.tail = stub;
state
}

pub fn push(&mut self, value: T) {
fn get_stub_unsafe(&mut self) -> *mut Node<T> {
unsafe { to_mut_unsafe_ptr(&mut self.stub) }
}

fn push(&mut self, value: T) {
unsafe {
let node = cast::transmute(~Node::new(value));
self.push_node(node);
Expand All @@ -110,65 +114,63 @@ impl<T: Send> Queue<T> {
fn push_node(&mut self, node: *mut Node<T>) {
unsafe {
(*node).next.store(mut_null(), Release);
let prev = self.get_head().swap(node, Relaxed);
let prev = self.head.swap(node, Relaxed);
(*prev).next.store(node, Release);
}
}

fn get_stub_unsafe(&mut self) -> *mut Node<T> {
unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) }
}

fn get_head(&mut self) -> &mut AtomicPtr<Node<T>> {
unsafe { &mut (*self.state.get()).head }
}

fn get_tail(&mut self) -> *mut Node<T> {
unsafe { (*self.state.get()).tail }
}

fn set_tail(&mut self, tail: *mut Node<T>) {
unsafe { (*self.state.get()).tail = tail }
}

pub fn casual_pop(&mut self) -> Option<T> {
self.pop()
}

pub fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Option<T> {
unsafe {
let mut tail = self.get_tail();
let mut tail = self.tail;
let mut next = (*tail).next.load(Acquire);
let stub = self.get_stub_unsafe();
if tail == stub {
if mut_null() == next {
return None
}
self.set_tail(next);
self.tail = next;
tail = next;
next = (*next).next.load(Acquire);
}
if next != mut_null() {
let tail: ~Node<T> = cast::transmute(tail);
self.set_tail(next);
self.tail = next;
return tail.value
}
let head = self.get_head().load(Relaxed);
let head = self.head.load(Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I admit I don't quite follow the logic after this point in the function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the logic after this point is for the case when the queue either empty or is about to become empty. If it is empty, it just returns None, if there is a single item in the queue, it pushes the stub node that represents an empty queue so head is the stub node and then checks again to see if next is empty (another thread might have added something to the queue).

if tail != head {
return None
}
self.push_node(stub);
next = (*tail).next.load(Acquire);
if next != mut_null() {
let tail: ~Node<T> = cast::transmute(tail);
self.set_tail(next);
self.tail = next;
return tail.value
}
}
None
}
}

impl<T: Send> Queue<T> {
pub fn new() -> Queue<T> {
Queue{state: UnsafeArc::new(State::new())}
}

pub fn push(&mut self, value: T) {
unsafe { (*self.state.get()).push(value) }
}

pub fn casual_pop(&mut self) -> Option<T> {
unsafe { (*self.state.get()).pop() }
}

pub fn pop(&mut self) -> Option<T> {
unsafe{ (*self.state.get()).pop() }
}
}

#[cfg(test)]
mod tests {
use prelude::*;
Expand Down