Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace OpenMP with tasklets #318

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions ewoms/common/simulator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,22 @@
#include <ewoms/common/propertysystem.hh>
#include <ewoms/common/timer.hh>
#include <ewoms/common/timerguard.hh>
#include <ewoms/parallel/tasklets.hh>

#include <dune/common/version.hh>
#include <dune/common/parallel/mpihelper.hh>

#ifdef _OPENMP
#include <omp.h>
#endif

#include <iostream>
#include <fstream>
#include <iomanip>
#include <vector>
#include <string>
#include <memory>
#include <cstdlib>

BEGIN_PROPERTIES

Expand All @@ -56,6 +62,7 @@ NEW_PROP_TAG(EndTime);
NEW_PROP_TAG(RestartTime);
NEW_PROP_TAG(InitialTimeStepSize);
NEW_PROP_TAG(PredeterminedTimeStepsFile);
NEW_PROP_TAG(ThreadsPerProcess);

END_PROPERTIES

Expand Down Expand Up @@ -100,6 +107,8 @@ public:
endTime_ = EWOMS_GET_PARAM(TypeTag, Scalar, EndTime);
timeStepSize_ = EWOMS_GET_PARAM(TypeTag, Scalar, InitialTimeStepSize);

taskletRunner_.reset(new TaskletRunner(numWorkerThreads()));

const std::string& predetTimeStepFile =
EWOMS_GET_PARAM(TypeTag, std::string, PredeterminedTimeStepsFile);
if (!predetTimeStepFile.empty()) {
Expand Down Expand Up @@ -525,6 +534,15 @@ public:
* \}
*/

/*!
 * \brief Gives access to the tasklet runner held by this simulator.
 *
 * The method is const but hands out a non-const reference (the underlying member is
 * declared mutable), so tasklets can be dispatched even through a constant
 * simulator object.
 *
 * The stored pointer is dereferenced unconditionally, i.e., it must have been set
 * before this method is called.
 */
TaskletRunner& taskletRunner() const
{
    TaskletRunner& runner = *taskletRunner_;
    return runner;
}

/*!
* \brief Runs the simulation using a given problem class.
*
Expand Down Expand Up @@ -850,6 +868,46 @@ public:
restarter.deserializeSectionEnd();
}

/*!
 * \brief Computes how many worker threads the object returned by taskletRunner()
 *        ought to use.
 *
 * The value is derived from the ThreadsPerProcess run-time parameter:
 *  - if OpenMP is available and the parameter is negative, the value of
 *    omp_get_max_threads() is used instead,
 *  - any resulting value smaller than 2 is mapped to 0, which selects synchronous
 *    mode, i.e., all work is done by the main thread.
 *
 * The parameter is looked up on every call, so this function is relatively expensive
 * and should stay out of performance critical paths. In exchange, it can be called
 * before the simulator object has been fully initialized, which avoids some
 * catch-22s with the model and the problem objects.
 *
 * \return The number of worker threads; 0 means synchronous mode.
 */
size_t numWorkerThreads() const
{
    int requestedThreads = EWOMS_GET_PARAM(TypeTag, int, ThreadsPerProcess);

#ifdef _OPENMP
    // a negative request is replaced by the thread count reported by OpenMP
    if (requestedThreads < 0)
        requestedThreads = omp_get_max_threads();
#endif

    // a single worker thread offers no advantage over doing everything in the main
    // thread, so anything below two workers selects synchronous mode (0 workers).
    const bool synchronousMode = requestedThreads < 2;
    return synchronousMode
        ? static_cast<size_t>(0)
        : static_cast<size_t>(requestedThreads);
}

/*!
 * \brief Computes the total number of threads used by the object returned by
 *        taskletRunner(), i.e., the worker threads plus the master thread.
 *
 * This simply adds one to the result of numWorkerThreads(). It therefore shares
 * that function's properties: it is relatively expensive and should stay out of
 * performance critical paths, but it can be called before the simulator object has
 * been fully initialized. (This avoids some catch-22s with the model and the
 * problem objects.)
 *
 * \return numWorkerThreads() + 1, hence 1 in synchronous mode.
 */
size_t numThreads() const
{
    const size_t workers = numWorkerThreads();
    return workers + 1;
}

private:
std::unique_ptr<Vanguard> vanguard_;
std::unique_ptr<Model> model_;
Expand All @@ -868,6 +926,9 @@ private:
Ewoms::Timer writeTimer_;

std::vector<Scalar> forcedTimeSteps_;

mutable std::unique_ptr<TaskletRunner> taskletRunner_;

