Skip to content

Commit

Permalink
Merge pull request #462 from reitermarkus/fix-mpmc-large
Browse files Browse the repository at this point in the history
Fix `MpMcQueue` with `mpmc_large` feature.
  • Loading branch information
reitermarkus authored May 20, 2024
2 parents 07c072c + 62908ca commit 329d733
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Fixed `{arc,box,object}_pool!` emitting clippy lints.
- Fixed the list of implemented data structures in the crate docs, by adding `Deque`,
`HistoryBuffer` and `SortedLinkedList` to the list.
- Fixed `MpMcQueue` with `mpmc_large` feature.

## [v0.8.0] - 2023-11-07

Expand Down
44 changes: 34 additions & 10 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ type AtomicTargetSize = atomic::AtomicUsize;
type AtomicTargetSize = atomic::AtomicU8;

#[cfg(feature = "mpmc_large")]
type IntSize = usize;
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = u8;
type UintSize = u8;

#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;

/// MPMC queue with a capability for 2 elements.
pub type Q2<T> = MpMcQueue<T, 2>;
Expand Down Expand Up @@ -133,7 +138,7 @@ pub struct MpMcQueue<T, const N: usize> {
}

impl<T, const N: usize> MpMcQueue<T, N> {
const MASK: IntSize = (N - 1) as IntSize;
const MASK: UintSize = (N - 1) as UintSize;
const EMPTY_CELL: Cell<T> = Cell::new(0);

const ASSERT: [(); 1] = [()];
Expand All @@ -146,7 +151,7 @@ impl<T, const N: usize> MpMcQueue<T, N> {

// Const assert on size.
#[allow(clippy::no_effect)]
Self::ASSERT[(N >= (IntSize::MAX as usize)) as usize];
Self::ASSERT[(N >= (UintSize::MAX as usize)) as usize];

let mut cell_count = 0;

Expand Down Expand Up @@ -200,23 +205,23 @@ impl<T> Cell<T> {
const fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as IntSize),
sequence: AtomicTargetSize::new(seq as UintSize),
}
}
}

unsafe fn dequeue<T>(
buffer: *mut Cell<T>,
dequeue_pos: &AtomicTargetSize,
mask: IntSize,
mask: UintSize,
) -> Option<T> {
let mut pos = dequeue_pos.load(Ordering::Relaxed);

let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8);
let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);

match dif.cmp(&0) {
core::cmp::Ordering::Equal => {
Expand Down Expand Up @@ -251,7 +256,7 @@ unsafe fn dequeue<T>(
unsafe fn enqueue<T>(
buffer: *mut Cell<T>,
enqueue_pos: &AtomicTargetSize,
mask: IntSize,
mask: UintSize,
item: T,
) -> Result<(), T> {
let mut pos = enqueue_pos.load(Ordering::Relaxed);
Expand All @@ -260,7 +265,7 @@ unsafe fn enqueue<T>(
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub(pos as i8);
let dif = (seq as IntSize).wrapping_sub(pos as IntSize);

match dif.cmp(&0) {
core::cmp::Ordering::Equal => {
Expand Down Expand Up @@ -320,7 +325,8 @@ mod tests {
assert!(q.enqueue(0).is_ok());
assert_eq!(q.dequeue(), Some(0));
}
// this should not block forever

// Queue is empty, this should not block forever.
assert_eq!(q.dequeue(), None);
}

Expand All @@ -336,4 +342,22 @@ mod tests {
// this should not block forever
assert!(q.enqueue(0).is_err());
}

#[test]
fn enqueue_full() {
#[cfg(not(feature = "mpmc_large"))]
const CAPACITY: usize = 128;

#[cfg(feature = "mpmc_large")]
const CAPACITY: usize = 256;

let q: MpMcQueue<u8, CAPACITY> = MpMcQueue::new();

for _ in 0..CAPACITY {
q.enqueue(0xAA).unwrap();
}

// Queue is full, this should not block forever.
q.enqueue(0x55).unwrap_err();
}
}

0 comments on commit 329d733

Please sign in to comment.