Skip to content

Commit c48dd19

Browse files
authored
Merge pull request SynaptiveMedical#3 from SynaptiveMedical/synaptive/dev/sever/COM-464_DeepSleepWithRouser
COM-464 Deep Sleep with Rouser
2 parents 3fb0298 + 3411673 commit c48dd19

File tree

7 files changed

+951
-154
lines changed

7 files changed

+951
-154
lines changed

include/thread_pool/fixed_function.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ template <typename SIGNATURE, size_t STORAGE_SIZE = 128>
2121
class FixedFunction;
2222

2323
template <typename R, typename... ARGS, size_t STORAGE_SIZE>
24-
class FixedFunction<R(ARGS...), STORAGE_SIZE>
24+
class FixedFunction<R(ARGS...), STORAGE_SIZE> final
2525
{
2626

2727
typedef R (*func_ptr_type)(ARGS...);

include/thread_pool/mpmc_bounded_queue.hpp

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace tp
5252
* http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5353
*/
5454
template <typename T>
55-
class MPMCBoundedQueue
55+
class MPMCBoundedQueue final
5656
{
5757
static_assert(
5858
std::is_move_constructible<T>::value, "Should be of movable type");
@@ -65,6 +65,16 @@ class MPMCBoundedQueue
6565
*/
6666
explicit MPMCBoundedQueue(size_t size);
6767

68+
/**
69+
* @brief Copy ctor implementation.
70+
*/
71+
MPMCBoundedQueue(MPMCBoundedQueue const&) = delete;
72+
73+
/**
74+
* @brief Copy assignment implementation.
75+
*/
76+
MPMCBoundedQueue& operator=(MPMCBoundedQueue const& rhs) = delete;
77+
6878
/**
6979
* @brief Move ctor implementation.
7080
*/
@@ -76,18 +86,23 @@ class MPMCBoundedQueue
7686
MPMCBoundedQueue& operator=(MPMCBoundedQueue&& rhs) noexcept;
7787

7888
/**
79-
* @brief push Push data to queue.
80-
* @param data Data to be pushed.
81-
* @return true on success.
82-
*/
89+
* @brief MPMCBoundedQueue destructor.
90+
*/
91+
~MPMCBoundedQueue() = default;
92+
93+
/**
94+
* @brief push Push data to queue.
95+
* @param data Data to be pushed.
96+
* @return true on success.
97+
*/
8398
template <typename U>
8499
bool push(U&& data);
85100

86101
/**
87-
* @brief pop Pop data from queue.
88-
* @param data Place to store popped data.
89-
* @return true on sucess.
90-
*/
102+
* @brief pop Pop data from queue.
103+
* @param data Place to store popped data.
104+
* @return true on sucess.
105+
*/
91106
bool pop(T& data);
92107

93108
private:
@@ -115,7 +130,6 @@ class MPMCBoundedQueue
115130
}
116131
};
117132

118-
private:
119133
typedef char Cacheline[64];
120134

121135
Cacheline pad0;
@@ -151,7 +165,7 @@ inline MPMCBoundedQueue<T>::MPMCBoundedQueue(size_t size)
151165
template <typename T>
152166
inline MPMCBoundedQueue<T>::MPMCBoundedQueue(MPMCBoundedQueue&& rhs) noexcept
153167
{
154-
*this = rhs;
168+
*this = std::move(rhs);
155169
}
156170

157171
template <typename T>
@@ -160,10 +174,11 @@ inline MPMCBoundedQueue<T>& MPMCBoundedQueue<T>::operator=(MPMCBoundedQueue&& rh
160174
if (this != &rhs)
161175
{
162176
m_buffer = std::move(rhs.m_buffer);
163-
m_buffer_mask = std::move(rhs.m_buffer_mask);
177+
m_buffer_mask = rhs.m_buffer_mask;
164178
m_enqueue_pos = rhs.m_enqueue_pos.load();
165179
m_dequeue_pos = rhs.m_dequeue_pos.load();
166180
}
181+
167182
return *this;
168183
}
169184

@@ -172,27 +187,24 @@ template <typename U>
172187
inline bool MPMCBoundedQueue<T>::push(U&& data)
173188
{
174189
Cell* cell;
175-
size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
190+
size_t pos = m_enqueue_pos.load(std::memory_order_acquire);
176191
for(;;)
177192
{
178193
cell = &m_buffer[pos & m_buffer_mask];
179194
size_t seq = cell->sequence.load(std::memory_order_acquire);
180195
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
181196
if(dif == 0)
182197
{
183-
if(m_enqueue_pos.compare_exchange_weak(
184-
pos, pos + 1, std::memory_order_relaxed))
185-
{
198+
if(m_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel))
186199
break;
187-
}
188200
}
189201
else if(dif < 0)
190202
{
191203
return false;
192204
}
193205
else
194206
{
195-
pos = m_enqueue_pos.load(std::memory_order_relaxed);
207+
pos = m_enqueue_pos.load(std::memory_order_acquire);
196208
}
197209
}
198210

@@ -207,34 +219,30 @@ template <typename T>
207219
inline bool MPMCBoundedQueue<T>::pop(T& data)
208220
{
209221
Cell* cell;
210-
size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
222+
size_t pos = m_dequeue_pos.load(std::memory_order_acquire);
211223
for(;;)
212224
{
213225
cell = &m_buffer[pos & m_buffer_mask];
214226
size_t seq = cell->sequence.load(std::memory_order_acquire);
215227
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
216228
if(dif == 0)
217229
{
218-
if(m_dequeue_pos.compare_exchange_weak(
219-
pos, pos + 1, std::memory_order_relaxed))
220-
{
230+
if(m_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel))
221231
break;
222-
}
223232
}
224233
else if(dif < 0)
225234
{
226235
return false;
227236
}
228237
else
229238
{
230-
pos = m_dequeue_pos.load(std::memory_order_relaxed);
239+
pos = m_dequeue_pos.load(std::memory_order_acquire);
231240
}
232241
}
233242

