Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threading to SSC reader #2588

Merged
merged 4 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The SSC engine takes the following parameters:

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. SSC will use threads to hide the time cost of metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**.
3. ``Threading``: Default **False**. When this parameter is set to **true**, SSC uses threads to hide the time cost of metadata manipulation and data transfer. SSC checks whether MPI was initialized with multi-threading enabled; if it was not, SSC forces this parameter to **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
Expand Down
219 changes: 141 additions & 78 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
m_OpenTimeoutSecs);

Expand All @@ -43,6 +44,49 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

/// Destructor. Waits for the background worker thread (m_EndStepThread), if
/// one is still running, before the object is torn down.
SscReader::~SscReader()
{
    TAU_SCOPED_TIMER_FUNC();

    // EndStep() may launch m_EndStepThread and only a following BeginStep()
    // joins it. Destroying a std::thread that is still joinable calls
    // std::terminate, so collect it here if nobody else did.
    // NOTE(review): the join blocks until the worker's MPI calls return.
    if (m_EndStepThread.joinable())
    {
        m_EndStepThread.join();
    }
}

void SscReader::BeginStepConsequentFixed()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
}

/// Prepares a step when the IO pattern is not fixed: resets the per-step
/// pattern containers and the buffer, calls SyncWritePattern(), and creates a
/// zero-sized MPI window on m_StreamComm.
///
/// @param status  Output. Set to StepStatus::EndOfStream when
///                SyncWritePattern() reports the final step (no window is
///                created in that case), otherwise set to StepStatus::OK.
void SscReader::BeginStepFlexible(StepStatus &status)
{
    // drop everything that described the previous step
    m_AllReceivingWriterRanks.clear();
    m_Buffer.resize(1, 0);
    m_GlobalWritePattern.clear();
    m_GlobalWritePattern.resize(m_StreamSize);
    m_LocalReadPattern.clear();
    m_GlobalWritePatternJson.clear();

    const bool finalStep = SyncWritePattern();
    if (finalStep)
    {
        status = StepStatus::EndOfStream;
        return;
    }

    // Always report a definite value: the caller compares the status against
    // EndOfStream right after this call, and the member it passes in is
    // declared without an initializer.
    status = StepStatus::OK;

    MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{
Expand All @@ -62,45 +106,23 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
m_LocalReadPattern.clear();
m_GlobalWritePatternJson.clear();
bool finalStep = SyncWritePattern();
if (finalStep)
{
return StepStatus::EndOfStream;
}

MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}
else
{
if (m_MpiMode == "twosided")
if (m_Threading && m_EndStepThread.joinable())
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
m_EndStepThread.join();
}
else if (m_MpiMode == "onesidedfencepush")
else
{
MPI_Win_fence(0, m_MpiWin);
BeginStepFlexible(m_StepStatus);
}
else if (m_MpiMode == "onesidedpostpush")
if (m_StepStatus == StepStatus::EndOfStream)
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
return StepStatus::EndOfStream;
}
}
else
{
BeginStepConsequentFixed();
}

for (const auto &r : m_GlobalWritePattern)
{
Expand Down Expand Up @@ -254,6 +276,75 @@ void SscReader::PerformGets()

// Returns the current value of m_CurrentStep.
size_t SscReader::CurrentStep() const { return m_CurrentStep; }

/// EndStep work for the fixed IO pattern.
///
/// On step 0 the current MPI window is freed, SyncReadPattern() is called and
/// a new window exposing m_Buffer is created. After that the transfer for the
/// step is started according to m_MpiMode:
///  - "twosided":          one MPI_Irecv per entry of
///                         m_AllReceivingWriterRanks, requests appended to
///                         m_MpiRequests
///  - "onesidedfencepush": MPI_Win_fence
///  - "onesidedpostpush":  MPI_Win_post to m_MpiAllWritersGroup
///  - "onesidedfencepull": MPI_Win_fence, then one MPI_Get per entry
///  - "onesidedpostpull":  MPI_Win_start, then one MPI_Get per entry
/// Any other mode string does nothing further.
void SscReader::EndStepFixed()
{
    const bool firstStep = (m_CurrentStep == 0);
    if (firstStep)
    {
        MPI_Win_free(&m_MpiWin);
        SyncReadPattern();
        MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
                       m_StreamComm, &m_MpiWin);
    }

    if (m_MpiMode == "twosided")
    {
        // each entry maps a writer rank to (offset into m_Buffer, byte count)
        for (const auto &writer : m_AllReceivingWriterRanks)
        {
            m_MpiRequests.emplace_back();
            const auto &range = writer.second;
            MPI_Irecv(m_Buffer.data() + range.first,
                      static_cast<int>(range.second), MPI_CHAR, writer.first,
                      0, m_StreamComm, &m_MpiRequests.back());
        }
        return;
    }

    if (m_MpiMode == "onesidedfencepush")
    {
        MPI_Win_fence(0, m_MpiWin);
        return;
    }

    if (m_MpiMode == "onesidedpostpush")
    {
        MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
        return;
    }

    // The two pull modes differ only in the call made before the Gets.
    if (m_MpiMode == "onesidedfencepull")
    {
        MPI_Win_fence(0, m_MpiWin);
    }
    else if (m_MpiMode == "onesidedpostpull")
    {
        MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
    }
    else
    {
        return; // unrecognized mode: nothing to start
    }

    for (const auto &writer : m_AllReceivingWriterRanks)
    {
        const auto &range = writer.second;
        const int byteCount = static_cast<int>(range.second);
        MPI_Get(m_Buffer.data() + range.first, byteCount, MPI_CHAR,
                writer.first, 0, byteCount, MPI_CHAR, m_MpiWin);
    }
}

