Skip to content

Commit

Permalink
Merge pull request #301 from andlaus/generalized_tasklets
Browse files Browse the repository at this point in the history
generalize the tasklet mechanism to an arbitrary number of worker threads
  • Loading branch information
andlaus authored Mar 15, 2018
2 parents 5290f25 + ac7e574 commit 564b8d0
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 40 deletions.
6 changes: 4 additions & 2 deletions ebos/eclwriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ public:
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthread are enabled
bool enableAsyncOutput = EWOMS_GET_PARAM(TypeTag, bool, EnableAsyncEclOutput);
bool createOutputThread = enableAsyncOutput && collectToIORank_.isIORank();
taskletRunner_.reset(new TaskletRunner(createOutputThread));
int numWorkerThreads = 0;
if (enableAsyncOutput && collectToIORank_.isIORank())
numWorkerThreads = 1;
taskletRunner_.reset(new TaskletRunner(numWorkerThreads));
}

~EclWriter()
Expand Down
137 changes: 99 additions & 38 deletions ewoms/parallel/tasklets.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
#include <stdexcept>
#include <cassert>
#include <thread>
#include <mutex>
#include <queue>
#include <mutex>
#include <condition_variable>

namespace Ewoms {

Expand All @@ -43,31 +44,70 @@ namespace Ewoms {
class TaskletInterface
{
public:
TaskletInterface() {}
TaskletInterface(int refCount = 1)
: referenceCount_(refCount)
{}
virtual ~TaskletInterface() {}
virtual void run() = 0;
virtual bool isEndMarker () const { return false; }

void dereference()
{ -- referenceCount_; }

int referenceCount() const
{ return referenceCount_; }

private:
int referenceCount_;
};

/*!
* \brief Handles where a given tasklet is run.
*
* Depending on the "runAsync" constructor parameter, a tasklet can either be a separate
* worker thread or it can be the main thread.
* Depending on the number of worker threads, a tasklet can either be run in a separate
* worker thread or by the main thread.
*/
class TaskletRunner
{
/// \brief Implements a barrier. This class can only be used in the asynchronous case.
class BarrierTasklet : public TaskletInterface
{
public:
BarrierTasklet()
{ barrierMutex.lock(); }
BarrierTasklet(unsigned numWorkers)
: TaskletInterface(/*refCount=*/numWorkers)
{
numWorkers_ = numWorkers;
numWaiting_ = 0;
}

void run()
{ barrierMutex.unlock(); }
{ wait(); }

void wait()
{
numWaitingMutex_.lock();
numWaiting_ += 1;
if (numWaiting_ >= numWorkers_ + 1)
barrierCondition_.notify_all();
numWaitingMutex_.unlock();

if (numWaiting_ < numWorkers_ + 1) {
const auto& areAllWaiting =
[this]() -> bool
{ return this->numWaiting_ >= this->numWorkers_ + 1; };

std::unique_lock<std::mutex> lock(barrierMutex_);
barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
}
}

std::mutex barrierMutex;
private:
unsigned numWorkers_;
unsigned numWaiting_;

std::condition_variable barrierCondition_;
std::mutex numWaitingMutex_;
std::mutex barrierMutex_;
};

/// \brief TerminateThreadTasklet class
Expand All @@ -86,32 +126,39 @@ public:
// prohibit copying of tasklet runners
TaskletRunner(const TaskletRunner&) = delete;

TaskletRunner(const bool runAsync)
/*!
* \brief Creates a tasklet runner with numWorkers underling threads for doing work.
*
* The number of worker threads may be 0. In this case, all work is done by the main
* thread (synchronous mode).
*/
TaskletRunner(unsigned numWorkers)
{
if (runAsync) {
// make sure that the runner thread blocks when the tasklet queue is empty
runnerMutex_.lock();
// make sure that the worker threads block when the tasklet queue is empty
runnerMutex_.lock();

threads_.resize(numWorkers);
for (int i = 0; i < numWorkers; ++i)
// create a worker thread
thread_.reset(new std::thread(startThread_, this));
}
threads_[i].reset(new std::thread(startWorkerThread_, this));
}

/*!
* \brief Destructor
*
* If a worker thread was created to run the tasklets, this method waits until the
* worker thread has been terminated, i.e. all scheduled tasklets are guaranteed to
* If worker threads were created to run the tasklets, this method waits until all
* worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to
* be completed.
*/
~TaskletRunner()
{
if (thread_) {
if (threads_.size() > 0) {
// dispatch a tasklet which will terminate the worker thread
dispatch(std::make_shared<TerminateThreadTasklet>());

// wait until the thread has been terminated
thread_->join();
// wait until all worker threads have terminated
for (auto& thread : threads_)
thread->join();
}
}

Expand All @@ -122,17 +169,21 @@ public:
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (!thread_)
if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
tasklet->run();
while (tasklet->referenceCount() > 0) {
tasklet->dereference();
tasklet->run();
}
}
else {
// lock mutex for the tasklet queue to make sure that nobody messes with the
// task queue
taskletQueueMutex_.lock();

// add the tasklet to the queue
taskletQueue_.push(tasklet);
// fire up the worker thread
// fire up a worker thread
runnerMutex_.unlock();

taskletQueueMutex_.unlock();
Expand All @@ -144,19 +195,21 @@ public:
*/
void barrier()
{
if (!thread_)
unsigned numWorkers = threads_.size();
if (numWorkers == 0)
// nothing needs to be done to implement a barrier in synchronous mode
return;

// dispatch a barrier tasklet and wait until it has been run by the worker thread
auto barrierTasklet = std::make_shared<BarrierTasklet>();
auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
dispatch(barrierTasklet);
barrierTasklet->barrierMutex.lock();

barrierTasklet->wait();
}

protected:
// main function of the worker thread
static void startThread_(TaskletRunner* taskletRunner)
static void startWorkerThread_(TaskletRunner* taskletRunner)
{ taskletRunner->run_(); }

//! do the work until the queue received an end tasklet
Expand All @@ -173,29 +226,37 @@ protected:

// remove tasklet from queue
std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
taskletQueue_.pop();

// if the queue is not yet empty, make sure that we are will process the next
// tasklet immediately after we're finished with the current one.
// if tasklet is an end marker, terminate the thread and DO NOT remove the
// tasklet.
if (tasklet->isEndMarker()) {
if(taskletQueue_.size() > 1)
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
taskletQueueMutex_.unlock();
runnerMutex_.unlock();
return;
}

tasklet->dereference();
if (tasklet->referenceCount() == 0)
// remove tasklets from the queue as soon as their reference count
// reaches zero, i.e. the tasklet has been run often enough.
taskletQueue_.pop();

// if the queue is not yet empty, make sure that we the next tasklet
// can be processed immediately.
if (!taskletQueue_.empty())
runnerMutex_.unlock();

// unlock mutex for access to taskletQueue_
taskletQueueMutex_.unlock();

// if tasklet is an end marker, terminate the thread
if (tasklet->isEndMarker()) {
if(!taskletQueue_.empty())
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
return;
}

// execute tasklet action
// execute tasklet
tasklet->run();
}
}

std::unique_ptr<std::thread> thread_;
std::vector<std::unique_ptr<std::thread> > threads_;
std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
std::mutex taskletQueueMutex_;
std::mutex runnerMutex_;
Expand Down

0 comments on commit 564b8d0

Please sign in to comment.