Commit 38a3d89

Merge pull request SynaptiveMedical#4 from SynaptiveMedical/synaptive/dev/sever/COM-466_OverflowPolicy
Overflow Policy
2 parents c48dd19 + 15d0afd commit 38a3d89

4 files changed: +192 -91 lines changed

include/thread_pool/rouser.hpp

Lines changed: 29 additions & 18 deletions
@@ -24,6 +24,16 @@ namespace tp
  */
 class Rouser final
 {
+    /**
+     * @brief State An Enum representing the Rouser thread state.
+     */
+    enum class State
+    {
+        Initialized,
+        Running,
+        Stopped
+    };
+
 public:
     /**
      * @brief Worker Constructor.
@@ -61,9 +71,9 @@ class Rouser final
 
     /**
      * @brief start Create the executing thread and start tasks execution.
-     * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
-     * @param idle_workers A pointer to the slotted bag containing all idle workers.
-     * @param num_busy_waiters A pointer to the atomic busy waiter counter.
+     * @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
+     * @param idle_workers A reference to the slotted bag containing all idle workers.
+     * @param num_busy_waiters A reference to the atomic busy waiter counter.
      * @note The parameters passed into this function generally relate to the global thread pool state.
      */
     template <typename Task, template<typename> class Queue>
@@ -72,29 +82,29 @@ class Rouser final
     /**
      * @brief stop Stop all worker's thread and stealing activity.
      * Waits until the executing thread becomes finished.
+     * @note Stop may only be called once start() has been invoked.
+     * Repeated successful calls to stop() will be no-ops after the first.
      */
     void stop();
 
 private:
 
     /**
      * @brief threadFunc Executing thread function.
-     * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
-     * @param idle_workers A pointer to the slotted bag containing all idle workers.
-     * @param num_busy_waiters A pointer to the atomic busy waiter counter.
+     * @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
+     * @param idle_workers A reference to the slotted bag containing all idle workers.
+     * @param num_busy_waiters A reference to the atomic busy waiter counter.
      */
     template <typename Task, template<typename> class Queue>
     void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
 
-    std::atomic<bool> m_running_flag;
-    std::atomic<bool> m_started_flag;
+    std::atomic<State> m_state;
     std::thread m_thread;
     std::chrono::microseconds m_rouse_period;
 };
 
 inline Rouser::Rouser(std::chrono::microseconds rouse_period)
-    : m_running_flag(false)
-    , m_started_flag(false)
+    : m_state(State::Initialized)
     , m_rouse_period(std::move(rouse_period))
 {
 }
@@ -108,8 +118,7 @@ inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept
 {
     if (this != &rhs)
     {
-        m_running_flag = rhs.m_running_flag.load();
-        m_started_flag = rhs.m_started_flag.load();
+        m_state = rhs.m_state.load();
         m_thread = std::move(rhs.m_thread);
         m_rouse_period = std::move(rhs.m_rouse_period);
     }
@@ -125,24 +134,26 @@ inline Rouser::~Rouser()
 template <typename Task, template<typename> class Queue>
 inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
 {
-    if (m_started_flag.exchange(true, std::memory_order_acq_rel))
-        throw std::runtime_error("The Rouser has already been started.");
+    auto expectedState = State::Initialized;
+    if (!m_state.compare_exchange_strong(expectedState, State::Running, std::memory_order_acq_rel))
+        throw std::runtime_error("Cannot start Rouser: it has previously been started or stopped.");
 
-    m_running_flag.store(true, std::memory_order_release);
     m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters));
 }
 
 inline void Rouser::stop()
 {
-    if (m_running_flag.exchange(false, std::memory_order_acq_rel))
+    auto expectedState = State::Running;
+    if (m_state.compare_exchange_strong(expectedState, State::Stopped, std::memory_order_acq_rel))
         m_thread.join();
+    else if (expectedState == State::Initialized)
+        throw std::runtime_error("Cannot stop Rouser: stop may only be called after the Rouser has been started.");
 }
 
-
 template <typename Task, template<typename> class Queue>
 inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
 {
-    while (m_running_flag.load(std::memory_order_acquire))
+    while (m_state.load(std::memory_order_acquire) == State::Running)
     {
         // Try to wake up a thread if there are no current busy waiters.
         if (num_busy_waiters.load(std::memory_order_acquire) == 0)
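
The start()/stop() change above replaces the two boolean flags with a single atomic three-state lifecycle (Initialized -> Running -> Stopped) advanced by compare_exchange_strong. A minimal, self-contained sketch of the same pattern (a hypothetical Lifecycle class, not part of the library) shows the guarantees the new @note describes: start() succeeds exactly once, stop() before start() throws, and repeated stop() calls after a successful one are no-ops.

// lifecycle_sketch.cpp -- illustrative only; mirrors the Rouser state machine above.
#include <atomic>
#include <iostream>
#include <stdexcept>

class Lifecycle
{
    enum class State { Initialized, Running, Stopped };
    std::atomic<State> m_state{State::Initialized};

public:
    void start()
    {
        auto expected = State::Initialized;
        // Succeeds exactly once: any later call observes Running or Stopped and throws.
        if (!m_state.compare_exchange_strong(expected, State::Running, std::memory_order_acq_rel))
            throw std::runtime_error("already started or stopped");
    }

    void stop()
    {
        auto expected = State::Running;
        if (m_state.compare_exchange_strong(expected, State::Stopped, std::memory_order_acq_rel))
            std::cout << "would join the worker thread here\n";   // Rouser joins m_thread at this point
        else if (expected == State::Initialized)
            throw std::runtime_error("stop before start");
        // expected == Stopped: a repeated stop() falls through as a no-op.
    }
};

int main()
{
    Lifecycle l;
    l.start();
    l.stop();
    l.stop();   // no-op, per the @note above
    return 0;
}

Because both transitions go through one atomic word, there is no window in which one thread observes the Rouser as started while another still sees it as not running, a window that existed between setting m_started_flag and m_running_flag in the old version.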

include/thread_pool/thread_pool.hpp

Lines changed: 69 additions & 35 deletions
@@ -92,14 +92,27 @@ class GenericThreadPool final
 
 private:
     /**
-     * @brief getWorker Obtain a reference to the local thread's associated worker,
-     * otherwise return the next worker in the round robin.
+     * @brief post Try post job to thread pool.
+     * @param handler Handler to be called from thread pool worker. It has
+     * to be callable as 'handler()'.
+     * @param failedWakeupRetryCap The number of retries to perform when worker
+     * wakeup fails.
+     * @return 'true' on success, false otherwise.
+     * @note All exceptions thrown by handler will be suppressed.
      */
-    Worker<Task, Queue>& getWorker();
+    template <typename Handler>
+    bool tryPostImpl(Handler&& handler, size_t failedWakeupRetryCap);
+
+    /**
+     * @brief getWorker Obtain the id of the local thread's associated worker,
+     * otherwise return the next worker id in the round robin.
+     */
+    size_t getWorkerId();
 
     SlottedBag<Queue> m_idle_workers;
     WorkerVector m_workers;
     Rouser m_rouser;
+    size_t m_failed_wakeup_retry_cap;
     std::atomic<size_t> m_next_worker;
     std::atomic<size_t> m_num_busy_waiters;
 };
@@ -112,6 +125,7 @@ inline GenericThreadPool<Task, Queue>::GenericThreadPool(ThreadPoolOptions optio
     : m_idle_workers(options.threadCount())
     , m_workers(options.threadCount())
     , m_rouser(options.rousePeriod())
+    , m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
    , m_next_worker(0)
    , m_num_busy_waiters(0)
 {
@@ -140,6 +154,7 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
         m_idle_workers = std::move(rhs.m_idle_workers);
         m_workers = std::move(rhs.m_workers);
         m_rouser = std::move(rhs.m_rouser);
+        m_failed_wakeup_retry_cap = rhs.m_failed_wakeup_retry_cap;
         m_next_worker = rhs.m_next_worker.load();
         m_num_busy_waiters = rhs.m_num_busy_waiters.load();
     }
@@ -159,6 +174,22 @@ inline GenericThreadPool<Task, Queue>::~GenericThreadPool()
 template <typename Task, template<typename> class Queue>
 template <typename Handler>
 inline bool GenericThreadPool<Task, Queue>::tryPost(Handler&& handler)
+{
+    return tryPostImpl(std::forward<Handler>(handler), m_failed_wakeup_retry_cap);
+}
+
+template <typename Task, template<typename> class Queue>
+template <typename Handler>
+inline void GenericThreadPool<Task, Queue>::post(Handler&& handler)
+{
+    const auto ok = tryPost(std::forward<Handler>(handler));
+    if (!ok)
+        throw std::runtime_error("Thread pool queue is full.");
+}
+
+template <typename Task, template<typename> class Queue>
+template <typename Handler>
+inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_t failedWakeupRetryCap)
 {
     // This section of the code increases the probability that our thread pool
     // is fully utilized (num active workers = argmin(num tasks, num total workers)).
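
tryPost above now simply forwards to tryPostImpl together with m_failed_wakeup_retry_cap; in the next hunk, a post that fails against a nominally idle worker wakes that worker and recurses with the cap decremented. The cap arithmetic on its own looks like the following sketch (hypothetical retryCapped helper, illustrative only -- in the real code each retry re-enters the full posting path, including the round-robin fallback):

// retry_cap_sketch.cpp -- models the failedWakeupRetryCap recursion, not the library API.
#include <cstddef>
#include <functional>

// attempt() returns true on success; the caller gets 1 + cap attempts in total.
bool retryCapped(const std::function<bool()>& attempt, std::size_t cap)
{
    if (attempt())
        return true;
    if (cap > 0)
        return retryCapped(attempt, cap - 1);   // same shape as tryPostImpl(handler, cap - 1)
    return false;
}

int main()
{
    int failuresLeft = 3;
    auto flaky = [&] { return --failuresLeft < 0; };   // fails three times, then succeeds

    bool a = retryCapped(flaky, 5);   // true: succeeds on the fourth attempt
    failuresLeft = 3;
    bool b = retryCapped(flaky, 2);   // false: cap exhausted before success
    return (a && !b) ? 0 : 1;
}
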
@@ -169,56 +200,59 @@ inline bool GenericThreadPool<Task, Queue>::tryPost(Handler&& handler)
         auto result = m_idle_workers.tryEmptyAny();
         if (result.first)
         {
-            if (m_workers[result.second]->tryPost(std::forward<Handler>(handler)))
-            {
-                m_workers[result.second]->wake();
-                return true;
-            }
+            auto success = m_workers[result.second]->tryPost(std::forward<Handler>(handler));
+            m_workers[result.second]->wake();
 
-            // If post is unsuccessful, we need to re-add the worker to the idle worker bag.
-            m_idle_workers.fill(result.second);
-            return false;
+            // The above post will only fail if the idle worker's queue is full, which is an extremely
+            // low probability scenario. In that case, we wake the worker and let it get to work on
+            // processing the items in its queue. We then re-try posting our current task.
+            if (success)
+                return true;
+            else if (failedWakeupRetryCap > 0)
+                return tryPostImpl(std::forward<Handler>(handler), failedWakeupRetryCap - 1);
         }
     }
 
     // No idle threads. Our threads are either active or busy waiting
     // Either way, submit the work item in a round robin fashion.
-    if (!getWorker().tryPost(std::forward<Handler>(handler)))
-        return false; // Worker's task queue is full.
-
-    // The following section increases the probability that tasks will not be dropped.
-    // This is a soft constraint, the strict task dropping bound is covered by the Rouser
-    // thread's functionality. This code experimentally lowers task response time under
-    // low thread pool utilization without incurring significant performance penalties at
-    // high thread pool utilization.
-    if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
+    auto id = getWorkerId();
+    auto initialWorkerId = id;
+    do
     {
-        auto result = m_idle_workers.tryEmptyAny();
-        if (result.first)
-            m_workers[result.second]->wake();
-    }
+        if (m_workers[id]->tryPost(std::forward<Handler>(handler)))
+        {
+            // The following section increases the probability that tasks will not be dropped.
+            // This is a soft constraint, the strict task dropping bound is covered by the Rouser
+            // thread's functionality. This code experimentally lowers task response time under
+            // low thread pool utilization without incurring significant performance penalties at
+            // high thread pool utilization.
+            if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
+            {
+                auto result = m_idle_workers.tryEmptyAny();
+                if (result.first)
+                    m_workers[result.second]->wake();
+            }
 
-    return true;
-}
+            return true;
+        }
 
-template <typename Task, template<typename> class Queue>
-template <typename Handler>
-inline void GenericThreadPool<Task, Queue>::post(Handler&& handler)
-{
-    const auto ok = tryPost(std::forward<Handler>(handler));
-    if (!ok)
-        throw std::runtime_error("Thread pool queue is full.");
+        ++id %= m_workers.size();
+    } while (id != initialWorkerId);
+
+    // All Queues in our thread pool are full during one whole iteration.
+    // We consider this a posting failure case.
+    return false;
 }
 
 template <typename Task, template<typename> class Queue>
-inline Worker<Task, Queue>& GenericThreadPool<Task, Queue>::getWorker()
+inline size_t GenericThreadPool<Task, Queue>::getWorkerId()
 {
     auto id = Worker<Task, Queue>::getWorkerIdForCurrentThread();
 
     if (id > m_workers.size())
         id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
 
-    return *m_workers[id];
+    return id;
 }
 
 }
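
When no idle worker is available, the rewritten tryPostImpl no longer gives up after a single worker's queue rejects the task: it walks every worker queue once, starting from getWorkerId(), and only reports failure after the scan wraps back to its starting point. The standalone sketch below (a hypothetical FakeWorker with a plain bounded std::deque, not the library's Worker/Queue machinery) models that pass and its termination condition:

// overflow_scan_sketch.cpp -- simplified model of the round-robin overflow pass, not the library API.
#include <cstddef>
#include <deque>
#include <functional>
#include <vector>

struct FakeWorker
{
    std::deque<std::function<void()>> queue;
    std::size_t capacity = 4;

    bool tryPost(std::function<void()> task)
    {
        if (queue.size() >= capacity)
            return false;                  // queue full, mirrors Worker::tryPost failing
        queue.push_back(std::move(task));
        return true;
    }
};

// One round-robin pass starting at startId; false only if every queue rejected the task.
bool roundRobinPost(std::vector<FakeWorker>& workers, std::size_t startId,
                    std::function<void()> task)
{
    auto id = startId;
    do
    {
        if (workers[id].tryPost(task))
            return true;
        ++id %= workers.size();            // advance and wrap, as in tryPostImpl
    } while (id != startId);
    return false;                          // all queues were full during one whole iteration
}

int main()
{
    std::vector<FakeWorker> workers(3);

    // 3 workers x capacity 4 = 12 slots; the 13th post fails after one full pass.
    for (int i = 0; i < 12; ++i)
        roundRobinPost(workers, 0, [] {});

    return roundRobinPost(workers, 0, [] {}) ? 1 : 0;   // evaluates to 0: the pool is saturated
}

Note that ++id %= m_workers.size() in the diff is the compound form of id = (id + 1) % m_workers.size(): the pre-increment yields an lvalue that the %= then reduces, so the index always stays within [0, m_workers.size()).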

include/thread_pool/thread_pool_options.hpp

Lines changed: 34 additions & 1 deletion
@@ -72,6 +72,7 @@ class ThreadPoolOptions final
      */
     ThreadPoolOptions(size_t thread_count = defaultThreadCount()
                     , size_t queue_size = defaultQueueSize()
+                    , size_t failed_wakeup_retry_cap = defaultFailedWakeupRetryCap()
                     , BusyWaitOptions busy_wait_options = defaultBusyWaitOptions()
                     , std::chrono::microseconds rouse_period = defaultRousePeriod());
 
@@ -87,6 +88,11 @@ class ThreadPoolOptions final
      */
     void setQueueSize(size_t size);
 
+    /**
+     * @brief setFailedWakeupRetryCap Set retry cap when a worker wakeup fails.
+     * @param cap The retry cap.
+     */
+    void setFailedWakeupRetryCap(size_t cap);
 
     /**
      * @brief setBusyWaitOptions Set the parameters relating to worker busy waiting behaviour.
@@ -111,6 +117,11 @@ class ThreadPoolOptions final
      */
     size_t queueSize() const;
 
+    /**
+     * @brief failedWakeupRetryCap Return the retry cap when a worker wakeup fails.
+     */
+    size_t failedWakeupRetryCap() const;
+
     /**
      * @brief busyWaitOptions Return a reference to the busy wait options.
      */
@@ -131,6 +142,11 @@ class ThreadPoolOptions final
      */
     static size_t defaultQueueSize();
 
+    /**
+     * @brief defaultFailedWakeupRetryCap Obtain the default retry cap when a worker wakeup fails.
+     */
+    static size_t defaultFailedWakeupRetryCap();
+
     /**
      * @brief defaultBusyWaitOptions Obtain the default busy wait options.
      */
@@ -145,6 +161,7 @@ class ThreadPoolOptions final
 private:
     size_t m_thread_count;
     size_t m_queue_size;
+    size_t m_failed_wakeup_retry_cap;
     BusyWaitOptions m_busy_wait_options;
     std::chrono::microseconds m_rouse_period;
 };
@@ -189,9 +206,10 @@ inline ThreadPoolOptions::BusyWaitOptions::IterationFunction ThreadPoolOptions::
     return [](size_t i) { return std::chrono::microseconds(static_cast<size_t>(pow(2, i))*1000); };
 }
 
-inline ThreadPoolOptions::ThreadPoolOptions(size_t thread_count, size_t queue_size, BusyWaitOptions busy_wait_options, std::chrono::microseconds rouse_period)
+inline ThreadPoolOptions::ThreadPoolOptions(size_t thread_count, size_t queue_size, size_t failed_wakeup_retry_cap, BusyWaitOptions busy_wait_options, std::chrono::microseconds rouse_period)
     : m_thread_count(thread_count)
     , m_queue_size(queue_size)
+    , m_failed_wakeup_retry_cap(failed_wakeup_retry_cap)
     , m_busy_wait_options(std::move(busy_wait_options))
     , m_rouse_period(std::move(rouse_period))
 {
@@ -207,6 +225,11 @@ inline void ThreadPoolOptions::setQueueSize(size_t size)
     m_queue_size = std::max<size_t>(1u, size);
 }
 
+inline void ThreadPoolOptions::setFailedWakeupRetryCap(size_t cap)
+{
+    m_failed_wakeup_retry_cap = std::max<size_t>(1u, cap);
+}
+
 inline void ThreadPoolOptions::setBusyWaitOptions(BusyWaitOptions busy_wait_options)
 {
     m_busy_wait_options = std::move(busy_wait_options);
@@ -227,6 +250,11 @@ inline size_t ThreadPoolOptions::queueSize() const
     return m_queue_size;
 }
 
+inline size_t ThreadPoolOptions::failedWakeupRetryCap() const
+{
+    return m_failed_wakeup_retry_cap;
+}
+
 inline ThreadPoolOptions::BusyWaitOptions const& ThreadPoolOptions::busyWaitOptions() const
 {
     return m_busy_wait_options;
@@ -248,6 +276,11 @@ inline size_t ThreadPoolOptions::defaultQueueSize()
     return 1024;
 }
 
+inline size_t ThreadPoolOptions::defaultFailedWakeupRetryCap()
+{
+    return 5;
+}
+
 inline ThreadPoolOptions::BusyWaitOptions ThreadPoolOptions::defaultBusyWaitOptions()
 {
     return ThreadPoolOptions::BusyWaitOptions();
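
The new knob flows through ThreadPoolOptions the same way queue size does: a constructor parameter defaulting to defaultFailedWakeupRetryCap() (5), a setter that clamps the value to at least 1, and a getter that the GenericThreadPool constructor reads once. A hedged usage sketch follows; the tp::ThreadPool alias for a concrete GenericThreadPool instantiation and the include paths are assumptions about the rest of the library, not something this commit shows:

// options_usage_sketch.cpp -- illustrative usage under the assumptions noted above.
#include <thread_pool/thread_pool.hpp>
#include <thread_pool/thread_pool_options.hpp>

int main()
{
    // thread_count, queue_size, failed_wakeup_retry_cap; remaining parameters keep their defaults.
    tp::ThreadPoolOptions options(4, 256, 3);

    // Equivalent via the setter; note that the setter clamps the cap to at least 1.
    options.setFailedWakeupRetryCap(3);

    tp::ThreadPool pool(options);      // assumed alias; the constructor reads options.failedWakeupRetryCap() once
    pool.post([] { /* work */ });      // throws std::runtime_error("Thread pool queue is full.") on overflow
    return 0;
}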
