Skip to content

Commit 9395d3e

Browse files
author
tnagler
committed
fix synchronization
1 parent 7c89fbf commit 9395d3e

File tree

3 files changed

+22
-27
lines changed

3 files changed

+22
-27
lines changed

inst/include/RcppThread/ThreadPool.hpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@
2222

2323
namespace RcppThread {
2424

25+
namespace util {
26+
void
27+
waitAndSync(tpool::FinishLine& finishLine)
28+
{
29+
finishLine.wait_for(std::chrono::milliseconds(50));
30+
Rcout << "";
31+
checkUserInterrupt();
32+
}
33+
}
34+
2535
//! Implemenation of the thread pool pattern based on `Thread`.
2636
class ThreadPool
2737
{
@@ -234,19 +244,8 @@ ThreadPool::parallelForEach(I& items, F&& f, size_t nBatches)
234244
inline void
235245
ThreadPool::wait()
236246
{
237-
while (!taskManager_.empty()) {
238-
finishLine_.wait_for(std::chrono::milliseconds(50));
239-
if (isInterrupted()) {
240-
taskManager_.stop();
241-
break;
242-
}
243-
if (taskManager_.empty())
244-
break;
245-
Rcout << "";
246-
std::this_thread::yield();
247-
}
248-
Rcout << "";
249-
checkUserInterrupt();
247+
while (!taskManager_.empty())
248+
util::waitAndSync(finishLine_);
250249
}
251250

252251
//! waits for all jobs to finish and joins all threads.

inst/include/RcppThread/parallelFor.hpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,14 @@ parallelFor(int begin,
5959
};
6060
for (const auto& batch : batches)
6161
ThreadPool::globalInstance().push(doBatch, batch);
62-
finishLine.wait();
62+
util::waitAndSync(finishLine);
6363
}
6464

6565
//! computes a range-based for loop in parallel batches.
6666
//! @param items an object allowing for `items.size()` and whose elements
6767
//! are accessed by the `[]` operator.
6868
//! @param f a function (the 'loop body').
69-
//! @param nThreads the number of threads to use; the default uses the
70-
//! number
71-
//! of cores in the machine; if `nThreads = 0`, all work will be done in
72-
//! the main thread.
69+
//! @param nThreads deprecated; loop is run on global thread pool.
7370
//! @param nBatches the number of batches to create; the default (0)
7471
//! triggers a heuristic to automatically determine the number of batches.
7572
//! @details Consider the following code:
@@ -101,7 +98,6 @@ parallelForEach(I& items,
10198
const auto begin_it = std::begin(items);
10299
const auto end_it = std::end(items);
103100
auto size = std::distance(begin_it, end_it);
104-
parallelFor(
105-
0, size, [f, &items, &begin_it](int i) { f(*(begin_it + i)); });
101+
parallelFor(0, size, [f, &items, &begin_it](int i) { f(*(begin_it + i)); });
106102
}
107103
}

tests/tests.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -379,10 +379,10 @@ void testNestedParallelForEach()
379379
// {
380380
// ThreadPool pool(2);
381381
// auto dummy = [] {
382-
// std::this_thread::sleep_for(std::chrono::milliseconds(500));
382+
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
383383
// checkUserInterrupt();
384384
// };
385-
// for (size_t i = 0; i < 10; i++)
385+
// for (size_t i = 0; i < 20; i++)
386386
// pool.push(dummy);
387387
// pool.join();
388388
// std::this_thread::sleep_for(std::chrono::milliseconds(5000));
@@ -391,15 +391,15 @@ void testNestedParallelForEach()
391391
// // [[Rcpp::export]]
392392
// void testThreadPoolInterruptWait()
393393
// {
394-
// ThreadPool pool(0);
394+
// ThreadPool pool(2);
395395
// auto dummy = [] {
396-
// std::this_thread::sleep_for(std::chrono::milliseconds(500));
396+
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
397397
// checkUserInterrupt();
398398
// };
399399
// for (size_t i = 0; i < 20; i++) {
400400
// pool.push(dummy);
401401
// }
402-
// pool.join();
402+
// pool.wait();
403403
// std::this_thread::sleep_for(std::chrono::milliseconds(3000));
404404
// }
405405

@@ -409,7 +409,7 @@ void testProgressCounter()
409409
{
410410
RcppThread::ProgressCounter cntr(20, 1);
411411
RcppThread::parallelFor(0, 20, [&] (int i) {
412-
std::this_thread::sleep_for(std::chrono::milliseconds(200));
412+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
413413
cntr++;
414414
});
415415
}
@@ -420,7 +420,7 @@ void testProgressBar()
420420
// 20 iterations in loop, update progress every 1 sec
421421
RcppThread::ProgressBar bar(20, 1);
422422
RcppThread::parallelFor(0, 20, [&] (int i) {
423-
std::this_thread::sleep_for(std::chrono::milliseconds(200));
423+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
424424
++bar;
425425
});
426426
}

0 commit comments

Comments
 (0)