From ac7e5746f283e981ffdd4acdacd95b20ce1c4638 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Thu, 15 Mar 2018 11:05:10 +0100 Subject: [PATCH] generalize the tasklet mechanism to an arbitrary number of worker threads this has been quite a bit more complicated than I initially thought. --- ebos/eclwriter.hh | 6 +- ewoms/parallel/tasklets.hh | 137 +++++++++++++++++++++++++++---------- 2 files changed, 103 insertions(+), 40 deletions(-) diff --git a/ebos/eclwriter.hh b/ebos/eclwriter.hh index efee07933b..bf2cd06ebb 100644 --- a/ebos/eclwriter.hh +++ b/ebos/eclwriter.hh @@ -115,8 +115,10 @@ public: // create output thread if enabled and rank is I/O rank // async output is enabled by default if pthread are enabled bool enableAsyncOutput = EWOMS_GET_PARAM(TypeTag, bool, EnableAsyncEclOutput); - bool createOutputThread = enableAsyncOutput && collectToIORank_.isIORank(); - taskletRunner_.reset(new TaskletRunner(createOutputThread)); + int numWorkerThreads = 0; + if (enableAsyncOutput && collectToIORank_.isIORank()) + numWorkerThreads = 1; + taskletRunner_.reset(new TaskletRunner(numWorkerThreads)); } ~EclWriter() diff --git a/ewoms/parallel/tasklets.hh b/ewoms/parallel/tasklets.hh index 403ae91990..6202ea69e6 100644 --- a/ewoms/parallel/tasklets.hh +++ b/ewoms/parallel/tasklets.hh @@ -30,8 +30,9 @@ #include #include #include -#include #include +#include +#include namespace Ewoms { @@ -43,17 +44,28 @@ namespace Ewoms { class TaskletInterface { public: - TaskletInterface() {} + TaskletInterface(int refCount = 1) + : referenceCount_(refCount) + {} virtual ~TaskletInterface() {} virtual void run() = 0; virtual bool isEndMarker () const { return false; } + + void dereference() + { -- referenceCount_; } + + int referenceCount() const + { return referenceCount_; } + +private: + int referenceCount_; }; /*! * \brief Handles where a given tasklet is run. * - * Depending on the "runAsync" constructor parameter, a tasklet can either be a separate - * worker thread or it can be the main thread. + * Depending on the number of worker threads, a tasklet can either be run in a separate + * worker thread or by the main thread. */ class TaskletRunner { @@ -61,13 +73,41 @@ class TaskletRunner class BarrierTasklet : public TaskletInterface { public: - BarrierTasklet() - { barrierMutex.lock(); } + BarrierTasklet(unsigned numWorkers) + : TaskletInterface(/*refCount=*/numWorkers) + { + numWorkers_ = numWorkers; + numWaiting_ = 0; + } void run() - { barrierMutex.unlock(); } + { wait(); } + + void wait() + { + numWaitingMutex_.lock(); + numWaiting_ += 1; + if (numWaiting_ >= numWorkers_ + 1) + barrierCondition_.notify_all(); + numWaitingMutex_.unlock(); + + if (numWaiting_ < numWorkers_ + 1) { + const auto& areAllWaiting = + [this]() -> bool + { return this->numWaiting_ >= this->numWorkers_ + 1; }; + + std::unique_lock lock(barrierMutex_); + barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); + } + } - std::mutex barrierMutex; + private: + unsigned numWorkers_; + unsigned numWaiting_; + + std::condition_variable barrierCondition_; + std::mutex numWaitingMutex_; + std::mutex barrierMutex_; }; /// \brief TerminateThreadTasklet class @@ -86,32 +126,39 @@ public: // prohibit copying of tasklet runners TaskletRunner(const TaskletRunner&) = delete; - TaskletRunner(const bool runAsync) + /*! + * \brief Creates a tasklet runner with numWorkers underling threads for doing work. + * + * The number of worker threads may be 0. In this case, all work is done by the main + * thread (synchronous mode). + */ + TaskletRunner(unsigned numWorkers) { - if (runAsync) { - // make sure that the runner thread blocks when the tasklet queue is empty - runnerMutex_.lock(); + // make sure that the worker threads block when the tasklet queue is empty + runnerMutex_.lock(); + threads_.resize(numWorkers); + for (int i = 0; i < numWorkers; ++i) // create a worker thread - thread_.reset(new std::thread(startThread_, this)); - } + threads_[i].reset(new std::thread(startWorkerThread_, this)); } /*! * \brief Destructor * - * If a worker thread was created to run the tasklets, this method waits until the - * worker thread has been terminated, i.e. all scheduled tasklets are guaranteed to + * If worker threads were created to run the tasklets, this method waits until all + * worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to * be completed. */ ~TaskletRunner() { - if (thread_) { + if (threads_.size() > 0) { // dispatch a tasklet which will terminate the worker thread dispatch(std::make_shared()); - // wait until the thread has been terminated - thread_->join(); + // wait until all worker threads have terminated + for (auto& thread : threads_) + thread->join(); } } @@ -122,9 +169,13 @@ public: */ void dispatch(std::shared_ptr tasklet) { - if (!thread_) + if (threads_.empty()) { // run the tasklet immediately in synchronous mode. - tasklet->run(); + while (tasklet->referenceCount() > 0) { + tasklet->dereference(); + tasklet->run(); + } + } else { // lock mutex for the tasklet queue to make sure that nobody messes with the // task queue @@ -132,7 +183,7 @@ public: // add the tasklet to the queue taskletQueue_.push(tasklet); - // fire up the worker thread + // fire up a worker thread runnerMutex_.unlock(); taskletQueueMutex_.unlock(); @@ -144,19 +195,21 @@ public: */ void barrier() { - if (!thread_) + unsigned numWorkers = threads_.size(); + if (numWorkers == 0) // nothing needs to be done to implement a barrier in synchronous mode return; // dispatch a barrier tasklet and wait until it has been run by the worker thread - auto barrierTasklet = std::make_shared(); + auto barrierTasklet = std::make_shared(numWorkers); dispatch(barrierTasklet); - barrierTasklet->barrierMutex.lock(); + + barrierTasklet->wait(); } protected: // main function of the worker thread - static void startThread_(TaskletRunner* taskletRunner) + static void startWorkerThread_(TaskletRunner* taskletRunner) { taskletRunner->run_(); } //! do the work until the queue received an end tasklet @@ -173,29 +226,37 @@ protected: // remove tasklet from queue std::shared_ptr tasklet = taskletQueue_.front(); - taskletQueue_.pop(); - // if the queue is not yet empty, make sure that we are will process the next - // tasklet immediately after we're finished with the current one. + // if tasklet is an end marker, terminate the thread and DO NOT remove the + // tasklet. + if (tasklet->isEndMarker()) { + if(taskletQueue_.size() > 1) + throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); + taskletQueueMutex_.unlock(); + runnerMutex_.unlock(); + return; + } + + tasklet->dereference(); + if (tasklet->referenceCount() == 0) + // remove tasklets from the queue as soon as their reference count + // reaches zero, i.e. the tasklet has been run often enough. + taskletQueue_.pop(); + + // if the queue is not yet empty, make sure that we the next tasklet + // can be processed immediately. if (!taskletQueue_.empty()) runnerMutex_.unlock(); // unlock mutex for access to taskletQueue_ taskletQueueMutex_.unlock(); - // if tasklet is an end marker, terminate the thread - if (tasklet->isEndMarker()) { - if(!taskletQueue_.empty()) - throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); - return; - } - - // execute tasklet action + // execute tasklet tasklet->run(); } } - std::unique_ptr thread_; + std::vector > threads_; std::queue > taskletQueue_; std::mutex taskletQueueMutex_; std::mutex runnerMutex_;