Skip to content

DiscordJim/lfqueue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

29 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

lfqueue

GitHub Actions Status · Crates.io Version · Crates.io Downloads · License: MIT

A lock-free queue for asynchronous & synchronous code from the ACM paper, "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev (arXiV/1908.04511). This implements the proposed SCQ (bounded) and LSCQ (unbounded) variant of the queue using atomics.

Features

  • #![no_std] option for embedded environments
  • bounded & unbounded MPMC concurrent queues
  • very fast performance characteristics under high & low contentions.

Quickstart

The following section will give quickstart code examples. One limitation of the library is that sizes must be powers of two, therefore, only 1, 2, 4, 8, 16, ... are supported as lengths.

Allocated Queues

Usually what you want is an allocated queue, which means that the values are all on the heap. There are two types: AllocBoundedQueue, which is the bounded queue but heap-allocated, and the UnboundedQueue which is always heap allocated.

// Import the allocated quuees.
use lfqueue::AllocBoundedQueue;

// Make an allocated queue of size 8.
let queue = AllocBoundedQueue::new(8);
assert!(queue.enqueue(0).is_ok()); // this should enqueue correctly.
assert_eq!(queue.dequeue(), Some(0));

You may also want an UnboundedQueue, which requires std which is enabled by default. These are created with an initial segment size:

use lfqueue::UnboundedQueue;

// Make an unbounded queue.
let queue = UnboundedQueue::new(8);
queue.enqueue(0);
assert_eq!(queue.dequeue(), Some(0));

Additionally, if you must have the len method, there are CountedUnboundedQueues which introduce the method at a small performance penalty. They work the same as UnboundedQueue, except that as of now you cannot have handles to them.

Constant Queues

This queue is not backed by the heap and instead lives on the stack. These can be created manually, but we almost always want to use the macro which will set it up easily.

use lfqueue::{const_queue, ConstBoundedQueue};

// Make a constant queue of size 8.
let queue = const_queue!(usize; 8);
assert!(queue.enqueue(8).is_ok());
assert_eq!(queue.dequeue(), Some(8));

SingleSize Queues

This queue has a capacity of one, meaning that you can enqueue one message and dequeue one message. Logically, it is equivalent to const_queue!(usize; 1).

use lfqueue::SingleSize;

// Make a constant queue of size 8.
let queue = SingleSize::new();
assert!(queue.enqueue(8).is_ok());
assert_eq!(queue.dequeue(), Some(8));

Benchmarks

The queues within the library were tested against several other queues. The benchmarking is not exhaustive, but the process can be seen in benches/syncqueue.rs.

Larger Queues

crate structure test time (ms)
lfqueue ConstBoundedQueue (32) t=1,o=100 99.721µs
lfqueue ConstBoundedQueue (32) t=10,o=100 879.07 µs
lfqueue ConstBoundedQueue (32) t=100,o=100 16.303 ms
lfqueue ConstBoundedQueue (32) t=100,o=10000 129.34 ms
lfqueue AllocBoundedQueue (32) t=1,o=100 101.92µs
lfqueue AllocBoundedQueue (32) t=10,o=100 912.69µs
lfqueue AllocBoundedQueue (32) t=100,o=100 11.239ms
lfqueue AllocBoundedQueue (32) t=100,o=10000 114.89ms
lfqueue UnboundedQueue (seg=1024) t=1,o=100 107.90µs
lfqueue UnboundedQueue (seg=1024) t=10,o=100 882.55µs
lfqueue UnboundedQueue (seg=1024) t=100,o=100 11.888ms
lfqueue UnboundedQueue (seg=1024) t=100,o=10000 144.09ms
crossbeam SegQueue t=1,o=100 111.29µs
crossbeam SegQueue t=10,o=100 995.10µs
crossbeam SegQueue t=100,o=100 20.831ms
crossbeam SegQueue t=100,o=10000 41.886ms
crossbeam ArrayQueue (32) t=1,o=100 155.42µs
crossbeam ArrayQueue (32) t=10,o=100 939.54µs
crossbeam ArrayQueue (32) t=100,o=100 11.161ms
crossbeam ArrayQueue (32) t=100,o=10000 99.484ms
lockfree Queue t=1,o=100 114.87µs
lockfree Queue t=10,o=100 1.0594ms
lockfree Queue t=100,o=100 13.756ms
lockfree Queue t=100,o=10000 496.96ms
std Mutex<VecDequeue> t=1,o=100 100.49µs
std Mutex<VecDequeue> t=10,o=100 1.2141ms
std Mutex<VecDequeue> t=100,o=100 13.509ms
std Mutex<VecDequeue> t=100,o=10000 234.65ms

Single shot

Testing with a queue size of 1.

