Skip to content

Commit

Permalink
Add the MPMC PriorityQueue data structure
Browse files Browse the repository at this point in the history
  • Loading branch information
DNedic committed May 27, 2023
1 parent 69db9e7 commit d6273e0
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ These data structures are more performant and should generally be used whenever

### Multi-producer multi-consumer data structures
* [Queue](docs/mpmc/queue.md) - Best for single element operations, extremely fast, simple API consisting of only 2 methods.
* [Priority Queue](docs/spsc/priority_queue.md) - A Variation of the queue with the ability to provide different priorities for elements, very useful for things like signals, events and communication packets.

These data structures are more general, supporting multiple producers and consumers at the same time, however they have storage and performance overhead compared to single producer single consumer data structures.

Expand Down
46 changes: 46 additions & 0 deletions docs/mpmc/priority_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Priority Queue

## When to use the Priority Queue
The Priority Queue should be used when there are distinct priorities between elements to be enqueued, for instance different urgency signals between threads or packets of different priorities.

> Note: At the moment, the Queue is only meant to be used for [trivial](https://en.cppreference.com/w/cpp/language/classes#Trivial_class) types.
## How to use
Shown here is an example of typical use:
* Initialization
```cpp
#include "lockfree.hpp"
// --snip--
lockfree::mpmc::PriorityQueue<Event, 64, 3> queue_events;
```

* Producer threads/interrupts
```cpp
Event event = actor1.Run();
// --snip--
while(!queue_events.Push(event, priority)) {}
```

* Consumer threads/interrupts
```cpp
Event event_in;
bool read_success = queue_events.Pop(read);
if (read_success) {
actor2.ProcessEvent(event_in);
}
```

There is also a `std::optional` API for `Pop`:
```c
auto read = queue_events.PopOptional();

if (read) {
actor2.ProcessEvent(read);
}
```

## Performance and memory use

This implementation has `O(1)` time complexity for `Push` and `O(current_max_priority)` for `Pop` making it extremely fast.

On the other hand the memory usage is a function of `size * priority_count`, so adequately chosing the number of priorities is necessary.
1 change: 1 addition & 0 deletions lockfree/lockfree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@
#include "spsc/queue.hpp"
#include "spsc/ring_buf.hpp"

#include "mpmc/priority_queue.hpp"
#include "mpmc/queue.hpp"
106 changes: 106 additions & 0 deletions lockfree/mpmc/priority_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**************************************************************
* @file priority_queue.hpp
* @brief A priority queue implementation written in standard
* c++11 suitable for both low-end microcontrollers all the way
* to HPC machines. Lock-free for single consumer single
* producer scenarios.
* @version 1.1.0
* @date 19. May 2023
* @author Djordje Nedic
**************************************************************/

/**************************************************************
* Copyright (c) 2023 Djordje Nedic
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to
* whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall
* be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
* KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* This file is part of lockfree
*
* Author: Djordje Nedic <nedic.djordje2@gmail.com>
* Version: v1.1.0
**************************************************************/

/************************** INCLUDE ***************************/
#ifndef LOCKFREE_MPMC_PRIORITY_QUEUE_HPP
#define LOCKFREE_MPMC_PRIORITY_QUEUE_HPP

#include <atomic>
#include <cstddef>
#include <type_traits>

#if __cplusplus >= 201703L
#include <optional>
#endif

#include "queue.hpp"

namespace lockfree {
namespace mpmc {
/*************************** TYPES ****************************/

template <typename T, size_t size, size_t priority_count> class PriorityQueue {
static_assert(std::is_trivial<T>::value, "The type T must be trivial");
static_assert(size > 2, "Buffer size must be bigger than 2");

/********************** PUBLIC METHODS ************************/
public:
/**
* @brief Adds an element with a specified priority into the queue.
* Should only be called from the producer thread.
* @param[in] Element
* @param[in] Element priority
* @retval Operation success
*/
bool Push(const T &element, const size_t priority);

/**
* @brief Removes an element with the highest priority from the queue.
* Should only be called from the consumer thread.
* @param[out] Element
* @retval Operation success
*/
bool Pop(T &element);

#if __cplusplus >= 201703L
/**
* @brief Removes an element with the highest priority from the queue.
* Should only be called from the consumer thread.
* @retval Either the element or nothing if the queue is empty.
*/
std::optional<T> PopOptional();
#endif

/********************** PRIVATE MEMBERS ***********************/
private:
Queue<T, size> _subqueue[priority_count];
};

/************************** INCLUDE ***************************/

/* Include the implementation */
#include "priority_queue_impl.hpp"

} /* namespace mpmc */
} /* namespace lockfree */

#endif /* LOCKFREE_MPMC_PRIORITY_QUEUE_HPP */
83 changes: 83 additions & 0 deletions lockfree/mpmc/priority_queue_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**************************************************************
* @file priority_queue_impl.hpp
* @brief A priority queue implementation written in standard
* c++11 suitable for both low-end microcontrollers all the way
* to HPC machines. Lock-free for single consumer single
* producer scenarios.
* @version 1.1.0
* @date 19. May 2023
* @author Djordje Nedic
**************************************************************/

/**************************************************************
* Copyright (c) 2023 Djordje Nedic
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to
* whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall
* be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
* KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* This file is part of lockfree
*
* Author: Djordje Nedic <nedic.djordje2@gmail.com>
* Version: v1.1.0
**************************************************************/

/************************** INCLUDE ***************************/

#include <cassert>

/********************** PUBLIC METHODS ************************/

template <typename T, size_t size, size_t priority_count>
bool PriorityQueue<T, size, priority_count>::Push(const T &element,
const size_t priority) {
assert(priority < priority_count);

return _subqueue[priority].Push(element);
}

template <typename T, size_t size, size_t priority_count>
bool PriorityQueue<T, size, priority_count>::Pop(T &element) {

for (size_t priority = priority_count; priority-- > 0;) {
if (_subqueue[priority].Pop(element)) {
return true;
}
}

/* Could find no elements at all */
return false;
}

/********************* std::optional API **********************/
#if __cplusplus >= 201703L
template <typename T, size_t size, size_t priority_count>
std::optional<T> PriorityQueue<T, size, priority_count>::PopOptional() {
T element;
bool result = Pop(element);

if (result) {
return element;
} else {
return {};
}
}
#endif
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ add_executable(tests
spsc_queue.cpp
ring_buf.cpp
bipartite_buf.cpp
priority_queue.cpp
spsc_priority_queue.cpp
mpmc_queue.cpp
mpmc_priority_queue.cpp
)

# Required in order to test the std::span API as well
Expand Down
77 changes: 77 additions & 0 deletions tests/mpmc_priority_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <algorithm>
#include <math.h>

#include <catch2/catch_test_macros.hpp>

#include "lockfree.hpp"

TEST_CASE("Write to empty, lowest priority and read back",
"[mpmc_pq_write_empty_lowest]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;

bool const push_success = queue.Push(-1024, 0);
REQUIRE(push_success);

int16_t read = 0;
bool const pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == -1024);
}

