Skip to content

Commit

Permalink
Remove ArcCell, MsQueue, TreiberStack
Browse files Browse the repository at this point in the history
  • Loading branch information
Stjepan Glavina committed Jan 24, 2019
1 parent 5226b17 commit 7525667
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 826 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ https://www.rust-lang.org)
This crate provides a set of tools for concurrent programming:

* Atomics
* `ArcCell<T>` is a shared mutable `Arc<T>` pointer.
* `AtomicCell<T>` is equivalent to `Cell<T>`, except it is also thread-safe.
* `AtomicConsume` allows reading from primitive atomic types with "consume" ordering.

* Data structures
* `deque` module contains work-stealing deques for building task schedulers.
* `MsQueue<T>` and `SegQueue<T>` are simple concurrent queues.
* `TreiberStack<T>` is a lock-free stack.
* `ArrayQueue<T>` is a bounded MPMC queue.
* `SegQueue<T>` is an unbounded MPMC queue.

* Thread synchronization
* `channel` module contains multi-producer multi-consumer channels for message passing.
Expand Down
109 changes: 109 additions & 0 deletions crossbeam-epoch/examples/treiber_stack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
extern crate crossbeam_epoch as epoch;
extern crate crossbeam_utils as utils;

use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

use epoch::{Atomic, Owned};
use utils::thread::scope;

/// Treiber's lock-free stack.
///
/// Usable with any number of producers and consumers.
#[derive(Debug)]
pub struct TreiberStack<T> {
head: Atomic<Node<T>>,
}

#[derive(Debug)]
struct Node<T> {
data: ManuallyDrop<T>,
next: Atomic<Node<T>>,
}

impl<T> TreiberStack<T> {
/// Creates a new, empty stack.
pub fn new() -> TreiberStack<T> {
TreiberStack {
head: Atomic::null(),
}
}

/// Pushes a value on top of the stack.
pub fn push(&self, t: T) {
let mut n = Owned::new(Node {
data: ManuallyDrop::new(t),
next: Atomic::null(),
});

let guard = epoch::pin();

loop {
let head = self.head.load(Relaxed, &guard);
n.next.store(head, Relaxed);

match self.head.compare_and_set(head, n, Release, &guard) {
Ok(_) => break,
Err(e) => n = e.new,
}
}
}

/// Attempts to pop the top element from the stack.
///
/// Returns `None` if the stack is empty.
pub fn pop(&self) -> Option<T> {
let guard = epoch::pin();
loop {
let head = self.head.load(Acquire, &guard);

match unsafe { head.as_ref() } {
Some(h) => {
let next = h.next.load(Relaxed, &guard);

if self
.head
.compare_and_set(head, next, Release, &guard)
.is_ok()
{
unsafe {
guard.defer_destroy(head);
return Some(ManuallyDrop::into_inner(ptr::read(&(*h).data)));
}
}
}
None => return None,
}
}
}

/// Returns `true` if the stack is empty.
pub fn is_empty(&self) -> bool {
let guard = epoch::pin();
self.head.load(Acquire, &guard).is_null()
}
}

impl<T> Drop for TreiberStack<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
}
}

fn main() {
let stack = TreiberStack::new();

scope(|scope| {
for _ in 0..10 {
scope.spawn(|_| {
for i in 0..10_000 {
stack.push(i);
assert!(stack.pop().is_some());
}
});
}
}).unwrap();

assert!(stack.pop().is_none());
}
109 changes: 0 additions & 109 deletions src/arc_cell.rs

This file was deleted.

18 changes: 3 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! Tools for concurrent programming.
//!
//! * Atomics
//! * [`ArcCell<T>`] is a shared mutable [`Arc<T>`] pointer.
//! * [`AtomicCell<T>`] is equivalent to [`Cell<T>`], except it is also thread-safe.
//! * [`AtomicConsume`] allows reading from primitive atomic types with "consume" ordering.
//!
//! * Data structures
//! * [`deque`] module contains work-stealing deques for building task schedulers.
//! * [`MsQueue<T>`] and [`SegQueue<T>`] are simple concurrent queues.
//! * [`TreiberStack<T>`] is a lock-free stack.
//! * [`ArrayQueue<T>`] is a bounded MPMC queue.
//! * [`SegQueue<T>`] is an unbounded MPMC queue.
//!
//! * Thread synchronization
//! * [`channel`] module contains multi-producer multi-consumer channels for message passing.
Expand All @@ -22,15 +21,13 @@
//! * [`CachePadded<T>`] pads and aligns a value to the length of a cache line.
//! * [`scope()`] can spawn threads that borrow local variables from the stack.
//!
//! [`ArcCell<T>`]: atomic/struct.ArcCell.html
//! [`Arc<T>`]: https://doc.rust-lang.org/std/sync/struct.Arc.html
//! [`AtomicCell<T>`]: atomic/struct.AtomicCell.html
//! [`Cell<T>`]: https://doc.rust-lang.org/std/cell/struct.Cell.html
//! [`AtomicConsume`]: atomic/trait.AtomicConsume.html
//! [`deque`]: deque/index.html
//! [`MsQueue<T>`]: queue/struct.MsQueue.html
//! [`ArrayQueue<T>`]: queue/struct.ArrayQueue.html
//! [`SegQueue<T>`]: queue/struct.SegQueue.html
//! [`TreiberStack<T>`]: stack/struct.TreiberStack.html
//! [`channel`]: channel/index.html
//! [`ShardedLock<T>`]: sync/struct.ShardedLock.html
//! [`RwLock<T>`]: https://doc.rust-lang.org/std/sync/struct.RwLock.html
Expand Down Expand Up @@ -68,13 +65,10 @@ mod _epoch {
#[doc(inline)]
pub use _epoch::crossbeam_epoch as epoch;

mod arc_cell;

extern crate crossbeam_utils;

/// Atomic types.
pub mod atomic {
pub use arc_cell::ArcCell;
pub use crossbeam_utils::atomic::AtomicCell;
pub use crossbeam_utils::atomic::AtomicConsume;
}
Expand Down Expand Up @@ -114,17 +108,11 @@ cfg_if! {
extern crate parking_lot;

mod sharded_lock;
mod treiber_stack;
mod wait_group;

/// Concurrent queues.
pub mod queue;

/// Concurrent stacks.
pub mod stack {
pub use treiber_stack::TreiberStack;
}

/// Thread synchronization primitives.
pub mod sync {
pub use crossbeam_utils::sync::Parker;
Expand Down
2 changes: 0 additions & 2 deletions src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ use std::error;
use std::fmt;

mod array_queue;
mod ms_queue;
mod seg_queue;

pub use self::array_queue::ArrayQueue;
pub use self::ms_queue::MsQueue;
pub use self::seg_queue::SegQueue;

/// Error which occurs when popping from an empty queue.
Expand Down
Loading

0 comments on commit 7525667

Please sign in to comment.