diff --git a/src/mpmc.rs b/src/mpmc.rs index 9b0b6feb18..3da72d7f31 100644 --- a/src/mpmc.rs +++ b/src/mpmc.rs @@ -101,9 +101,14 @@ type AtomicTargetSize = atomic::AtomicUsize; type AtomicTargetSize = atomic::AtomicU8; #[cfg(feature = "mpmc_large")] -type IntSize = usize; +type UintSize = usize; #[cfg(not(feature = "mpmc_large"))] -type IntSize = u8; +type UintSize = u8; + +#[cfg(feature = "mpmc_large")] +type IntSize = isize; +#[cfg(not(feature = "mpmc_large"))] +type IntSize = i8; /// MPMC queue with a capability for 2 elements. pub type Q2 = MpMcQueue; @@ -133,7 +138,7 @@ pub struct MpMcQueue { } impl MpMcQueue { - const MASK: IntSize = (N - 1) as IntSize; + const MASK: UintSize = (N - 1) as UintSize; const EMPTY_CELL: Cell = Cell::new(0); const ASSERT: [(); 1] = [()]; @@ -146,7 +151,7 @@ impl MpMcQueue { // Const assert on size. #[allow(clippy::no_effect)] - Self::ASSERT[(N >= (IntSize::MAX as usize)) as usize]; + Self::ASSERT[(N >= (UintSize::MAX as usize)) as usize]; let mut cell_count = 0; @@ -200,7 +205,7 @@ impl Cell { const fn new(seq: usize) -> Self { Self { data: MaybeUninit::uninit(), - sequence: AtomicTargetSize::new(seq as IntSize), + sequence: AtomicTargetSize::new(seq as UintSize), } } } @@ -208,7 +213,7 @@ impl Cell { unsafe fn dequeue( buffer: *mut Cell, dequeue_pos: &AtomicTargetSize, - mask: IntSize, + mask: UintSize, ) -> Option { let mut pos = dequeue_pos.load(Ordering::Relaxed); @@ -216,7 +221,7 @@ unsafe fn dequeue( loop { cell = buffer.add(usize::from(pos & mask)); let seq = (*cell).sequence.load(Ordering::Acquire); - let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8); + let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize); match dif.cmp(&0) { core::cmp::Ordering::Equal => { @@ -251,7 +256,7 @@ unsafe fn dequeue( unsafe fn enqueue( buffer: *mut Cell, enqueue_pos: &AtomicTargetSize, - mask: IntSize, + mask: UintSize, item: T, ) -> Result<(), T> { let mut pos = enqueue_pos.load(Ordering::Relaxed); @@ -260,7 +265,7 @@ unsafe fn enqueue( loop { cell = buffer.add(usize::from(pos & mask)); let seq = (*cell).sequence.load(Ordering::Acquire); - let dif = (seq as i8).wrapping_sub(pos as i8); + let dif = (seq as IntSize).wrapping_sub(pos as IntSize); match dif.cmp(&0) { core::cmp::Ordering::Equal => { @@ -320,7 +325,8 @@ mod tests { assert!(q.enqueue(0).is_ok()); assert_eq!(q.dequeue(), Some(0)); } - // this should not block forever + + // Queue is empty, this should not block forever. assert_eq!(q.dequeue(), None); } @@ -336,4 +342,22 @@ mod tests { // this should not block forever assert!(q.enqueue(0).is_err()); } + + #[test] + fn enqueue_full() { + #[cfg(not(feature = "mpmc_large"))] + const CAPACITY: usize = 128; + + #[cfg(feature = "mpmc_large")] + const CAPACITY: usize = 256; + + let q: MpMcQueue = MpMcQueue::new(); + + for _ in 0..CAPACITY { + q.enqueue(0xAA).unwrap(); + } + + // Queue is full, this should not block forever. + q.enqueue(0x55).unwrap_err(); + } }