Skip to content

Commit d8bda53

Browse files
author
Chris Sullivan
committed
Ported ThreadPool over to ConcurrentQueue for testing. Need to decide on how to terminate Worker thread.
1 parent eb976d1 commit d8bda53

File tree

5 files changed

+15
-10
lines changed

5 files changed

+15
-10
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "concurrentqueue"]
2+
path = concurrentqueue
3+
url = https://github.com/cameron314/concurrentqueue

concurrentqueue

Submodule concurrentqueue added at 0059f8c

default.inc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ SRC_DIRECTORIES = src
3636

3737
# A list of directories containing include files. Each directory will
3838
# be made available for #include directives for included files.
39-
INC_DIRECTORIES = include
39+
INC_DIRECTORIES = include concurrentqueue
4040

4141
# A list of directories that contain libraries. The list can also
4242
# contain patterns that expand to directories that contain libraries.

include/ThreadPool.hh

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
#include <utility>
1010
#include <future>
1111

12-
#include "ThreadsafeQueue.hh"
12+
#include "concurrentqueue.h"
1313

1414
using namespace std;
15+
using namespace moodycamel;
1516

1617
typedef std::function<void()> WorkType;
1718

@@ -32,7 +33,7 @@ public:
3233
for (int i = 0; i < m_nthreads; ++i) {
3334
m_promises.emplace_back();
3435
int mypromise = m_promises.size() - 1;
35-
m_taskQueue.push([=]{
36+
m_taskQueue.enqueue([=]{
3637
uint32_t threadstart = begin + i*chunk;
3738
uint32_t threadstop = (i == m_nthreads - 1) ? end : threadstart + chunk;
3839
for (uint32_t it = threadstart; it < threadstop; ++it) {
@@ -41,7 +42,8 @@ public:
4142
m_promises[mypromise].set_value();
4243
});
4344
}
44-
m_taskQueue.pop()(); // master thread is also a worker
45+
WorkType work;
46+
if (m_taskQueue.try_dequeue(work)) work(); // master thread is also a worker
4547
Finish();
4648
}
4749
template<typename InputIt, typename T>
@@ -51,17 +53,17 @@ public:
5153
for (int i = 0; i < m_nthreads; i++) {
5254
m_promises.emplace_back();
5355
int mypromise = m_promises.size() - 1;
54-
m_taskQueue.push([=]{
56+
m_taskQueue.enqueue([=]{
5557
InputIt threadBegin = begin + i*chunkSize;
5658
InputIt threadOutput = outputBegin + i*chunkSize;
5759
InputIt threadEnd = (i == m_nthreads - 1) ? end : threadBegin + chunkSize;
5860
while (threadBegin != threadEnd) {
5961
*(threadOutput++) = func(*(threadBegin++));
6062
}
61-
m_promises[mypromise].set_value(); // master thread is also a worker
63+
m_promises[mypromise].set_value();
6264
});
6365
}
64-
m_taskQueue.pop()();
66+
if (m_taskQueue.try_dequeue(work)) work(); // master thread is also a worker
6567
Finish();
6668
}
6769

@@ -72,7 +74,7 @@ private:
7274
vector<promise<void>> m_promises;
7375
bool m_stopWorkers;
7476

75-
ThreadsafeQueue<WorkType> m_taskQueue;
77+
ConcurrentQueue<WorkType> m_taskQueue;
7678
};
7779

7880
#endif /* end of include guard: THREADPOOL_H */

src/ThreadPool.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ void ThreadPool::Worker() {
1515
while(true) {
1616
function<void()> work;
1717
try {
18-
work = m_taskQueue.pop();
19-
work();
18+
if (m_taskQueue.try_dequeue(work)) work();
2019
}
2120
catch (const ThreadsafeQueue<WorkType>::QueueFinished&)
2221
{

0 commit comments

Comments
 (0)