Scalar startTime_;
Scalar time_;
Scalar endTime_;
Expand Down
5 changes: 0 additions & 5 deletions ewoms/common/start.hh
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ template <class TypeTag>
static inline void registerAllParameters_(bool finalizeRegistration = true)
{
typedef typename GET_PROP_TYPE(TypeTag, Simulator) Simulator;
typedef typename GET_PROP_TYPE(TypeTag, ThreadManager) ThreadManager;

EWOMS_REGISTER_PARAM(TypeTag, std::string, ParameterFile,
"An .ini file which contains a set of run-time "
Expand All @@ -104,7 +103,6 @@ static inline void registerAllParameters_(bool finalizeRegistration = true)
"start of the simulation");

Simulator::registerParameters();
ThreadManager::registerParameters();

if (finalizeRegistration)
EWOMS_END_PARAM_REGISTRATION(TypeTag);
Expand Down Expand Up @@ -250,7 +248,6 @@ static inline int start(int argc, char **argv)
typedef typename GET_PROP_TYPE(TypeTag, Scalar) Scalar;
typedef typename GET_PROP_TYPE(TypeTag, Simulator) Simulator;
typedef typename GET_PROP_TYPE(TypeTag, Problem) Problem;
typedef typename GET_PROP_TYPE(TypeTag, ThreadManager) ThreadManager;

// set the signal handlers to reset the TTY to a well defined state on unexpected
// program aborts
Expand All @@ -275,8 +272,6 @@ static inline int start(int argc, char **argv)
if (paramStatus == 2)
return 0;

ThreadManager::init();

// initialize MPI, finalize is done automatically on exit
#if HAVE_DUNE_FEM
Dune::Fem::MPIManager::initialize(argc, argv);
Expand Down
55 changes: 33 additions & 22 deletions ewoms/disc/common/fvbasediscretization.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include "baseauxiliarymodule.hh"

#include <ewoms/parallel/gridcommhandles.hh>
#include <ewoms/parallel/threadmanager.hh>
#include <ewoms/linear/nullborderlistmanager.hh>
#include <ewoms/linear/istlsparsematrixadapter.hh>
#include <ewoms/common/simulator.hh>
Expand Down Expand Up @@ -210,7 +209,7 @@ SET_TYPE_PROP(FvBaseDiscretization, BoundaryContext, Ewoms::FvBaseBoundaryContex
SET_TYPE_PROP(FvBaseDiscretization, ConstraintsContext, Ewoms::FvBaseConstraintsContext<TypeTag>);

/*!
* \brief The OpenMP threads manager
* \brief The multi-threading related properties
*/
SET_TYPE_PROP(FvBaseDiscretization, ThreadManager, Ewoms::ThreadManager<TypeTag>);
SET_INT_PROP(FvBaseDiscretization, ThreadsPerProcess, 1);
Expand Down Expand Up @@ -321,7 +320,6 @@ class FvBaseDiscretization
typedef typename GET_PROP_TYPE(TypeTag, DiscBaseOutputModule) DiscBaseOutputModule;
typedef typename GET_PROP_TYPE(TypeTag, GridCommHandleFactory) GridCommHandleFactory;
typedef typename GET_PROP_TYPE(TypeTag, NewtonMethod) NewtonMethod;
typedef typename GET_PROP_TYPE(TypeTag, ThreadManager) ThreadManager;

typedef typename GET_PROP_TYPE(TypeTag, LocalLinearizer) LocalLinearizer;
typedef typename GET_PROP_TYPE(TypeTag, LocalResidual) LocalResidual;
Expand Down Expand Up @@ -395,7 +393,7 @@ public:
, vertexMapper_(gridView_)
#endif
, newtonMethod_(simulator)
, localLinearizer_(ThreadManager::maxThreads())
, localLinearizer_(simulator.numThreads())
, linearizer_(new Linearizer())
#if HAVE_DUNE_FEM
, space_( simulator.vanguard().gridPart() )
Expand Down Expand Up @@ -468,6 +466,7 @@ public:
// register runtime parameters of the output modules
Ewoms::VtkPrimaryVarsModule<TypeTag>::registerParameters();

EWOMS_REGISTER_PARAM(TypeTag, int, ThreadsPerProcess, "The number of worker threads spawned for each MPI process.");
EWOMS_REGISTER_PARAM(TypeTag, bool, EnableGridAdaptation, "Enable adaptive grid refinement/coarsening");
EWOMS_REGISTER_PARAM(TypeTag, bool, EnableVtkOutput, "Global switch for turning on writing VTK files");
EWOMS_REGISTER_PARAM(TypeTag, bool, EnableThermodynamicHints, "Enable thermodynamic hints");
Expand Down Expand Up @@ -535,7 +534,8 @@ public:
gridTotalVolume_ = gridView_.comm().sum(gridTotalVolume_);

linearizer_->init(simulator_);
for (unsigned threadId = 0; threadId < ThreadManager::maxThreads(); ++threadId)
int numThreads = std::max(1, simulator_.taskletRunner().numWorkerThreads());
for (int threadId = 0; threadId < numThreads; ++threadId)
localLinearizer_[threadId].init(simulator_);

resizeAndResetIntensiveQuantitiesCache_();
Expand Down Expand Up @@ -833,14 +833,14 @@ public:
dest = 0;

std::mutex mutex;
auto& taskletRunner = simulator_.taskletRunner();
int numThreads = std::max(taskletRunner.numWorkerThreads(), 1);
ThreadedEntityIterator<GridView, /*codim=*/0> threadedElemIt(gridView_);
#ifdef _OPENMP
#pragma omp parallel
#endif
auto globalResidLambda = [this, &mutex, &threadedElemIt, &dest]() -> void
{
// Attention: the variables below are thread specific and thus cannot be
// moved in front of the #pragma!
unsigned threadId = ThreadManager::threadId();
// moved out of the lambda!
int threadId = std::max(simulator_.taskletRunner().workerThreadIndex(), 0);
ElementContext elemCtx(simulator_);
ElementIterator elemIt = threadedElemIt.beginParallel();
LocalEvalBlockVector residual, storageTerm;
Expand All @@ -864,7 +864,10 @@ public:
}
mutex.unlock();
}
}
};

