Skip to content

Commit

Permalink
Merge pull request #2225 from philip-davis/wrelease
Browse files Browse the repository at this point in the history
Handle pending releases when closing writer
  • Loading branch information
eisenhauer authored May 12, 2020
2 parents b155055 + fe4f9b8 commit 4b6a5ed
Showing 1 changed file with 68 additions and 2 deletions.
70 changes: 68 additions & 2 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ extern void CP_verbose(SstStream Stream, char *Format, ...);
static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
enum StreamStatus NewState);

static void ProcessReleaseList(SstStream Stream, ReturnMetadataInfo Metadata);

#define gettid() pthread_self()
#ifdef MUTEX_DEBUG
#define STREAM_MUTEX_LOCK(Stream) \
Expand Down Expand Up @@ -1488,6 +1490,21 @@ void SstWriterClose(SstStream Stream)
if ((Stream->ConfigParams->CPCommPattern == SstCPCommPeer) ||
(Stream->Rank == 0))
{
if (Stream->ReleaseCount > 0)
{
if (Stream->ConfigParams->CPCommPattern == SstCPCommMin)
{
SMPI_Bcast(&Stream->ReleaseCount, 1, SMPI_INT, 0,
Stream->mpiComm);
SMPI_Bcast(Stream->ReleaseList,
Stream->ReleaseCount *
sizeof(*(Stream->ReleaseList)),
SMPI_BYTE, 0, Stream->mpiComm);
}
Stream->ReleaseCount = 0;
free(Stream->ReleaseList);
Stream->ReleaseList = NULL;
}
while (Stream->QueuedTimesteps)
{
CP_verbose(Stream,
Expand All @@ -1502,7 +1519,7 @@ void SstWriterClose(SstStream Stream)
{
char tmp[20];
CP_verbose(Stream,
"IN TS WAIT, ENTRIES areTimestep %ld (exp %d, "
"IN TS WAIT, ENTRIES are Timestep %ld (exp %d, "
"Prec %d, Ref %d), Count now %d\n",
List->Timestep, List->Expired,
List->PreciousTimestep, List->ReferenceCount,
Expand All @@ -1526,18 +1543,67 @@ void SstWriterClose(SstStream Stream)
}
/* NEED TO HANDLE FAILURE HERE */
STREAM_CONDITION_WAIT(Stream);
if (Stream->ConfigParams->CPCommPattern == SstCPCommMin)
{
SMPI_Bcast(&Stream->ReleaseCount, 1, SMPI_INT, 0,
Stream->mpiComm);
if (Stream->ReleaseCount > 0)
{
SMPI_Bcast(Stream->ReleaseList,
Stream->ReleaseCount *
sizeof(*(Stream->ReleaseList)),
SMPI_BYTE, 0, Stream->mpiComm);
Stream->ReleaseCount = 0;
free(Stream->ReleaseList);
Stream->ReleaseList = NULL;
}
}
}
if (Stream->ConfigParams->CPCommPattern == SstCPCommMin)
{
Stream->ReleaseCount = -1;
SMPI_Bcast(&Stream->ReleaseCount, 1, SMPI_INT, 0, Stream->mpiComm);
Stream->ReleaseCount = 0;
}
}

if (Stream->ConfigParams->CPCommPattern == SstCPCommMin)
{
if (Stream->Rank != 0)
{
struct _ReturnMetadataInfo ReleaseData;
while (1)
{
SMPI_Bcast(&ReleaseData.ReleaseCount, 1, SMPI_INT, 0,
Stream->mpiComm);
if (ReleaseData.ReleaseCount == -1)
{
break;
}
else if (ReleaseData.ReleaseCount > 0)
{
ReleaseData.ReleaseList =
malloc(ReleaseData.ReleaseCount *
sizeof(*ReleaseData.ReleaseList));
SMPI_Bcast(ReleaseData.ReleaseList,
ReleaseData.ReleaseCount *
sizeof(*ReleaseData.ReleaseList),
SMPI_BYTE, 0, Stream->mpiComm);
STREAM_MUTEX_UNLOCK(Stream);
ProcessReleaseList(Stream, &ReleaseData);
STREAM_MUTEX_LOCK(Stream);
free(ReleaseData.ReleaseList);
ReleaseData.ReleaseList = NULL;
}
}
}
/*
* if we're CommMin, getting here implies that Rank 0 has released all
* timesteps, other ranks can follow suit after barrier
*/
STREAM_MUTEX_UNLOCK(Stream);
SMPI_Barrier(Stream->mpiComm);
STREAM_MUTEX_LOCK(Stream);
ReleaseAndDiscardRemainingTimesteps(Stream);
}
STREAM_MUTEX_UNLOCK(Stream);
gettimeofday(&CloseTime, NULL);
Expand Down

0 comments on commit 4b6a5ed

Please sign in to comment.