Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generalize the tasklet mechanism to an arbitrary number of worker threads #301

Merged
merged 1 commit into from
Mar 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ebos/eclwriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ public:
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthreads are available
bool enableAsyncOutput = EWOMS_GET_PARAM(TypeTag, bool, EnableAsyncEclOutput);
bool createOutputThread = enableAsyncOutput && collectToIORank_.isIORank();
taskletRunner_.reset(new TaskletRunner(createOutputThread));
int numWorkerThreads = 0;
if (enableAsyncOutput && collectToIORank_.isIORank())
numWorkerThreads = 1;
taskletRunner_.reset(new TaskletRunner(numWorkerThreads));
}

~EclWriter()
Expand Down
137 changes: 99 additions & 38 deletions ewoms/parallel/tasklets.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
#include <stdexcept>
#include <cassert>
#include <thread>
#include <mutex>
#include <queue>
#include <mutex>
#include <condition_variable>

namespace Ewoms {

Expand All @@ -43,31 +44,70 @@ namespace Ewoms {
class TaskletInterface
{
public:
TaskletInterface() {}
// creates a tasklet whose reference count starts at refCount (1 unless specified
// otherwise)
TaskletInterface(int refCount = 1)
: referenceCount_(refCount)
{}
virtual ~TaskletInterface() {}
// the work done by the tasklet; every concrete tasklet has to implement this
virtual void run() = 0;
// returns false unless a derived class overrides it
virtual bool isEndMarker () const { return false; }

// decrements the reference count by one
void dereference()
{ -- referenceCount_; }

// returns the current value of the reference count
int referenceCount() const
{ return referenceCount_; }

private:
// plain (non-atomic) counter; within this class it is only changed by dereference()
int referenceCount_;
};

/*!
* \brief Handles where a given tasklet is run.
*
* Depending on the "runAsync" constructor parameter, a tasklet can either be a separate
* worker thread or it can be the main thread.
* Depending on the number of worker threads, a tasklet can either be run in a separate
* worker thread or by the main thread.
*/
class TaskletRunner
{
/// \brief Implements a barrier. This class can only be used in the asynchronous case.
class BarrierTasklet : public TaskletInterface
{
public:
BarrierTasklet()
{ barrierMutex.lock(); }
/// \brief Creates a barrier tasklet for numWorkers worker threads.
///
/// The reference count of the tasklet is set to the number of workers and
/// nobody has arrived at the barrier yet.
BarrierTasklet(unsigned numWorkers)
    : TaskletInterface(/*refCount=*/numWorkers)
    , numWorkers_(numWorkers)
    , numWaiting_(0)
{}

void run()
{ barrierMutex.unlock(); }
{ wait(); }

void wait()
{
numWaitingMutex_.lock();
numWaiting_ += 1;
if (numWaiting_ >= numWorkers_ + 1)
barrierCondition_.notify_all();
numWaitingMutex_.unlock();

if (numWaiting_ < numWorkers_ + 1) {
const auto& areAllWaiting =
[this]() -> bool
{ return this->numWaiting_ >= this->numWorkers_ + 1; };

std::unique_lock<std::mutex> lock(barrierMutex_);
barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
}
}

std::mutex barrierMutex;
private:
unsigned numWorkers_;
unsigned numWaiting_;

std::condition_variable barrierCondition_;
std::mutex numWaitingMutex_;
std::mutex barrierMutex_;
};

/// \brief TerminateThreadTasklet class
Expand All @@ -86,32 +126,39 @@ public:
// prohibit copying of tasklet runners
TaskletRunner(const TaskletRunner&) = delete;

TaskletRunner(const bool runAsync)
/*!
* \brief Creates a tasklet runner with numWorkers underlying threads for doing work.
*
* The number of worker threads may be 0. In this case, all work is done by the main
* thread (synchronous mode).
*/
TaskletRunner(unsigned numWorkers)
{
if (runAsync) {
// make sure that the runner thread blocks when the tasklet queue is empty
runnerMutex_.lock();
// make sure that the worker threads block when the tasklet queue is empty
runnerMutex_.lock();

threads_.resize(numWorkers);
for (int i = 0; i < numWorkers; ++i)
// create a worker thread
thread_.reset(new std::thread(startThread_, this));
}
threads_[i].reset(new std::thread(startWorkerThread_, this));
}

/*!
* \brief Destructor
*
* If a worker thread was created to run the tasklets, this method waits until the
* worker thread has been terminated, i.e. all scheduled tasklets are guaranteed to
* If worker threads were created to run the tasklets, this method waits until all
* worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to
* be completed.
*/
~TaskletRunner()
{
if (thread_) {
if (threads_.size() > 0) {
// dispatch a tasklet which will terminate the worker thread
dispatch(std::make_shared<TerminateThreadTasklet>());

// wait until the thread has been terminated
thread_->join();
// wait until all worker threads have terminated
for (auto& thread : threads_)
thread->join();
}
}

Expand All @@ -122,17 +169,21 @@ public:
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (!thread_)
if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
tasklet->run();
while (tasklet->referenceCount() > 0) {
tasklet->dereference();
tasklet->run();
}
}
else {
// lock mutex for the tasklet queue to make sure that nobody messes with the
// task queue
taskletQueueMutex_.lock();

// add the tasklet to the queue
taskletQueue_.push(tasklet);
// fire up the worker thread
// fire up a worker thread
runnerMutex_.unlock();

taskletQueueMutex_.unlock();
Expand All @@ -144,19 +195,21 @@ public:
*/
void barrier()
{
if (!thread_)
unsigned numWorkers = threads_.size();
if (numWorkers == 0)
// nothing needs to be done to implement a barrier in synchronous mode
return;

// dispatch a barrier tasklet and wait until it has been run by the worker thread
auto barrierTasklet = std::make_shared<BarrierTasklet>();
auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
dispatch(barrierTasklet);
barrierTasklet->barrierMutex.lock();

barrierTasklet->wait();
}

protected:
// main function of the worker thread
static void startThread_(TaskletRunner* taskletRunner)
static void startWorkerThread_(TaskletRunner* taskletRunner)
{ taskletRunner->run_(); }

//! do the work until the queue received an end tasklet
Expand All @@ -173,29 +226,37 @@ protected:

// remove tasklet from queue
std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
taskletQueue_.pop();

// if the queue is not yet empty, make sure that we will process the next
// tasklet immediately after we're finished with the current one.
// if tasklet is an end marker, terminate the thread and DO NOT remove the
// tasklet.
if (tasklet->isEndMarker()) {
if(taskletQueue_.size() > 1)
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
taskletQueueMutex_.unlock();
runnerMutex_.unlock();
return;
}

tasklet->dereference();
if (tasklet->referenceCount() == 0)
// remove tasklets from the queue as soon as their reference count
// reaches zero, i.e. the tasklet has been run often enough.
taskletQueue_.pop();

// if the queue is not yet empty, make sure that the next tasklet
// can be processed immediately.
if (!taskletQueue_.empty())
runnerMutex_.unlock();

// unlock mutex for access to taskletQueue_
taskletQueueMutex_.unlock();

// if tasklet is an end marker, terminate the thread
if (tasklet->isEndMarker()) {
if(!taskletQueue_.empty())
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
return;
}

// execute tasklet action
// execute tasklet
tasklet->run();
}
}

std::unique_ptr<std::thread> thread_;
std::vector<std::unique_ptr<std::thread> > threads_;
std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
std::mutex taskletQueueMutex_;
std::mutex runnerMutex_;
Expand Down