Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pool for any 32/64-bit architecture that supports the corresponding atomics. #458

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Added `Extend` impls for `Deque`.
- Added `Deque::make_contiguous`.
- Added `VecView`, the `!Sized` version of `Vec`.
- Added pool implementations for 64-bit architectures.

### Changed

Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ defmt-03 = ["dep:defmt"]
# Enable larger MPMC sizes.
mpmc_large = []

nightly = []

[dependencies]
portable-atomic = { version = "1.0", optional = true }
hash32 = "0.3.0"
Expand All @@ -47,7 +49,7 @@ ufmt-write = { version = "0.1", optional = true }
defmt = { version = ">=0.2.0,<0.4", optional = true }

# for the pool module
[target.'cfg(any(target_arch = "arm", target_arch = "x86"))'.dependencies]
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]
stable_deref_trait = { version = "1", default-features = false }

[dev-dependencies]
Expand Down
29 changes: 25 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
//!
//! List of currently implemented data structures:
#![cfg_attr(
any(arm_llsc, target_arch = "x86"),
any(arm_llsc, target_pointer_width = "32", target_pointer_width = "64"),
doc = "- [`Arc`](pool::arc::Arc) -- like `std::sync::Arc` but backed by a lock-free memory pool rather than `#[global_allocator]`"
)]
#![cfg_attr(
any(arm_llsc, target_arch = "x86"),
any(arm_llsc, target_pointer_width = "32", target_pointer_width = "64"),
doc = "- [`Box`](pool::boxed::Box) -- like `std::boxed::Box` but backed by a lock-free memory pool rather than `#[global_allocator]`"
)]
//! - [`BinaryHeap`] -- priority queue
Expand All @@ -57,7 +57,7 @@
//! - [`IndexSet`] -- hash set
//! - [`LinearMap`]
#![cfg_attr(
any(arm_llsc, target_arch = "x86"),
any(arm_llsc, target_pointer_width = "32", target_pointer_width = "64"),
doc = "- [`Object`](pool::object::Object) -- objects managed by an object pool"
)]
//! - [`sorted_linked_list::SortedLinkedList`]
Expand All @@ -76,6 +76,14 @@
#![cfg_attr(docsrs, feature(doc_cfg), feature(doc_auto_cfg))]
#![cfg_attr(not(test), no_std)]
#![deny(missing_docs)]
#![cfg_attr(
all(
feature = "nightly",
target_pointer_width = "64",
target_has_atomic = "128"
),
feature(integer_atomics)
)]

pub use binary_heap::BinaryHeap;
pub use deque::Deque;
Expand Down Expand Up @@ -125,7 +133,20 @@ mod defmt;
all(not(feature = "mpmc_large"), target_has_atomic = "8")
))]
pub mod mpmc;
#[cfg(any(arm_llsc, target_arch = "x86"))]
#[cfg(any(
arm_llsc,
all(
target_pointer_width = "32",
any(target_has_atomic = "64", feature = "portable-atomic")
),
all(
target_pointer_width = "64",
any(
all(target_has_atomic = "128", feature = "nightly"),
feature = "portable-atomic"
)
)
))]
pub mod pool;
pub mod sorted_linked_list;
#[cfg(any(
Expand Down
2 changes: 1 addition & 1 deletion src/pool/treiber.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::mem::ManuallyDrop;

#[cfg_attr(target_arch = "x86", path = "treiber/cas.rs")]
#[cfg_attr(not(arm_llsc), path = "treiber/cas.rs")]
#[cfg_attr(arm_llsc, path = "treiber/llsc.rs")]
mod impl_;

Expand Down
174 changes: 121 additions & 53 deletions src/pool/treiber/cas.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,54 @@
use core::{
marker::PhantomData,
num::{NonZeroU32, NonZeroU64},
ptr::NonNull,
sync::atomic::{AtomicU64, Ordering},
};
use core::{marker::PhantomData, ptr::NonNull, sync::atomic::Ordering};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;

use super::{Node, Stack};

#[cfg(target_pointer_width = "32")]
mod types {
use super::atomic;

pub type Inner = u64;
pub type InnerAtomic = atomic::AtomicU64;
pub type InnerNonZero = core::num::NonZeroU64;

pub type Tag = core::num::NonZeroU32;
pub type Address = u32;
}

#[cfg(target_pointer_width = "64")]
mod types {
use super::atomic;

pub type Inner = u128;
pub type InnerAtomic = atomic::AtomicU128;
pub type InnerNonZero = core::num::NonZeroU128;

pub type Tag = core::num::NonZeroU64;
pub type Address = u64;
}

