Skip to content

Commit efcf58b

Browse files
authored
refactor (#52)
* try locks on pop * new tpool * run less tests * fix compiler warnings * test runs :/ * compiler warning * just pushing * try own check * tryfix * meh * only thread * push10 * couts * stupid * more * enable push * dumdum * now * more * more * nowait * no push lock * waitk * msgs * stopped check * cleanup * other tests * thread localized steals * Rcout on wait * no Rcout * progress * no pushreturn * mtx name * no pushreturn * wait on destruct * more progress * more msgs * time stamps * fix * no pushreturn * away * wtf * run nothing * detach on destruct * fix * destruct print * more * no print on wait * time on destruct * delete print * more * actually use multiple queues * destruction order * more order * time print * finish print * fix * no globals * print too * clear on stop * fix * parfor * don't wait on join * after cv * before cv * ptr to taskmanager * don't stop on mngr dtor * don't clear either * reorder * manually destroy cv * wrap in pointer * learned: windows hangs when destructing static cv * no global instance in parallel for * 5 test runs by default * with print * fix nBatches docs * get future before push * get after join * copy function on push * check future result * leftover * prints * run 5x * get future? * get first * no print * run 10 * other OSs * fix2 * bench pdfs on gitignore * clean up prints * other sync * some cleanup * finish line -> todo list * more clean up * more clean up * even more * update NEWS * more clean up * even more * clean up unit tests * pull tpool * restrict to 10 runs
1 parent 9395d3e commit efcf58b

File tree

10 files changed

+439
-615
lines changed

10 files changed

+439
-615
lines changed

.Rbuildignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ revdep
1313
^\.github$
1414
^CRAN-RELEASE$
1515
.vscode/
16-
new-benchmarks.R
16+
new-benchmarks.R
17+
bench*

.github/workflows/R-CMD-check.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ jobs:
8080
_R_CHECK_CRAN_INCOMING_: false
8181
run: rcmdcheck::rcmdcheck(args = c("--no-manual", "--as-cran"), error_on = "warning", check_dir = "check")
8282
shell: Rscript {0}
83+
# run: cd .. && R CMD build RcppThread && R CMD check RcppThread_1.1.0.tar.gz
84+
# shell: bash
8385

8486
- name: Show testthat output
8587
if: always()

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ inst/doc
55
revdep
66
__pycache__
77
.vscode/
8-
new-benchmarks.R
8+
new-benchmarks.R
9+
bench*

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* Add classes `ProgressCounter` and `ProgressBar` for tracking progress in long-
66
running loops (#49).
77

8-
* Increased speed for short running tasks due to lock-free queue (#50, #51).
8+
* Increased speed due to lock-free queue (#51).
99

1010

1111
# RcppThread 1.0.0

inst/include/RcppThread/Progress.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ class ProgressBar : public ProgressPrinter {
181181
std::string makeBar(size_t pct, size_t numBars = 40) {
182182
std::ostringstream msg;
183183
msg << "[";
184-
int i = 0;
184+
size_t i = 0;
185185
for (; i < pct / 100.0 * numBars; i++)
186186
msg << "=";
187187
for (; i < numBars; i++)

inst/include/RcppThread/ThreadPool.hpp

Lines changed: 35 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,6 @@
2222

2323
namespace RcppThread {
2424

25-
namespace util {
26-
void
27-
waitAndSync(tpool::FinishLine& finishLine)
28-
{
29-
finishLine.wait_for(std::chrono::milliseconds(50));
30-
Rcout << "";
31-
checkUserInterrupt();
32-
}
33-
}
34-
3525
//! Implemenation of the thread pool pattern based on `Thread`.
3626
class ThreadPool
3727
{
@@ -46,13 +36,6 @@ class ThreadPool
4636
ThreadPool& operator=(const ThreadPool&) = delete;
4737
ThreadPool& operator=(ThreadPool&& other) = delete;
4838

49-
//! @brief returns a reference to the global thread pool instance.
50-
static ThreadPool& globalInstance()
51-
{
52-
static ThreadPool instance_;
53-
return instance_;
54-
}
55-
5639
template<class F, class... Args>
5740
void push(F&& f, Args&&... args);
5841

@@ -76,21 +59,13 @@ class ThreadPool
7659
void clear();
7760

7861
private:
79-
void startWorker();
8062
void joinWorkers();
63+
void execute(std::function<void()>& task);
8164

82-
template<class Task>
83-
void executeSafely(Task& task);
84-
85-
bool allJobsDone();
86-
void waitForEvents();
87-
void rethrowExceptions();
88-
89-
std::vector<std::thread> workers_;
90-
size_t nWorkers_;
91-
// variables for synchronization between workers
65+
// variables for synchronization between workers (destructed last)
9266
tpool::detail::TaskManager taskManager_;
93-
tpool::FinishLine finishLine_{ 0 };
67+
tpool::TodoList todoList_;
68+
std::vector<std::thread> workers_;
9469
};
9570

9671
//! constructs a thread pool with as many workers as there are cores.
@@ -102,10 +77,21 @@ inline ThreadPool::ThreadPool()
10277
//! @param nWorkers number of worker threads to create; if `nWorkers = 0`, all
10378
//! work pushed to the pool will be done in the main thread.
10479
inline ThreadPool::ThreadPool(size_t nWorkers)
105-
: nWorkers_(nWorkers)
80+
: taskManager_{ nWorkers }
10681
{
107-
for (size_t w = 0; w < nWorkers_; w++)
108-
this->startWorker();
82+
for (size_t id = 0; id < nWorkers; id++) {
83+
workers_.emplace_back([this, id] {
84+
std::function<void()> task;
85+
while (!taskManager_.stopped()) {
86+
taskManager_.wait_for_jobs(id);
87+
do {
88+
// use inner while to save a few cash misses calling done()
89+
while (taskManager_.try_pop(task, id))
90+
execute(task);
91+
} while (!todoList_.empty());
92+
}
93+
});
94+
}
10995
}
11096

11197
//! destructor joins all threads if possible.
@@ -129,9 +115,10 @@ template<class F, class... Args>
129115
void
130116
ThreadPool::push(F&& f, Args&&... args)
131117
{
132-
if (nWorkers_ == 0) {
118+
if (workers_.size() == 0) {
133119
f(args...); // if there are no workers, do the job in the main thread
134120
} else {
121+
todoList_.add();
135122
taskManager_.push(
136123
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
137124
}
@@ -148,11 +135,11 @@ auto
148135
ThreadPool::pushReturn(F&& f, Args&&... args)
149136
-> std::future<decltype(f(args...))>
150137
{
151-
using task = std::packaged_task<decltype(f(args...))()>;
152-
auto pack = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
153-
auto taskPtr = std::make_shared<task>(std::move(pack));
154-
taskManager_.push([taskPtr] { (*taskPtr)(); });
155-
return taskPtr->get_future();
138+
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
139+
using pack_t = std::packaged_task<decltype(f(args...))()>;
140+
auto ptr = std::make_shared<pack_t>(std::move(task));
141+
this->push([ptr] { (*ptr)(); });
142+
return ptr->get_future();
156143
}
157144

158145
//! maps a function on a list of items, possibly running tasks in parallel.
@@ -202,7 +189,7 @@ ThreadPool::parallelFor(ptrdiff_t begin, size_t size, F&& f, size_t nBatches)
202189
for (ptrdiff_t i = b.begin; i < b.end; i++)
203190
f(i);
204191
};
205-
auto batches = createBatches(begin, size, nWorkers_, nBatches);
192+
auto batches = createBatches(begin, size, workers_.size(), nBatches);
206193
auto pushJob = [=] {
207194
for (const auto& batch : batches)
208195
this->push(doBatch, batch);
@@ -244,8 +231,12 @@ ThreadPool::parallelForEach(I& items, F&& f, size_t nBatches)
244231
inline void
245232
ThreadPool::wait()
246233
{
247-
while (!taskManager_.empty())
248-
util::waitAndSync(finishLine_);
234+
while (!todoList_.empty()) {
235+
todoList_.wait(50);
236+
Rcout << "";
237+
checkUserInterrupt();
238+
}
239+
Rcout << "";
249240
}
250241

251242
//! waits for all jobs to finish and joins all threads.
@@ -264,31 +255,14 @@ ThreadPool::clear()
264255
taskManager_.clear();
265256
}
266257

267-
//! spawns a worker thread waiting for jobs to arrive.
268-
inline void
269-
ThreadPool::startWorker()
270-
{
271-
workers_.emplace_back([this] {
272-
std::function<void()> task;
273-
while (!taskManager_.stopped()) {
274-
taskManager_.wait_for_jobs();
275-
276-
finishLine_.start();
277-
while (taskManager_.try_pop(task))
278-
executeSafely(task);
279-
finishLine_.cross();
280-
}
281-
});
282-
}
283-
284-
template<class Task>
285258
inline void
286-
ThreadPool::executeSafely(Task& task)
259+
ThreadPool::execute(std::function<void()>& task)
287260
{
288261
try {
289262
task();
263+
todoList_.cross();
290264
} catch (...) {
291-
finishLine_.abort(std::current_exception());
265+
todoList_.clear(std::current_exception());
292266
}
293267
}
294268

inst/include/RcppThread/parallelFor.hpp

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ namespace RcppThread {
1515
//! @param begin first index of the loop.
1616
//! @param end the loop runs in the range `[begin, end)`.
1717
//! @param f a function (the 'loop body').
18-
//! @param nThreads deprecated; loop is run on global thread pool.
18+
//! @param nThreads the number of threads to use; the default uses the number
19+
//! of cores in the machine; if `nThreads = 0`, all work will be done in the
20+
//! main thread.
1921
//! @param nBatches the number of batches to create; the default (0)
2022
//! triggers a heuristic to automatically determine the number of batches.
2123
//! @details Consider the following code:
@@ -31,8 +33,9 @@ namespace RcppThread {
3133
//! x[i] = i;
3234
//! });
3335
//! ```
34-
//! The function dispatches to a global thread pool, so it can safely be nested
35-
//! or called multiple times with almost no overhead.
36+
//! The function sets up a `ThreadPool` object to do the scheduling. If you
37+
//! want to run multiple parallel for loops, consider creating a `ThreadPool`
38+
//! yourself and using `ThreadPool::parallelFor()`.
3639
//!
3740
//! **Caution**: if the iterations are not independent from another,
3841
//! the tasks need to be synchronized manually (e.g., using mutexes).
@@ -49,24 +52,18 @@ parallelFor(int begin,
4952
if (end == begin)
5053
return;
5154

52-
nThreads = std::thread::hardware_concurrency();
53-
auto batches = createBatches(begin, end - begin, nThreads, nBatches);
54-
tpool::FinishLine finishLine{ batches.size() };
55-
auto doBatch = [f, &finishLine](const Batch& b) {
56-
for (ptrdiff_t i = b.begin; i < b.end; i++)
57-
f(i);
58-
finishLine.cross();
59-
};
60-
for (const auto& batch : batches)
61-
ThreadPool::globalInstance().push(doBatch, batch);
62-
util::waitAndSync(finishLine);
55+
ThreadPool pool(nThreads);
56+
pool.parallelFor(begin, end, std::forward<F>(f), nBatches);
57+
pool.join();
6358
}
6459

6560
//! computes a range-based for loop in parallel batches.
6661
//! @param items an object allowing for `items.size()` and whose elements
6762
//! are accessed by the `[]` operator.
6863
//! @param f a function (the 'loop body').
69-
//! @param nThreads deprecated; loop is run on global thread pool.
64+
//! @param nThreads the number of threads to use; the default uses the number
65+
//! of cores in the machine; if `nThreads = 0`, all work will be done in the
66+
//! main thread.
7067
//! @param nBatches the number of batches to create; the default (0)
7168
//! triggers a heuristic to automatically determine the number of batches.
7269
//! @details Consider the following code:
@@ -82,8 +79,9 @@ parallelFor(int begin,
8279
//! xx *= 2;
8380
//! });
8481
//! ```
85-
//! The function dispatches to a global thread pool, so it can safely be nested
86-
//! or called multiple times with almost no overhead.
82+
//! The function sets up a `ThreadPool` object to do the scheduling. If you
83+
//! want to run multiple parallel for loops, consider creating a `ThreadPool`
84+
//! yourself and using `ThreadPool::parallelForEach()`.
8785
//!
8886
//! **Caution**: if the iterations are not independent from another,
8987
//! the tasks need to be synchronized manually (e.g., using mutexes).
@@ -95,9 +93,9 @@ parallelForEach(I& items,
9593
size_t nBatches = 0)
9694
{
9795
// loop ranges ranges indicate iterator offset
98-
const auto begin_it = std::begin(items);
99-
const auto end_it = std::end(items);
100-
auto size = std::distance(begin_it, end_it);
101-
parallelFor(0, size, [f, &items, &begin_it](int i) { f(*(begin_it + i)); });
96+
ThreadPool pool(nThreads);
97+
pool.parallelForEach(items, std::forward<F>(f), nBatches);
98+
pool.join();
10299
}
100+
103101
}

0 commit comments

Comments
 (0)