TEST_CASE("Write to empty, highest priority and read back",
"[mpmc_pq_write_empty_highest]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;

bool const push_success = queue.Push(-1024, 2);
REQUIRE(push_success);

int16_t read = 0;
bool const pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == -1024);
}

TEST_CASE("Write multiple with different priority and read back ensuring "
"proper sequence",
"[mpmc_pq_write_multiple_read_multiple]") {
lockfree::mpmc::PriorityQueue<uint64_t, 10, 4> queue;

bool push_success = queue.Push(256, 2);
REQUIRE(push_success);

push_success = queue.Push(1024, 0);
REQUIRE(push_success);

push_success = queue.Push(128, 1);
REQUIRE(push_success);

push_success = queue.Push(512, 3);
REQUIRE(push_success);

uint64_t read = 0;
bool pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == 512);

pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == 256);

pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == 128);

pop_success = queue.Pop(read);
REQUIRE(pop_success);
REQUIRE(read == 1024);
}

TEST_CASE("Optional API", "[mpmc_pq_optional_api]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;

bool const push_success = queue.Push(-1024, 0);
REQUIRE(push_success);

auto const read = queue.PopOptional();
REQUIRE(read == -1024);
}
File renamed without changes.

0 comments on commit d6273e0

Please sign in to comment.