Skip to content

Commit 5d0ca72

Browse files
committed
Simplifications and fixes
Replaced deque by a queue (which by default uses a deque). Simplified the task packaging as suggested by Ang3lus in issue progschj#4. The destructor now waits for the queue to empty but throws a std::runtime_error if enqueue is called on the stopped pool (see issue progschj#3).
1 parent f76ae84 commit 5d0ca72

File tree

1 file changed

+14
-41
lines changed

1 file changed

+14
-41
lines changed

ThreadPool.h

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,45 +2,14 @@
22
#define THREAD_POOL_H
33

44
#include <vector>
5-
#include <deque>
5+
#include <queue>
66
#include <memory>
77
#include <thread>
88
#include <mutex>
99
#include <condition_variable>
1010
#include <future>
11-
12-
// need this type to "erase" the return type of the packaged task
13-
struct any_packaged_base {
14-
virtual void execute() = 0;
15-
};
16-
17-
template<class R>
18-
struct any_packaged : public any_packaged_base {
19-
any_packaged(std::packaged_task<R()> &&t)
20-
: task(std::move(t))
21-
{
22-
}
23-
void execute()
24-
{
25-
task();
26-
}
27-
std::packaged_task<R()> task;
28-
};
29-
30-
class any_packaged_task {
31-
public:
32-
template<class R>
33-
any_packaged_task(std::packaged_task<R()> &&task)
34-
: ptr(new any_packaged<R>(std::move(task)))
35-
{
36-
}
37-
void operator()()
38-
{
39-
ptr->execute();
40-
}
41-
private:
42-
std::shared_ptr<any_packaged_base> ptr;
43-
};
11+
#include <functional>
12+
#include <stdexcept>
4413

4514
class ThreadPool;
4615

@@ -66,7 +35,7 @@ class ThreadPool {
6635
// need to keep track of threads so we can join them
6736
std::vector< std::thread > workers;
6837
// the task queue
69-
std::deque< any_packaged_task > tasks;
38+
std::queue< std::function<void()> > tasks;
7039

7140
// synchronization
7241
std::mutex queue_mutex;
@@ -81,10 +50,10 @@ void Worker::operator()()
8150
std::unique_lock<std::mutex> lock(pool.queue_mutex);
8251
while(!pool.stop && pool.tasks.empty())
8352
pool.condition.wait(lock);
84-
if(pool.stop)
53+
if(pool.stop && pool.tasks.empty())
8554
return;
86-
any_packaged_task task(pool.tasks.front());
87-
pool.tasks.pop_front();
55+
std::function<void()> task(pool.tasks.front());
56+
pool.tasks.pop();
8857
lock.unlock();
8958
task();
9059
}
@@ -102,11 +71,15 @@ ThreadPool::ThreadPool(size_t threads)
10271
template<class T, class F>
10372
std::future<T> ThreadPool::enqueue(F f)
10473
{
105-
std::packaged_task<T()> task(f);
106-
std::future<T> res= task.get_future();
74+
// don't allow enqueueing after stopping the pool
75+
if(stop)
76+
throw std::runtime_error("enqueue on stopped ThreadPool");
77+
78+
auto task = std::make_shared< std::packaged_task<T()> >(f);
79+
std::future<T> res = task->get_future();
10780
{
10881
std::unique_lock<std::mutex> lock(queue_mutex);
109-
tasks.push_back(any_packaged_task(std::move(task)));
82+
tasks.push([task](){ (*task)(); });
11083
}
11184
condition.notify_one();
11285
return res;

0 commit comments

Comments
 (0)