A lock-free queue for asynchronous & synchronous code from the ACM paper, "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev (arXiv:1908.04511). This crate implements the proposed SCQ (bounded) and LSCQ (unbounded) variants of the queue using atomics.
- `#![no_std]` option for embedded environments
- bounded & unbounded MPMC concurrent queues
- very fast performance characteristics under both high and low contention
The following section gives quickstart code examples. One limitation of the library is that queue sizes must be powers of two; only 1, 2, 4, 8, 16, ... are supported as lengths.
Usually what you want is an allocated queue, which means that the values all live on the heap. There are two types: `AllocBoundedQueue`, which is the bounded queue but heap-allocated, and `UnboundedQueue`, which is always heap-allocated.
```rust
// Import the allocated queues.
use lfqueue::AllocBoundedQueue;

// Make an allocated queue of size 8.
let queue = AllocBoundedQueue::new(8);
assert!(queue.enqueue(0).is_ok()); // this should enqueue correctly.
assert_eq!(queue.dequeue(), Some(0));
```

You may also want an `UnboundedQueue`, which requires `std` (enabled by default). These are created with an initial segment size:
```rust
use lfqueue::UnboundedQueue;

// Make an unbounded queue with an initial segment size of 8.
let queue = UnboundedQueue::new(8);
queue.enqueue(0);
assert_eq!(queue.dequeue(), Some(0));
```

Additionally, if you must have the `len` method, there is `CountedUnboundedQueue`, which introduces the method at a small performance penalty. It works the same as `UnboundedQueue`, except that as of now you cannot have handles to it.
This queue is not backed by the heap and instead lives on the stack. It can be created manually, but you almost always want to use the macro, which sets it up easily.
```rust
use lfqueue::{const_queue, ConstBoundedQueue};

// Make a constant queue of size 8.
let queue = const_queue!(usize; 8);
assert!(queue.enqueue(8).is_ok());
assert_eq!(queue.dequeue(), Some(8));
```

This queue has a capacity of one, meaning that you can enqueue one message and dequeue one message. Logically, it is equivalent to `const_queue!(usize; 1)`.
```rust
use lfqueue::SingleSize;

// Make a single-slot queue.
let queue = SingleSize::new();
assert!(queue.enqueue(8).is_ok());
assert_eq!(queue.dequeue(), Some(8));
```

The queues within the library were tested against several other queues. The benchmarking is not exhaustive, but the process can be seen in `benches/syncqueue.rs`.
| crate | structure | test | time |
|---|---|---|---|
| lfqueue | ConstBoundedQueue (32) | t=1,o=100 | 99.721µs |
| lfqueue | ConstBoundedQueue (32) | t=10,o=100 | 879.07µs |
| lfqueue | ConstBoundedQueue (32) | t=100,o=100 | 16.303ms |
| lfqueue | ConstBoundedQueue (32) | t=100,o=10000 | 129.34ms |
| lfqueue | AllocBoundedQueue (32) | t=1,o=100 | 101.92µs |
| lfqueue | AllocBoundedQueue (32) | t=10,o=100 | 912.69µs |
| lfqueue | AllocBoundedQueue (32) | t=100,o=100 | 11.239ms |
| lfqueue | AllocBoundedQueue (32) | t=100,o=10000 | 114.89ms |
| lfqueue | UnboundedQueue (seg=1024) | t=1,o=100 | 107.90µs |
| lfqueue | UnboundedQueue (seg=1024) | t=10,o=100 | 882.55µs |
| lfqueue | UnboundedQueue (seg=1024) | t=100,o=100 | 11.888ms |
| lfqueue | UnboundedQueue (seg=1024) | t=100,o=10000 | 144.09ms |
| crossbeam | SegQueue | t=1,o=100 | 111.29µs |
| crossbeam | SegQueue | t=10,o=100 | 995.10µs |
| crossbeam | SegQueue | t=100,o=100 | 20.831ms |
| crossbeam | SegQueue | t=100,o=10000 | 41.886ms |
| crossbeam | ArrayQueue (32) | t=1,o=100 | 155.42µs |
| crossbeam | ArrayQueue (32) | t=10,o=100 | 939.54µs |
| crossbeam | ArrayQueue (32) | t=100,o=100 | 11.161ms |
| crossbeam | ArrayQueue (32) | t=100,o=10000 | 99.484ms |
| lockfree | Queue | t=1,o=100 | 114.87µs |
| lockfree | Queue | t=10,o=100 | 1.0594ms |
| lockfree | Queue | t=100,o=100 | 13.756ms |
| lockfree | Queue | t=100,o=10000 | 496.96ms |
| std | Mutex&lt;VecDeque&gt; | t=1,o=100 | 100.49µs |
| std | Mutex&lt;VecDeque&gt; | t=10,o=100 | 1.2141ms |
| std | Mutex&lt;VecDeque&gt; | t=100,o=100 | 13.509ms |
| std | Mutex&lt;VecDeque&gt; | t=100,o=10000 | 234.65ms |
Testing with a queue size of 1.
| crate | structure | test | time |
|---|---|---|---|
| lfqueue | SingleSize (1) | t=1,o=100 | 105.35µs |
| lfqueue | SingleSize (1) | t=10,o=100 | 924.79µs |
| lfqueue | SingleSize (1) | t=100,o=100 | 11.914ms |
| lfqueue | SingleSize (1) | t=100,o=10000 | 13.093ms |
| lfqueue | ConstBoundedQueue (1) | t=1,o=100 | 133.09µs |
| lfqueue | ConstBoundedQueue (1) | t=10,o=100 | 980.12µs |
| lfqueue | ConstBoundedQueue (1) | t=100,o=100 | 12.108ms |
| lfqueue | ConstBoundedQueue (1) | t=100,o=10000 | 34.351ms |
| lfqueue | AllocBoundedQueue (1) | t=1,o=100 | 107.35µs |
| lfqueue | AllocBoundedQueue (1) | t=10,o=100 | 1.0678ms |
| lfqueue | AllocBoundedQueue (1) | t=100,o=100 | 11.853ms |
| lfqueue | AllocBoundedQueue (1) | t=100,o=10000 | 165.84ms |
| std | Mutex&lt;Option&lt;T&gt;&gt; (1) | t=1,o=100 | 98.686µs |
| std | Mutex&lt;Option&lt;T&gt;&gt; (1) | t=10,o=100 | 879.31µs |
| std | Mutex&lt;Option&lt;T&gt;&gt; (1) | t=100,o=100 | 12.136ms |
| std | Mutex&lt;Option&lt;T&gt;&gt; (1) | t=100,o=10000 | 165.66ms |
| crossbeam | ArrayQueue (1) | t=1,o=100 | 124.43µs |
| crossbeam | ArrayQueue (1) | t=10,o=100 | 1.1164ms |
| crossbeam | ArrayQueue (1) | t=100,o=100 | 12.566ms |
| crossbeam | ArrayQueue (1) | t=100,o=10000 | 97.277ms |
- Basic Trading Order Scheduler (`examples/trading.rs`): a very basic example showing a single loop publishing orders to a queue, and another process that sends the orders out to the exchange.
Each data structure fundamentally relies on the SCQ ring described in the ACM paper. The ring is an MPMC queue of indices. Its size must be a power of two, which is why on initialization we must pass an order (a power-of-two exponent) instead of a capacity directly.
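To make the order/capacity relationship concrete, here is a small sketch (illustration only; the function names are hypothetical and not part of the crate's API) showing how an order maps to a capacity and how a requested capacity can be rounded up to the next power of two:

```rust
// Illustration only: how a power-of-two "order" relates to a ring's
// capacity. The actual lfqueue internals may differ.
fn capacity_from_order(order: u32) -> usize {
    1usize << order // e.g. order 3 => capacity 8
}

fn order_for_capacity(cap: usize) -> u32 {
    // Round up to the next power of two, then take its log2.
    cap.next_power_of_two().trailing_zeros()
}

fn main() {
    assert_eq!(capacity_from_order(3), 8);
    assert_eq!(order_for_capacity(8), 3);
    assert_eq!(order_for_capacity(5), 3); // 5 rounds up to 8
}
```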
The bounded queue is the SCQ queue from the ACM paper. It works by maintaining two rings:
- Free Ring: This ring contains all the available indexes that we can slot a value into.
- Allocation Ring: This ring contains all the indexes that are currently allocated.
Additionally, the bounded queue contains a backing buffer, which stores the entries themselves. The correctness of the various methods relies on the fact that both rings are correct MPMC index rings.
The bounded queue is initialized by filling the free ring with values from 0 to n - 1 (all the valid indices) and leaving the allocation ring empty. Therefore, any given index can be in at most one of these rings, and since they are correct concurrent MPMC index rings, only one thread possesses a given index at a time.
The enqueue method works by first finding an available slot. To do this, an index is dequeued from the free ring. We can then access the backing buffer at this index, with two guarantees:
- Since any index can be in at most one of the rings, and only one thread can "possess" an index at any point in time, we have a unique index into the array and thus can safely take mutable access to that slot.
- Since any index coming from the free ring is between 0 and n - 1 (0, 1, 2, ..., n - 1), it is always a valid index into the backing buffer.
Using this, we access the slot, set the value, and then insert the index into the allocation ring, indicating we are done.
The dequeue method works similarly, relying on the same guarantees in reverse. An index is dequeued from the allocation ring, we remove the value from its slot using our unique access, and then we return the index to the free ring so it can be reused.
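The two-ring scheme above can be sketched in safe Rust with locked index rings standing in for the lock-free SCQ rings. This is purely illustrative: the names and per-slot mutexes here are not the crate's internals, and the real SCQ needs no locks because index ownership alone guarantees exclusive slot access.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Stand-in for an SCQ index ring; the real ring is a fixed-size,
// power-of-two, lock-free MPMC structure.
struct IndexRing(Mutex<VecDeque<usize>>);

impl IndexRing {
    fn new() -> Self { IndexRing(Mutex::new(VecDeque::new())) }
    fn enqueue(&self, i: usize) { self.0.lock().unwrap().push_back(i); }
    fn dequeue(&self) -> Option<usize> { self.0.lock().unwrap().pop_front() }
}

struct BoundedQueue<T> {
    free: IndexRing,              // indices available for new entries
    alloc: IndexRing,             // indices currently holding entries
    slots: Vec<Mutex<Option<T>>>, // backing buffer
}

impl<T> BoundedQueue<T> {
    fn new(n: usize) -> Self {
        let free = IndexRing::new();
        for i in 0..n { free.enqueue(i); } // free ring starts full
        let slots = (0..n).map(|_| Mutex::new(None)).collect::<Vec<_>>();
        BoundedQueue { free, alloc: IndexRing::new(), slots }
    }

    fn enqueue(&self, v: T) -> Result<(), T> {
        match self.free.dequeue() {
            // An index taken from the free ring grants exclusive use
            // of its slot, and is always in bounds (0..n).
            Some(i) => {
                *self.slots[i].lock().unwrap() = Some(v);
                self.alloc.enqueue(i); // publish the filled slot
                Ok(())
            }
            None => Err(v), // queue full
        }
    }

    fn dequeue(&self) -> Option<T> {
        let i = self.alloc.dequeue()?; // None => queue empty
        let v = self.slots[i].lock().unwrap().take();
        self.free.enqueue(i); // return the index for reuse
        v
    }
}

fn main() {
    let q = BoundedQueue::new(2);
    assert!(q.enqueue(10).is_ok());
    assert!(q.enqueue(20).is_ok());
    assert!(q.enqueue(30).is_err()); // full: free ring exhausted
    assert_eq!(q.dequeue(), Some(10)); // FIFO order preserved
    assert_eq!(q.dequeue(), Some(20));
    assert_eq!(q.dequeue(), None);
}
```

The key invariant mirrors the text: every index lives in exactly one ring (or is held by exactly one thread in transit), so slot access never races.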
This queue is described in the ACM paper as the LSCQ queue. It is a more 'classical' implementation of a lock-free queue (and thus slower), but composed of bounded SCQ segments. Since adding and removing the segments themselves is a relatively rare operation, the cost is dominated by the operations on the bounded queue.
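The segment-composition idea can be sketched as follows. This is an illustration only, assuming a Mutex-protected list of fixed-capacity buffers in place of the real linked list of lock-free SCQ segments; the types and names are hypothetical:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Illustration of an unbounded queue built from bounded segments, in
// the spirit of LSCQ. The real LSCQ links lock-free SCQ segments.
struct Segment<T> { buf: VecDeque<T>, cap: usize }

struct SegmentedQueue<T> { segs: Mutex<VecDeque<Segment<T>>>, seg_cap: usize }

impl<T> SegmentedQueue<T> {
    fn new(seg_cap: usize) -> Self {
        SegmentedQueue { segs: Mutex::new(VecDeque::new()), seg_cap }
    }

    fn enqueue(&self, v: T) {
        let mut segs = self.segs.lock().unwrap();
        // Append a new segment only when the tail is full (rare path);
        // the common path is a plain bounded enqueue into the tail.
        let need_new = segs.back().map_or(true, |s| s.buf.len() == s.cap);
        if need_new {
            segs.push_back(Segment { buf: VecDeque::new(), cap: self.seg_cap });
        }
        segs.back_mut().unwrap().buf.push_back(v);
    }

    fn dequeue(&self) -> Option<T> {
        let mut segs = self.segs.lock().unwrap();
        loop {
            let front = segs.front_mut()?; // no segments => empty
            if let Some(v) = front.buf.pop_front() {
                return Some(v);
            }
            segs.pop_front(); // drop the exhausted segment (rare path)
        }
    }
}

fn main() {
    let q = SegmentedQueue::new(2);
    for i in 0..5 { q.enqueue(i); } // spans three segments
    for i in 0..5 { assert_eq!(q.dequeue(), Some(i)); }
    assert_eq!(q.dequeue(), None);
}
```

Because segment creation and retirement happen only when a segment fills or drains completely, almost all operations hit the fast bounded-queue path, which is why the cost is dominated by the bounded queue.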
The loom tests are not exhaustive and should be improved in the future: because there are loops, the state space can explode quickly, and at times loom cannot run the full set of interleavings we would like.
```sh
$ LOOM_MAX_PREEMPTIONS=3 RUSTFLAGS="--cfg loom" cargo test loom_ --release
```

There is a whole suite of fuzz tests for finding unwanted behaviour, memory leaks, etc.
```sh
$ RUSTFLAGS="--cfg fuzzing" cargo +nightly fuzz run initfull
```

- `scq_grind` (`fuzz/fuzz_targets/scq_grind.rs`) performs an arbitrary sequence of operations on an SCQ queue and checks that it lines up with the `VecDeque` implementation from `std`. This checks for correctness as a queue, ensuring that order is preserved correctly.
- `lscq_grind` (`fuzz/fuzz_targets/lscq_grind.rs`) performs an arbitrary sequence of operations on an LSCQ (unbounded) queue and checks that it lines up with the `VecDeque` implementation from `std`. This checks for correctness as a queue, ensuring that order is preserved correctly.
- `scq_grind_rt` (`fuzz/fuzz_targets/scq_grind_rt.rs`) performs an arbitrary sequence of operations on an SCQ queue on multiple threads. The idea is to find an error or a panic.
- `lscq_grind_rt` (`fuzz/fuzz_targets/lscq_grind_rt.rs`) performs an arbitrary sequence of operations on an unbounded queue on multiple threads. The idea is to find an error or a panic.
- `const_grind` (`fuzz/fuzz_targets/const_grind.rs`) performs an arbitrary sequence of operations on a constant queue. It compares the result with the `VecDeque` implementation in `std` to verify correctness.
```sh
$ rustup +nightly component add miri
$ cargo +nightly miri test
```