Skip to content

Commit

Permalink
Merge pull request #2588 from JasonRuonanWang/ssc-threading
Browse files Browse the repository at this point in the history
Add threading to SSC reader
  • Loading branch information
JasonRuonanWang authored Jan 19, 2021
2 parents 5948ca8 + 033b28f commit 321aa4d
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The SSC engine takes the following parameters:

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. SSC will use threads to hide the time cost of metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**.
3. ``Threading``: Default **False**. When this parameter is set to **true**, SSC uses threads to hide the time cost of metadata manipulation and data transfer. SSC checks whether MPI was initialized with multi-threading enabled and, if it was not, forces this parameter to **false**. Do NOT enable threading when an application opens multiple I/O streams, as doing so will cause unpredictable errors.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
Expand Down
219 changes: 141 additions & 78 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
m_OpenTimeoutSecs);

Expand All @@ -43,6 +44,49 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

/**
 * Destructor. Besides the profiling timer, it makes sure the background
 * thread that EndStep may have launched is finished before the object's
 * members are destroyed.
 */
SscReader::~SscReader()
{
    TAU_SCOPED_TIMER_FUNC();

    // EndStep can start m_EndStepThread, and it is only joined by the next
    // BeginStep. If the reader is destroyed without that BeginStep the thread
    // is still joinable, and destroying a joinable std::thread calls
    // std::terminate. Joining here also keeps the thread from touching
    // members that are about to be destroyed.
    if (m_EndStepThread.joinable())
    {
        m_EndStepThread.join();
    }
}

void SscReader::BeginStepConsequentFixed()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
}

/**
 * Start-of-step work on the flexible-pattern path: resets the per-step
 * pattern state, synchronizes the write pattern, and creates an empty MPI
 * window on m_StreamComm.
 *
 * @param status output. Set to StepStatus::EndOfStream when
 * SyncWritePattern() reports the final step (no window is created in that
 * case), otherwise set to StepStatus::OK.
 */
void SscReader::BeginStepFlexible(StepStatus &status)
{
    // Drop everything that described the previous step.
    m_AllReceivingWriterRanks.clear();
    m_Buffer.resize(1, 0);
    m_GlobalWritePattern.clear();
    m_GlobalWritePattern.resize(m_StreamSize);
    m_LocalReadPattern.clear();
    m_GlobalWritePatternJson.clear();

    const bool finalStep = SyncWritePattern();
    if (finalStep)
    {
        status = StepStatus::EndOfStream;
        return;
    }

    // Always give the caller a defined value: the caller compares the status
    // right after this call, and the variable it passes in may never have
    // been initialized.
    status = StepStatus::OK;

    MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{
Expand All @@ -62,45 +106,23 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
m_LocalReadPattern.clear();
m_GlobalWritePatternJson.clear();
bool finalStep = SyncWritePattern();
if (finalStep)
{
return StepStatus::EndOfStream;
}

MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}
else
{
if (m_MpiMode == "twosided")
if (m_Threading && m_EndStepThread.joinable())
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
m_EndStepThread.join();
}
else if (m_MpiMode == "onesidedfencepush")
else
{
MPI_Win_fence(0, m_MpiWin);
BeginStepFlexible(m_StepStatus);
}
else if (m_MpiMode == "onesidedpostpush")
if (m_StepStatus == StepStatus::EndOfStream)
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
return StepStatus::EndOfStream;
}
}
else
{
BeginStepConsequentFixed();
}

for (const auto &r : m_GlobalWritePattern)
{
Expand Down Expand Up @@ -254,6 +276,75 @@ void SscReader::PerformGets()

size_t SscReader::CurrentStep() const { return m_CurrentStep; }

void SscReader::EndStepFixed()
{
if (m_CurrentStep == 0)
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
}
}

/**
 * End-of-step work for the first step on the flexible-pattern path:
 * releases the MPI window, then synchronizes the read pattern.
 */
void SscReader::EndStepFirstFlexible()
{
    // The window must be released before the read pattern is exchanged;
    // keep this order.
    MPI_Win_free(&m_MpiWin);

    SyncReadPattern();
}

/**
 * End-of-step work for every step after the first on the flexible-pattern
 * path: only the MPI window is released.
 */
void SscReader::EndStepConsequentFlexible()
{
    MPI_Win_free(&m_MpiWin);
}

/**
 * Runs the end of the first flexible step immediately followed by the
 * beginning of the next one, storing the outcome in m_StepStatus. EndStep
 * uses this as the body of m_EndStepThread when threading is enabled.
 */
void SscReader::EndBeginStepFirstFlexible()
{
    // Finish the current (first) step ...
    EndStepFirstFlexible();

    // ... and start the next one right away.
    BeginStepFlexible(m_StepStatus);
}

/**
 * Runs the end of a later (non-first) flexible step immediately followed by
 * the beginning of the next one, storing the outcome in m_StepStatus.
 * EndStep uses this as the body of m_EndStepThread when threading is
 * enabled.
 */
void SscReader::EndBeginStepConsequentFlexible()
{
    // Finish the current step ...
    EndStepConsequentFlexible();

    // ... and start the next one right away.
    BeginStepFlexible(m_StepStatus);
}

void SscReader::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -267,69 +358,41 @@ void SscReader::EndStep()

PerformGets();

if (m_WriterDefinitionsLocked &&
m_ReaderSelectionsLocked) // fixed IO pattern
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
EndStepFixed();
}
else
{
if (m_CurrentStep == 0)
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllReceivingWriterRanks)
if (m_Threading)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first,
0, m_StreamComm, &m_MpiRequests.back());
m_EndStepThread =
std::thread(&SscReader::EndBeginStepFirstFlexible, this);
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
else
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
EndStepFirstFlexible();
}
}
else if (m_MpiMode == "onesidedpostpull")
else
{
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
if (m_Threading)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
m_EndStepThread = std::thread(
&SscReader::EndBeginStepConsequentFlexible, this);
}
else
{
EndStepConsequentFlexible();
}
}
}
else // flexible IO pattern
{
MPI_Win_free(&m_MpiWin);
if (m_CurrentStep == 0)
{
SyncReadPattern();
}
}

m_StepBegun = false;
}

// PRIVATE

void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
Expand Down
12 changes: 11 additions & 1 deletion source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class SscReader : public Engine
MPI_Win m_MpiWin;
MPI_Group m_MpiAllWritersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
StepStatus m_StepStatus;
std::thread m_EndStepThread;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -63,6 +64,13 @@ class SscReader : public Engine
void SyncMpiPattern();
bool SyncWritePattern();
void SyncReadPattern();
void BeginStepConsequentFixed();
void BeginStepFlexible(StepStatus &status);
void EndStepFixed();
void EndStepFirstFlexible();
void EndStepConsequentFlexible();
void EndBeginStepFirstFlexible();
void EndBeginStepConsequentFlexible();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand All @@ -89,6 +97,8 @@ class SscReader : public Engine

int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down

0 comments on commit 321aa4d

Please sign in to comment.