Skip to content

Commit 3d54895

Browse files
author
Chris Sullivan
committed
Merge branch 'lockfree_merge' into lockfree
2 parents 055b7b8 + 5d18a81 commit 3d54895

File tree

4 files changed

+124
-16
lines changed

4 files changed

+124
-16
lines changed

include/LockfreeThreadPool.hh

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#ifndef LOCKFREETHREADPOOL_H
2+
#define LOCKFREETHREADPOOL_H
3+
4+
#include <iostream>
5+
#include <chrono>
6+
#include <thread>
7+
#include <functional>
8+
#include <vector>
9+
#include <utility>
10+
#include <future>
11+
12+
#include "concurrentqueue.h"
13+
14+
using namespace std;
15+
using namespace moodycamel;
16+
17+
typedef std::function<void()> WorkType;
18+
19+
class LockfreeThreadPool {
20+
public:
21+
LockfreeThreadPool(uint32_t numthreads);
22+
~LockfreeThreadPool();
23+
24+
void Enqueue(function<void()> task);
25+
void Worker();
26+
void JoinAll();
27+
void Finish();
28+
29+
template <typename T, typename... Params>
30+
void ParallelFor(uint32_t begin, uint32_t end, int32_t n_tasks, T SerialFunction, Params&&... params) {
31+
32+
n_tasks = (n_tasks >= m_nthreads) ? n_tasks : m_nthreads;
33+
int chunk = (end - begin) / n_tasks;
34+
for (int i = 0; i < n_tasks; ++i) {
35+
m_promises.emplace_back();
36+
int mypromise = m_promises.size() - 1;
37+
m_taskQueue.enqueue([=]{
38+
uint32_t threadstart = begin + i*chunk;
39+
uint32_t threadstop = (i == n_tasks - 1) ? end : threadstart + chunk;
40+
for (uint32_t it = threadstart; it < threadstop; ++it) {
41+
SerialFunction(it, params...);
42+
}
43+
m_promises[mypromise].set_value();
44+
});
45+
}
46+
Finish();
47+
}
48+
template<typename InputIt, typename T>
49+
void ParallelMap(InputIt begin, InputIt end, InputIt outputBegin, const std::function<T(T)>& func)
50+
{
51+
int chunkSize = (end - begin) / m_nthreads;
52+
for (int i = 0; i < m_nthreads; i++) {
53+
m_promises.emplace_back();
54+
int mypromise = m_promises.size() - 1;
55+
m_taskQueue.enqueue([=]{
56+
InputIt threadBegin = begin + i*chunkSize;
57+
InputIt threadOutput = outputBegin + i*chunkSize;
58+
InputIt threadEnd = (i == m_nthreads - 1) ? end : threadBegin + chunkSize;
59+
while (threadBegin != threadEnd) {
60+
*(threadOutput++) = func(*(threadBegin++));
61+
}
62+
m_promises[mypromise].set_value();
63+
});
64+
}
65+
Finish();
66+
}
67+
68+
private:
69+
// threads and task queue
70+
int m_nthreads;
71+
vector<thread> m_workers;
72+
vector<promise<void>> m_promises;
73+
bool m_stopWorkers;
74+
75+
ConcurrentQueue<WorkType> m_taskQueue;
76+
};
77+
78+
#endif /* end of include guard: THREADPOOL_H */

include/ThreadPool.hh

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
#include <utility>
1010
#include <future>
1111

12-
#include "concurrentqueue.h"
12+
#include "ThreadsafeQueue.hh"
1313

1414
using namespace std;
15-
using namespace moodycamel;
1615

1716
typedef std::function<void()> WorkType;
1817

@@ -27,23 +26,22 @@ public:
2726
void Finish();
2827

