Skip to content

Commit

Permalink
Add the MPMC Queue data structure
Browse files Browse the repository at this point in the history
  • Loading branch information
DNedic committed May 27, 2023
1 parent 3435740 commit 69db9e7
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Lock-free data structures are data structures that are thread and interrupt safe
These data structures are more performant and should generally be used whenever there is only one thread/interrupt pushing data and another one retrieving it.

### Multi-producer multi-consumer data structures
There are no multi producer multi consumer data structures implemented currently.
* [Queue](docs/mpmc/queue.md) - Best for single element operations, extremely fast, simple API consisting of only 2 methods.

These data structures are more general, supporting multiple producers and consumers at the same time, however they have storage and performance overhead compared to single producer single consumer data structures.

Expand Down
40 changes: 40 additions & 0 deletions docs/mpmc/queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Queue

## When to use the Queue
The Queue is the simplest data structure in the library, and it should be used when single element operations are dominant. It has the simplest API and lowest overhead per operation.

> Note: At the moment, the Queue is only meant to be used for [trivial](https://en.cppreference.com/w/cpp/language/classes#Trivial_class) types.
## How to use
Shown here is an example of typical use:
* Initialization
```cpp
#include "lockfree.hpp"
// --snip--
lockfree::mpmc::Queue<uint32_t, 128U> queue_jobs;
```

* Producer threads
```cpp
Job job = connection.GetWorkItems();
bool write_success = queue_jobs.Push(job);
```

* Consumer threads/interrupts
```cpp
Job job;
bool read_success = queue_jobs.Pop(job);

if (read_success) {
worker.ProcessJob(read);
}
```

There is also a `std::optional` API for the `Pop` method:
```c
auto job = queue_jobs.PopOptional();

if (job) {
worker.ProcessJob(read);
}
```
2 changes: 2 additions & 0 deletions lockfree/lockfree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@
#include "spsc/priority_queue.hpp"
#include "spsc/queue.hpp"
#include "spsc/ring_buf.hpp"

#include "mpmc/queue.hpp"
120 changes: 120 additions & 0 deletions lockfree/mpmc/queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**************************************************************
* @file queue.hpp
* @brief A queue implementation written in standard c++11
* suitable for both low-end microcontrollers all the way
* to HPC machines. Lock-free for all scenarios.
* @version 1.1.0
* @date 19. May 2023
* @author Djordje Nedic
**************************************************************/

/**************************************************************
* Copyright (c) 2023 Djordje Nedic
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to
* whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall
* be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
* KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* This file is part of lockfree
*
* Author: Djordje Nedic <nedic.djordje2@gmail.com>
* Version: v1.1.0
**************************************************************/

/************************** INCLUDE ***************************/
#ifndef LOCKFREE_MPMC_QUEUE_HPP
#define LOCKFREE_MPMC_QUEUE_HPP

#include <atomic>
#include <cstddef>
#include <type_traits>

#if __cplusplus >= 201703L
#include <optional>
#endif

namespace lockfree {
namespace mpmc {
/*************************** TYPES ****************************/

template <typename T, size_t size> class Queue {
static_assert(std::is_trivial<T>::value, "The type T must be trivial");
static_assert(size > 2, "Buffer size must be bigger than 2");

/********************** PUBLIC METHODS ************************/
public:
Queue();

/**
* @brief Adds an element into the queue.
* @param[in] element
* @retval Operation success
*/
bool Push(const T &element);

/**
* @brief Removes an element from the queue.
* @param[out] element
* @retval Operation success
*/
bool Pop(T &element);

#if __cplusplus >= 201703L
/**
* @brief Removes an element from the queue.
* @retval Either the element or nothing
*/
std::optional<T> PopOptional();
#endif

/*********************** PRIVATE TYPES ************************/
private:
struct Slot {
T val;
std::atomic_size_t pop_count;
std::atomic_size_t push_count;

Slot() : pop_count(0U), push_count(0U) {}
};

/********************** PRIVATE MEMBERS ***********************/
private:
Slot _data[size]; /**< Data array */
#if LOCKFREE_CACHE_COHERENT
alignas(LOCKFREE_CACHELINE_LENGTH)
std::atomic_size_t _r_count; /**< Read monotonic counter */
alignas(LOCKFREE_CACHELINE_LENGTH)
std::atomic_size_t _w_count; /**< Write monotonic counter */
#else
std::atomic_size_t _r_count; /**< Read monotonic counter */
std::atomic_size_t _w_count; /**< Write monotonic counter */
#endif
};

/************************** INCLUDE ***************************/

/* Include the implementation */
#include "queue_impl.hpp"

} /* namespace mpmc */
} /* namespace lockfree */

#endif /* LOCKFREE_MPMC_QUEUE_HPP */
126 changes: 126 additions & 0 deletions lockfree/mpmc/queue_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**************************************************************
* @file queue_impl.hpp
* @brief A queue implementation written in standard c++11
* suitable for both low-end microcontrollers all the way
* to HPC machines. Lock-free for all scenarios.
* @version 1.1.0
* @date 19. May 2023
* @author Djordje Nedic
**************************************************************/

/**************************************************************
* Copyright (c) 2023 Djordje Nedic
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to
* whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall
* be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
* KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* This file is part of lockfree
*
* Author: Djordje Nedic <nedic.djordje2@gmail.com>
* Version: v1.1.0
**************************************************************/

/********************** PUBLIC METHODS ************************/

template <typename T, size_t size>
Queue<T, size>::Queue() : _r_count(0U), _w_count(0U) {}

template <typename T, size_t size> bool Queue<T, size>::Push(const T &element) {
size_t w_count = _w_count.load(std::memory_order_relaxed);

while (true) {
const size_t index = w_count % size;

const size_t push_count =
_data[index].push_count.load(std::memory_order_acquire);
const size_t pop_count =
_data[index].pop_count.load(std::memory_order_relaxed);

if (push_count > pop_count) {
return false;
}

const size_t revolution_count = w_count / size;
const bool our_turn = revolution_count == push_count;

if (our_turn) {
/* Try to acquire the slot by bumping the monotonic write counter */
if (_w_count.compare_exchange_weak(w_count, w_count + 1U,
std::memory_order_relaxed)) {
_data[index].val = element;
_data[index].push_count.store(push_count + 1U,
std::memory_order_release);
return true;
}
} else {
w_count = _w_count.load(std::memory_order_relaxed);
}
}
}

template <typename T, size_t size> bool Queue<T, size>::Pop(T &element) {
size_t r_count = _r_count.load(std::memory_order_relaxed);

while (true) {
const size_t index = r_count % size;

const size_t pop_count =
_data[index].pop_count.load(std::memory_order_acquire);
const size_t push_count =
_data[index].push_count.load(std::memory_order_relaxed);

if (pop_count == push_count) {
return false;
}

const size_t revolution_count = r_count / size;
const bool our_turn = revolution_count == pop_count;

if (our_turn) {
/* Try to acquire the slot by bumping the monotonic read counter. */
if (_r_count.compare_exchange_weak(r_count, r_count + 1U,
std::memory_order_relaxed)) {
element = _data[index].val;
_data[index].pop_count.store(pop_count + 1U,
std::memory_order_release);
return true;
}
} else {
r_count = _r_count.load(std::memory_order_relaxed);
}
}
}

/********************* std::optional API **********************/
#if __cplusplus >= 201703L
template <typename T, size_t size>
std::optional<T> Queue<T, size>::PopOptional() {
T element;
bool result = Pop(element);

if (result) {
return element;
} else {
return {};
}
}
#endif
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ FetchContent_Declare(
FetchContent_MakeAvailable(Catch2)

add_executable(tests
queue.cpp
spsc_queue.cpp
ring_buf.cpp
bipartite_buf.cpp
priority_queue.cpp
mpmc_queue.cpp
)

# Required in order to test the std::span API as well
Expand Down
Loading

0 comments on commit 69db9e7

Please sign in to comment.