234243
data = std::move(cell->data);
235244

236-
cell->sequence.store(
237-
pos + m_buffer_mask + 1, std::memory_order_release);
245+
cell->sequence.store(pos + m_buffer_mask + 1, std::memory_order_release);
238246

239247
return true;
240248
}

include/thread_pool/rouser.hpp

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#pragma once
2+
3+
#include <thread_pool/slotted_bag.hpp>
4+
#include <thread_pool/thread_pool_options.hpp>
5+
#include <thread_pool/worker.hpp>
6+
7+
#include <atomic>
8+
#include <thread>
9+
#include <limits>
10+
#include <mutex>
11+
#include <chrono>
12+
#include <condition_variable>
13+
14+
namespace tp
15+
{
16+
17+
/**
18+
* @brief Rouser is a specialized worker that periodically wakes other workers.
19+
* @detail This serves two purposes:
20+
* The first is that it emplaces an upper bound on the processing time of tasks in the thread pool, since
21+
* it is otherwise possible for the thread pool to enter a state where all threads are asleep, and tasks exist
22+
* in worker queues. The second is that it increases the likelihood of at least one worker busy-waiting at any
23+
* point in time, which speeds up task processing response time.
24+
*/
25+
class Rouser final
26+
{
27+
public:
28+
/**
29+
* @brief Worker Constructor.
30+
*/
31+
Rouser(std::chrono::microseconds rouse_period);
32+
33+
/**
34+
* @brief Copy ctor implementation.
35+
*/
36+
Rouser(Rouser const&) = delete;
37+
38+
/**
39+
* @brief Copy assignment implementation.
40+
*/
41+
Rouser& operator=(Rouser const& rhs) = delete;
42+
43+
/**
44+
* @brief Move ctor implementation.
45+
* @note Be very careful when invoking this while the thread pool is
46+
* active, or in an otherwise undefined state.
47+
*/
48+
Rouser(Rouser&& rhs) noexcept;
49+
50+
/**
51+
* @brief Move assignment implementaion.
52+
* @note Be very careful when invoking this while the thread pool is
53+
* active, or in an otherwise undefined state.
54+
*/
55+
Rouser& operator=(Rouser&& rhs) noexcept;
56+
57+
/**
58+
* @brief Destructor implementation.
59+
*/
60+
~Rouser();
61+
62+
/**
63+
* @brief start Create the executing thread and start tasks execution.
64+
* @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
65+
* @param idle_workers A pointer to the slotted bag containing all idle workers.
66+
* @param num_busy_waiters A pointer to the atomic busy waiter counter.
67+
* @note The parameters passed into this function generally relate to the global thread pool state.
68+
*/
69+
template <typename Task, template<typename> class Queue>
70+
void start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
71+
72+
/**
73+
* @brief stop Stop all worker's thread and stealing activity.
74+
* Waits until the executing thread becomes finished.
75+
*/
76+
void stop();
77+
78+
private:
79+
80+
/**
81+
* @brief threadFunc Executing thread function.
82+
* @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
83+
* @param idle_workers A pointer to the slotted bag containing all idle workers.
84+
* @param num_busy_waiters A pointer to the atomic busy waiter counter.
85+
*/
86+
template <typename Task, template<typename> class Queue>
87+
void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
88+
89+
std::atomic<bool> m_running_flag;
90+
std::atomic<bool> m_started_flag;
91+
std::thread m_thread;
92+
std::chrono::microseconds m_rouse_period;
93+
};
94+
95+
inline Rouser::Rouser(std::chrono::microseconds rouse_period)
96+
: m_running_flag(false)
97+
, m_started_flag(false)
98+
, m_rouse_period(std::move(rouse_period))
99+
{
100+
}
101+
102+
inline Rouser::Rouser(Rouser&& rhs) noexcept
103+
{
104+
*this = std::move(rhs);
105+
}
106+
107+
inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept
108+
{
109+
if (this != &rhs)
110+
{
111+
m_running_flag = rhs.m_running_flag.load();
112+
m_started_flag = rhs.m_started_flag.load();
113+
m_thread = std::move(rhs.m_thread);
114+
m_rouse_period = std::move(rhs.m_rouse_period);
115+
}
116+
117+
return *this;
118+
}
119+
120+
inline Rouser::~Rouser()
121+
{
122+
stop();
123+
}
124+
125+
template <typename Task, template<typename> class Queue>
126+
inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
127+
{
128+
if (m_started_flag.exchange(true, std::memory_order_acq_rel))
129+
throw std::runtime_error("The Rouser has already been started.");
130+
131+
m_running_flag.store(true, std::memory_order_release);
132+
m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters));
133+
}
134+
135+
inline void Rouser::stop()
136+
{
137+
if (m_running_flag.exchange(false, std::memory_order_acq_rel))
138+
m_thread.join();
139+
}
140+
141+
142+
template <typename Task, template<typename> class Queue>
143+
inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
144+
{
145+
while (m_running_flag.load(std::memory_order_acquire))
146+
{
147+
// Try to wake up a thread if there are no current busy waiters.
148+
if (num_busy_waiters.load(std::memory_order_acquire) == 0)
149+
{
150+
auto result = idle_workers.tryEmptyAny();
151+
if (result.first)
152+
workers[result.second]->wake();
153+
}
154+
155+
// Sleep.
156+
std::this_thread::sleep_for(m_rouse_period);
157+
}
158+
}
159+
160+
}

0 commit comments

Comments
 (0)