use types::*;

pub struct AtomicPtr<N>
where
N: Node,
{
inner: AtomicU64,
inner: InnerAtomic,
_marker: PhantomData<*mut N>,
}

impl<N> AtomicPtr<N>
where
N: Node,
{
#[inline]
pub const fn null() -> Self {
Self {
inner: AtomicU64::new(0),
inner: InnerAtomic::new(0),
_marker: PhantomData,
}
}
Expand All @@ -35,37 +62,38 @@ where
) -> Result<(), Option<NonNullPtr<N>>> {
self.inner
.compare_exchange_weak(
current
.map(|pointer| pointer.into_u64())
.unwrap_or_default(),
new.map(|pointer| pointer.into_u64()).unwrap_or_default(),
current.map(NonNullPtr::into_inner).unwrap_or_default(),
new.map(NonNullPtr::into_inner).unwrap_or_default(),
success,
failure,
)
.map(drop)
.map_err(NonNullPtr::from_u64)
.map_err(|value| {
// SAFETY: `value` came from a `NonNullPtr::into_inner` call.
unsafe { NonNullPtr::from_inner(value) }
})
}

#[inline]
fn load(&self, order: Ordering) -> Option<NonNullPtr<N>> {
NonZeroU64::new(self.inner.load(order)).map(|inner| NonNullPtr {
inner,
Some(NonNullPtr {
inner: InnerNonZero::new(self.inner.load(order))?,
_marker: PhantomData,
})
}

#[inline]
fn store(&self, value: Option<NonNullPtr<N>>, order: Ordering) {
self.inner.store(
value.map(|pointer| pointer.into_u64()).unwrap_or_default(),
order,
)
self.inner
.store(value.map(NonNullPtr::into_inner).unwrap_or_default(), order)
}
}

pub struct NonNullPtr<N>
where
N: Node,
{
inner: NonZeroU64,
inner: InnerNonZero,
_marker: PhantomData<*mut N>,
}

Expand All @@ -84,65 +112,72 @@ impl<N> NonNullPtr<N>
where
N: Node,
{
#[inline]
pub fn as_ptr(&self) -> *mut N {
self.inner.get() as *mut N
}

pub fn from_static_mut_ref(ref_: &'static mut N) -> NonNullPtr<N> {
let non_null = NonNull::from(ref_);
Self::from_non_null(non_null)
#[inline]
pub fn from_static_mut_ref(reference: &'static mut N) -> NonNullPtr<N> {
// SAFETY: `reference` is a static mutable reference, i.e. a valid pointer.
unsafe { Self::new_unchecked(initial_tag(), NonNull::from(reference)) }
}

fn from_non_null(ptr: NonNull<N>) -> Self {
let address = ptr.as_ptr() as u32;
let tag = initial_tag().get();

let value = (u64::from(tag) << 32) | u64::from(address);
/// # Safety
///
/// - `ptr` must be a valid pointer.
#[inline]
unsafe fn new_unchecked(tag: Tag, ptr: NonNull<N>) -> Self {
let value =
(Inner::from(tag.get()) << Address::BITS) | Inner::from(ptr.as_ptr() as Address);

Self {
inner: unsafe { NonZeroU64::new_unchecked(value) },
// SAFETY: `value` is constructed from a `Tag` which is non-zero and half the
// size of the `InnerNonZero` type, and a `NonNull<N>` pointer.
inner: unsafe { InnerNonZero::new_unchecked(value) },
_marker: PhantomData,
}
}

fn from_u64(value: u64) -> Option<Self> {
NonZeroU64::new(value).map(|inner| Self {
inner,
/// # Safety
///
/// - `value` must come from a `Self::into_inner` call.
#[inline]
unsafe fn from_inner(value: Inner) -> Option<Self> {
Some(Self {
inner: InnerNonZero::new(value)?,
_marker: PhantomData,
})
}

#[inline]
fn non_null(&self) -> NonNull<N> {
unsafe { NonNull::new_unchecked(self.inner.get() as *mut N) }
// SAFETY: `Self` can only be constructed using a `NonNull<N>`.
unsafe { NonNull::new_unchecked(self.as_ptr()) }
}

fn tag(&self) -> NonZeroU32 {
unsafe { NonZeroU32::new_unchecked((self.inner.get() >> 32) as u32) }
}

fn into_u64(self) -> u64 {
#[inline]
fn into_inner(self) -> Inner {
self.inner.get()
}

fn increase_tag(&mut self) {
let address = self.as_ptr() as u32;

let new_tag = self
.tag()
.get()
.checked_add(1)
.map(|val| unsafe { NonZeroU32::new_unchecked(val) })
.unwrap_or_else(initial_tag)
.get();
#[inline]
fn tag(&self) -> Tag {
// SAFETY: `self.inner` was constructed from a non-zero `Tag`.
unsafe { Tag::new_unchecked((self.inner.get() >> Address::BITS) as Address) }
}

let value = (u64::from(new_tag) << 32) | u64::from(address);
fn increment_tag(&mut self) {
let new_tag = self.tag().checked_add(1).unwrap_or_else(initial_tag);

self.inner = unsafe { NonZeroU64::new_unchecked(value) };
// SAFETY: `self.non_null()` is a valid pointer.
*self = unsafe { Self::new_unchecked(new_tag, self.non_null()) };
}
}

fn initial_tag() -> NonZeroU32 {
unsafe { NonZeroU32::new_unchecked(1) }
#[inline]
const fn initial_tag() -> Tag {
Tag::MIN
}

pub unsafe fn push<N>(stack: &Stack<N>, new_top: NonNullPtr<N>)
Expand Down Expand Up @@ -184,7 +219,40 @@ where
.compare_and_exchange_weak(Some(top), next, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
top.increase_tag();
// Prevent the ABA problem (https://en.wikipedia.org/wiki/Treiber_stack#Correctness).
//
// Without this, the following would be possible:
//
// | Thread 1 | Thread 2 | Stack |
// |-------------------------------|-------------------------|------------------------------|
// | push((1, 1)) | | (1, 1) |
// | push((1, 2)) | | (1, 2) -> (1, 1) |
// | p = try_pop()::load // (1, 2) | | (1, 2) -> (1, 1) |
// | | p = try_pop() // (1, 2) | (1, 1) |
// | | push((1, 3)) | (1, 3) -> (1, 1) |
// | | push(p) | (1, 2) -> (1, 3) -> (1, 1) |
// | try_pop()::cas(p, p.next) | | (1, 1) |
//
// As can be seen, the `cas` operation succeeds, wrongly removing pointer `3` from the stack.
//
// By incrementing the tag before returning the pointer, it cannot be pushed again with the
// same tag, preventing the `try_pop()::cas(p, p.next)` operation from succeeding.
//
// With this fix, `try_pop()` in thread 2 returns `(2, 2)` and the comparison between
// `(1, 2)` and `(2, 2)` fails, restarting the loop and correctly removing the new top:
//
// | Thread 1 | Thread 2 | Stack |
// |-------------------------------|-------------------------|------------------------------|
// | push((1, 1)) | | (1, 1) |
// | push((1, 2)) | | (1, 2) -> (1, 1) |
// | p = try_pop()::load // (1, 2) | | (1, 2) -> (1, 1) |
// | | p = try_pop() // (2, 2) | (1, 1) |
// | | push((1, 3)) | (1, 3) -> (1, 1) |
// | | push(p) | (2, 2) -> (1, 3) -> (1, 1) |
// | try_pop()::cas(p, p.next) | | (2, 2) -> (1, 3) -> (1, 1) |
// | p = try_pop()::load // (2, 2) | | (2, 2) -> (1, 3) -> (1, 1) |
// | try_pop()::cas(p, p.next) | | (1, 3) -> (1, 1) |
top.increment_tag();

return Some(top);
}
Expand Down
9 changes: 7 additions & 2 deletions src/pool/treiber/llsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ impl<N> AtomicPtr<N>
where
N: Node,
{
#[inline]
pub const fn null() -> Self {
Self {
inner: UnsafeCell::new(None),
Expand All @@ -34,10 +35,12 @@ impl<N> NonNullPtr<N>
where
N: Node,
{
#[inline]
pub fn as_ptr(&self) -> *mut N {
self.inner.as_ptr().cast()
}

#[inline]
pub fn from_static_mut_ref(ref_: &'static mut N) -> Self {
Self {
inner: NonNull::from(ref_),
Expand Down Expand Up @@ -122,7 +125,8 @@ mod arch {
}

/// # Safety
/// - `addr` must be a valid pointer
///
/// - `addr` must be a valid pointer.
#[inline(always)]
pub unsafe fn load_link(addr: *const usize) -> usize {
let value;
Expand All @@ -131,7 +135,8 @@ mod arch {
}

/// # Safety
/// - `addr` must be a valid pointer
///
/// - `addr` must be a valid pointer.
#[inline(always)]
pub unsafe fn store_conditional(value: usize, addr: *mut usize) -> Result<(), ()> {
let outcome: usize;
Expand Down
Loading