taskletRunner.dispatchFunction(globalResidLambda, /*numInvokations=*/numThreads);
taskletRunner.barrier();

// add up the residuals on the process borders
const auto sumHandle =
Expand Down Expand Up @@ -895,13 +898,12 @@ public:

std::mutex mutex;
ThreadedEntityIterator<GridView, /*codim=*/0> threadedElemIt(gridView());
#ifdef _OPENMP
#pragma omp parallel
#endif

auto globalStorageLambda = [this, &mutex, &threadedElemIt, &storage, timeIdx]() -> void
{
// Attention: the variables below are thread specific and thus cannot be
// moved in front of the #pragma!
unsigned threadId = ThreadManager::threadId();
// moved out of the lambda!
unsigned threadId = std::max(0, simulator_.taskletRunner().workerThreadIndex());
ElementContext elemCtx(simulator_);
ElementIterator elemIt = threadedElemIt.beginParallel();
LocalEvalBlockVector elemStorage;
Expand Down Expand Up @@ -929,7 +931,12 @@ public:
storage[eqIdx] += Toolbox::value(elemStorage[dofIdx][eqIdx]);
mutex.unlock();
}
}
};

auto& taskletRunner = simulator_.taskletRunner();
unsigned numThreads = std::max(1, taskletRunner.numWorkerThreads());
taskletRunner.dispatchFunction(globalStorageLambda, /*numInvokations=*/numThreads);
taskletRunner.barrier();

storage = gridView_.comm().sum(storage);
}
Expand Down Expand Up @@ -1666,10 +1673,10 @@ public:

// iterate over grid
ThreadedEntityIterator<GridView, /*codim=*/0> threadedElemIt(gridView());
#ifdef _OPENMP
#pragma omp parallel
#endif

auto prepareOutputLambda = [this, &threadedElemIt, needFullContextUpdate]() -> void
{
const auto& modEndIt = outputModules_.end();
ElementContext elemCtx(simulator_);
ElementIterator elemIt = threadedElemIt.beginParallel();
for (; !threadedElemIt.isFinished(elemIt); elemIt = threadedElemIt.increment()) {
Expand All @@ -1692,7 +1699,12 @@ public:
for (; modIt2 != modEndIt; ++modIt2)
(*modIt2)->processElement(elemCtx);
}
}
};

auto& taskletRunner = simulator_.taskletRunner();
int numThreads = std::max(1, taskletRunner.numWorkerThreads());
taskletRunner.dispatchFunction(prepareOutputLambda, /*numInvokations=*/numThreads);
taskletRunner.barrier();
}

/*!
Expand Down Expand Up @@ -1907,7 +1919,6 @@ protected:
std::unique_ptr<AdaptationManager> adaptationManager_;
#endif


std::list<BaseOutputModule<TypeTag>*> outputModules_;

Scalar gridTotalVolume_;
Expand Down
Loading