crate structure test time (ms)
lfqueue SingleSize (1) t=1,o=100 105.35µs
lfqueue SingleSize (1) t=10,o=100 924.79µs
lfqueue SingleSize (1) t=100,o=100 11.914ms
lfqueue SingleSize (1) t=100,o=10000 13.093ms
lfqueue ConstBoundedQueue (1) t=1,o=100 133.09µs
lfqueue ConstBoundedQueue (1) t=10,o=100 980.12 µs
lfqueue ConstBoundedQueue (1) t=100,o=100 12.108ms
lfqueue ConstBoundedQueue (1) t=100,o=10000 34.351ms
lfqueue AllocBoundedQueue (1) t=1,o=100 107.35µs
lfqueue AllocBoundedQueue (1) t=10,o=100 1.0678ms
lfqueue AllocBoundedQueue (1) t=100,o=100 11.853ms
lfqueue AllocBoundedQueue (1) t=100,o=10000 165.84ms
std Mutex<Option<T>> (1) t=1,o=100 98.686µs
std Mutex<Option<T>> (1) t=10,o=100 879.31 µs
std Mutex<Option<T>> (1) t=100,o=100 12.136ms
std Mutex<Option<T>> (1) t=100,o=10000 165.66ms
crossbeam ArrayQueue (1) t=1,o=100 124.43µs
crossbeam ArrayQueue (1) t=10,o=100 1.1164ms
crossbeam ArrayQueue (1) t=100,o=100 12.566ms
crossbeam ArrayQueue (1) t=100,o=10000 97.277ms

Examples

  1. Basic Trading Order Scheduler: (examples/trading.rs) A very basic example showing a single loop publishing orders to a network, and another process that sends the orders out to the exchange.

Design

SCQ Rings

Each data structure fundamentally relies on the SCQ ring described in the ACM paper. The ring is a MPMC queue for indices. The size must be a power of two, hence why on initialization we must pass an order instead of a capacity directly.

Bounded Queue

The bounded queue is the SCQ queue from the ACM paper. It works by maintaining two rings:

  • Free Ring: This ring contains all the available indexes that we can slot a value into.
  • Allocation Ring: This ring contains all the indexes that are currently allocated.

Additionally, the bounded queue contains a backing buffer, which stores the entries itself. The correctness of the various methods relies on the fact that both of these are correct MPMC index rings.

The bounded queue is initialized by filling the free ring with values from 0 to n - 1 (all the valid indexes) and by leaving the allocation queue empty. Therefore, any given index can only be in at most one of these rings, and since they are correct concurrent MPMC index rings, only one thread possesses said index.

Enqueue

The enqueue method works by first finding an available slot. To do this, an index is dequeued from the free queue. We can then access the backing buffer at this index, with two guarantees:

  1. Since any index can only be in at most one of the queues, and only one thread can "possess" an index at any point in time, we have a unique index into the array and thus can safely take mutable access to that slot.
  2. Since any index coming from the free queue is between 0 and n (0, 1, 2, ..., n-1).

Using this, we access the slot, set the value, and then we insert the index into the allocation array, indicating we are done.

Dequeue

The dequeue method works similarly and off the same guarantees, except in reverse. An index is dequeued from the allocation ring, we remove it from it's slot using our unique access, and then we return the index to the free ring so it can be used.

Unbounded Queue

This queue is described in the ACM paper as the LCSQ queue. It is a more 'classical' implementation of a lock-free queue (and thus slower), but composed of bounded queues. Since adding and removing the queues themselves is a relatively rare operation, the cost is dominated by the operations on the bounded queue.

Testing

Loom

The loom tests are not exhaustive and should be improved in the future, since there are loops the state space can explode fast, and thus at times loom cannot run the full set of tests we would like.

$ LOOM_MAX_PREEMPTIONS=3 RUSTFLAGS="--cfg loom" cargo test loom_ --release

Fuzzing

There are a whole suite of fuzz tests for finding unwanted behaviour, memory leaks, etc.

Running Fuzz Tests

$ RUSTFLAGS="--cfg fuzzing" cargo +nightly fuzz run initfull

Tests

  • scq_grind (fuzz/fuzz_targets/scq_grind.rs) performs an arbitrary sequence of operations on an SCQ queue and checks that it lines up with the VecDeque implementation from std. This checks for correctness as a queue, ensuring that order is preserved correctly.
  • lscq_grind (fuzz/fuzz_targets/lscq_grind.rs) performs an arbitrary sequence of operations on an SCQ queue and checks that it lines up with the VecDeque implementation from std. This checks for correctness as a queue, ensuring that order is preserved correctly.
  • scq_grind_rt (fuzz/fuzz_target/scq_grind_rt.rs) performs an arbitrary sequence of operations on an SCQ queue on multiple threads. The idea is to find an error or a panic.
  • lscq_grind_rt (fuzz/fuzz_target/scq_grind_rt.rs) performs an arbitrary sequence of operations on an unbounded queue on multiple threads. The idea is to find an error or a panic.
  • const_grind (fuzz/fuzz_target/const_grind.rs) performs an arbitrary sequence of operations on a constant queues. It compares it with the VecDeque operation in std to verify correctness.

Miri

$ rustup +nightly component add miri
$ cargo +nightly miri test

About

A lock-free queue for asynchronous & synchronous code.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •  

Languages