2928
template <typename T, typename... Params>
30-
void ParallelFor(uint32_t begin, uint32_t end, T SerialFunction, Params&&... params) {
29+
void ParallelFor(uint32_t begin, uint32_t end, int32_t n_tasks, T SerialFunction, Params&&... params) {
3130

32-
int chunk = (end - begin) / m_nthreads;
33-
for (int i = 0; i < m_nthreads; ++i) {
31+
n_tasks = (n_tasks >= m_nthreads) ? n_tasks : m_nthreads;
32+
int chunk = (end - begin) / n_tasks;
33+
for (int i = 0; i < n_tasks; ++i) {
3434
m_promises.emplace_back();
3535
int mypromise = m_promises.size() - 1;
36-
m_taskQueue.enqueue([=]{
36+
m_taskQueue.push([=]{
3737
uint32_t threadstart = begin + i*chunk;
38-
uint32_t threadstop = (i == m_nthreads - 1) ? end : threadstart + chunk;
38+
uint32_t threadstop = (i == n_tasks - 1) ? end : threadstart + chunk;
3939
for (uint32_t it = threadstart; it < threadstop; ++it) {
4040
SerialFunction(it, params...);
4141
}
4242
m_promises[mypromise].set_value();
4343
});
4444
}
45-
WorkType work;
46-
if (m_taskQueue.try_dequeue(work)) work(); // master thread is also a worker
4745
Finish();
4846
}
4947
template<typename InputIt, typename T>
@@ -53,7 +51,7 @@ public:
5351
for (int i = 0; i < m_nthreads; i++) {
5452
m_promises.emplace_back();
5553
int mypromise = m_promises.size() - 1;
56-
m_taskQueue.enqueue([=]{
54+
m_taskQueue.push([=]{
5755
InputIt threadBegin = begin + i*chunkSize;
5856
InputIt threadOutput = outputBegin + i*chunkSize;
5957
InputIt threadEnd = (i == m_nthreads - 1) ? end : threadBegin + chunkSize;
@@ -63,8 +61,6 @@ public:
6361
m_promises[mypromise].set_value();
6462
});
6563
}
66-
WorkType work;
67-
if (m_taskQueue.try_dequeue(work)) work(); // master thread is also a worker
6864
Finish();
6965
}
7066

@@ -75,7 +71,7 @@ private:
7571
vector<promise<void>> m_promises;
7672
bool m_stopWorkers;
7773

78-
ConcurrentQueue<WorkType> m_taskQueue;
74+
ThreadsafeQueue<WorkType> m_taskQueue;
7975
};
8076

8177
#endif /* end of include guard: THREADPOOL_H */

src/LockfreeThreadPool.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#include "LockfreeThreadPool.hh"
2+
3+
LockfreeThreadPool::LockfreeThreadPool(uint32_t numthreads) : m_nthreads(numthreads), m_stopWorkers(false) {
4+
for (uint32_t i=0; i<numthreads;i++) {
5+
m_workers.emplace_back(&LockfreeThreadPool::Worker, this);
6+
}
7+
}
8+
LockfreeThreadPool::~LockfreeThreadPool() {
9+
m_stopWorkers = true;
10+
JoinAll();
11+
}
12+
13+
void LockfreeThreadPool::Worker() {
14+
while(true) {
15+
function<void()> work;
16+
if (m_taskQueue.try_dequeue(work)) work();
17+
if (m_stopWorkers) return;
18+
}
19+
}
20+
void LockfreeThreadPool::JoinAll() {
21+
for (auto& worker : m_workers) { worker.join(); }
22+
}
23+
void LockfreeThreadPool::Finish() {
24+
for (auto& promise : m_promises) promise.get_future().get();
25+
m_promises.clear();
26+
}

src/ThreadPool.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
#include "ThreadPool.hh"
22

33
ThreadPool::ThreadPool(uint32_t numthreads) : m_nthreads(numthreads), m_stopWorkers(false) {
4-
for (uint32_t i=0; i<numthreads-1;i++) { // -1 b/c main thread is also a worker
4+
//for (uint32_t i=0; i<numthreads-1;i++) { // -1 b/c main thread is also a worker
5+
for (uint32_t i=0; i<numthreads;i++) {
56
m_workers.emplace_back(&ThreadPool::Worker, this);
67
}
78
}
89
ThreadPool::~ThreadPool() {
910
m_stopWorkers = true;
11+
m_taskQueue.join();
1012
JoinAll();
1113
}
1214

1315
void ThreadPool::Worker() {
1416
while(true) {
1517
function<void()> work;
16-
if (m_taskQueue.try_dequeue(work)) work();
17-
if (m_stopWorkers) return;
18+
try {
19+
work = m_taskQueue.pop();
20+
work();
21+
}
22+
catch (const ThreadsafeQueue<WorkType>::QueueFinished&)
23+
{
24+
return;
25+
}
1826
}
1927
}
2028
void ThreadPool::JoinAll() {

0 commit comments

Comments
 (0)