Skip to content

Commit ab0c31f

Browse files
author
Chris Sullivan
committed
merged ntasks branch
2 parents d213279 + e0867e7 commit ab0c31f

File tree

2 files changed

+113
-0
lines changed

2 files changed

+113
-0
lines changed

include/ThreadPool.hh

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#ifndef THREADPOOL_H
2+
#define THREADPOOL_H
3+
4+
#include <iostream>
5+
#include <chrono>
6+
#include <thread>
7+
#include <functional>
8+
#include <vector>
9+
#include <utility>
10+
#include <future>
11+
12+
#include "ThreadsafeQueue.hh"
13+
14+
using namespace std;
15+
16+
typedef std::function<void()> WorkType;
17+
18+
class ThreadPool {
19+
public:
20+
ThreadPool(uint32_t numthreads);
21+
~ThreadPool();
22+
23+
void Enqueue(function<void()> task);
24+
void Worker();
25+
void JoinAll();
26+
void Finish();
27+
28+
template <typename T, typename... Params>
29+
void ParallelFor(uint32_t begin, uint32_t end, int32_t n_tasks, T SerialFunction, Params&&... params) {
30+
31+
n_tasks = (n_tasks >= m_nthreads) ? n_tasks : m_nthreads;
32+
int chunk = (end - begin) / n_tasks;
33+
for (int i = 0; i < n_tasks; ++i) {
34+
m_promises.emplace_back();
35+
int mypromise = m_promises.size() - 1;
36+
m_taskQueue.push([=]{
37+
uint32_t threadstart = begin + i*chunk;
38+
uint32_t threadstop = (i == n_tasks - 1) ? end : threadstart + chunk;
39+
for (uint32_t it = threadstart; it < threadstop; ++it) {
40+
SerialFunction(it, params...);
41+
}
42+
m_promises[mypromise].set_value();
43+
});
44+
}
45+
//m_taskQueue.pop()(); // master thread is also a worker
46+
Finish();
47+
}
48+
template<typename InputIt, typename T>
49+
void ParallelMap(InputIt begin, InputIt end, InputIt outputBegin, const std::function<T(T)>& func)
50+
{
51+
int chunkSize = (end - begin) / m_nthreads;
52+
for (int i = 0; i < m_nthreads; i++) {
53+
m_promises.emplace_back();
54+
int mypromise = m_promises.size() - 1;
55+
m_taskQueue.push([=]{
56+
InputIt threadBegin = begin + i*chunkSize;
57+
InputIt threadOutput = outputBegin + i*chunkSize;
58+
InputIt threadEnd = (i == m_nthreads - 1) ? end : threadBegin + chunkSize;
59+
while (threadBegin != threadEnd) {
60+
*(threadOutput++) = func(*(threadBegin++));
61+
}
62+
m_promises[mypromise].set_value(); // master thread is also a worker
63+
});
64+
}
65+
//m_taskQueue.pop()();
66+
Finish();
67+
}
68+
69+
private:
70+
// threads and task queue
71+
int m_nthreads;
72+
vector<thread> m_workers;
73+
vector<promise<void>> m_promises;
74+
bool m_stopWorkers;
75+
76+
ThreadsafeQueue<WorkType> m_taskQueue;
77+
};
78+
79+
#endif /* end of include guard: THREADPOOL_H */

src/ThreadPool.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include "ThreadPool.hh"
2+
3+
ThreadPool::ThreadPool(uint32_t numthreads) : m_nthreads(numthreads), m_stopWorkers(false) {
4+
//for (uint32_t i=0; i<numthreads-1;i++) { // -1 b/c main thread is also a worker
5+
for (uint32_t i=0; i<numthreads;i++) {
6+
m_workers.emplace_back(&ThreadPool::Worker, this);
7+
}
8+
}
9+
ThreadPool::~ThreadPool() {
10+
m_stopWorkers = true;
11+
m_taskQueue.join();
12+
JoinAll();
13+
}
14+
15+
void ThreadPool::Worker() {
16+
while(true) {
17+
function<void()> work;
18+
try {
19+
work = m_taskQueue.pop();
20+
work();
21+
}
22+
catch (const ThreadsafeQueue<WorkType>::QueueFinished&)
23+
{
24+
return;
25+
}
26+
}
27+
}
28+
void ThreadPool::JoinAll() {
29+
for (auto& worker : m_workers) { worker.join(); }
30+
}
31+
void ThreadPool::Finish() {
32+
for (auto& promise : m_promises) promise.get_future().get();
33+
m_promises.clear();
34+
}

0 commit comments

Comments
 (0)