// EndStep work for the first step of a flexible (non-fixed) IO pattern:
// frees the current MPI window, then calls SyncReadPattern().
// The window is freed before the read pattern is synchronized; keep this
// order.
void SscReader::EndStepFirstFlexible()
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
}

// EndStep work for later steps of a flexible IO pattern: only frees the
// current MPI window (no read-pattern synchronization is done here).
void SscReader::EndStepConsequentFlexible() { MPI_Win_free(&m_MpiWin); }

// Runs EndStepFirstFlexible() followed immediately by BeginStepFlexible(),
// storing the resulting step status in m_StepStatus.
// EndStep() uses this as the entry point of m_EndStepThread when threading is
// enabled, so both calls run on that worker thread.
void SscReader::EndBeginStepFirstFlexible()
{
EndStepFirstFlexible();
BeginStepFlexible(m_StepStatus);
}

// Runs EndStepConsequentFlexible() followed immediately by
// BeginStepFlexible(), storing the resulting step status in m_StepStatus.
// EndStep() uses this as the entry point of m_EndStepThread when threading is
// enabled, so both calls run on that worker thread.
void SscReader::EndBeginStepConsequentFlexible()
{
EndStepConsequentFlexible();
BeginStepFlexible(m_StepStatus);
}

void SscReader::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -267,69 +358,41 @@ void SscReader::EndStep()

PerformGets();

if (m_WriterDefinitionsLocked &&
m_ReaderSelectionsLocked) // fixed IO pattern
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
EndStepFixed();
}
else
{
if (m_CurrentStep == 0)
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllReceivingWriterRanks)
if (m_Threading)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first,
0, m_StreamComm, &m_MpiRequests.back());
m_EndStepThread =
std::thread(&SscReader::EndBeginStepFirstFlexible, this);
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
else
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
EndStepFirstFlexible();
}
}
else if (m_MpiMode == "onesidedpostpull")
else
{
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
if (m_Threading)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
m_EndStepThread = std::thread(
&SscReader::EndBeginStepConsequentFlexible, this);
}
else
{
EndStepConsequentFlexible();
}
}
}
else // flexible IO pattern
{
MPI_Win_free(&m_MpiWin);
if (m_CurrentStep == 0)
{
SyncReadPattern();
}
}

m_StepBegun = false;
}

// PRIVATE

void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
Expand Down
12 changes: 11 additions & 1 deletion source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class SscReader : public Engine
MPI_Win m_MpiWin;
MPI_Group m_MpiAllWritersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
StepStatus m_StepStatus;
std::thread m_EndStepThread;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -63,6 +64,13 @@ class SscReader : public Engine
void SyncMpiPattern();
bool SyncWritePattern();
void SyncReadPattern();
void BeginStepConsequentFixed();
void BeginStepFlexible(StepStatus &status);
void EndStepFixed();
void EndStepFirstFlexible();
void EndStepConsequentFlexible();
void EndBeginStepFirstFlexible();
void EndBeginStepConsequentFlexible();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand All @@ -89,6 +97,8 @@ class SscReader : public Engine

int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down