Skip to content

Commit

Permalink
BP5 Flush Data
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jul 30, 2021
1 parent f2c03cf commit 8b67093
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 87 deletions.
17 changes: 12 additions & 5 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,18 @@ class BP5Engine
* BP5 header for "Index Table" (64 bytes)
* for each Writer, what aggregator writes its data
* uint16_t [ WriterCount]
* for each timestep:
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* for each Writer
* uint64_t DataPos (in the file above)
* for each timestep: (size (WriterCount + 2 ) 64-bit ints
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* uint64_t 2 : FlushCount
* for each Writer
* for each flush before the last:
* uint64_t DataPos (in the file above)
* uint64_t DataSize
* for the final flush:
* uint64_t DataPos (in the file above)
* So, each timestep takes sizeof(uint64_t)* (3 + ((FlushCount-1)*2 +
*1) * WriterCount) bytes
*
* MetaMetadata file (mmd.0) contains FFS format information
* for each meta metadata item:
Expand Down
56 changes: 49 additions & 7 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination)
{
size_t DataStartPos = m_MetadataIndexTable[Timestep][2];
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
DataStartPos += WriterRank * sizeof(uint64_t);
size_t DataStart = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, DataStartPos, m_Minifooter.IsLittleEndian);

// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
{
Expand All @@ -152,7 +151,35 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
m_DataFileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
}
m_DataFileManager.ReadFile(Destination, Length, DataStart + StartOffset,

size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
size_t RemainingLength = Length;
size_t ThisDataPos;
size_t Offset = StartOffset;
for (int flush = 0; flush < FlushCount; flush++)
{

ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
size_t ThisDataSize =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
if (ThisDataSize > RemainingLength)
ThisDataSize = RemainingLength;
m_DataFileManager.ReadFile(Destination, ThisDataSize,
ThisDataPos + Offset, SubfileNum);
Destination += ThisDataSize;
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)
return;
}
ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian);
m_DataFileManager.ReadFile(Destination, RemainingLength, ThisDataPos,
SubfileNum);
}

Expand Down Expand Up @@ -598,19 +625,34 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
m_MetadataIndexTable[currentStep] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[2] + sizeof(uint64_t) * i;
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize << "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif

position += sizeof(uint64_t) * m_WriterCount;
position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
} while (!oneStepOnly && position < buffer.size());
Expand Down
92 changes: 67 additions & 25 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
false /* always copy */,
m_Parameters.BufferChunkSize));
}
m_ThisTimestepDataSize = 0;

return StepStatus::OK;
}

Expand Down Expand Up @@ -133,7 +135,7 @@ uint64_t BP5Writer::WriteMetadata(
return MetaDataSize;
}

void BP5Writer::WriteData(format::BufferV *Data, bool IsFinal)
void BP5Writer::WriteData(format::BufferV *Data)
{
format::BufferV::BufferV_iovec DataVec = Data->DataVec();
switch (m_Parameters.AggregationType)
Expand All @@ -153,15 +155,6 @@ void BP5Writer::WriteData(format::BufferV *Data, bool IsFinal)
std::to_string(m_Parameters.AggregationType) +
"is not supported in BP5");
}
if (IsFinal)
{
// next one is first
m_FirstDataWrite = true;
}
else
{
m_FirstDataWrite = false;
}
}

void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data,
Expand Down Expand Up @@ -251,23 +244,50 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,

m_FileMetadataManager.FlushFiles();

uint64_t buf[2];
std::vector<uint64_t> buf;
buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size());
buf[0] = MetaDataPos;
buf[1] = MetaDataSize;
m_FileMetadataIndexManager.WriteFiles((char *)buf, sizeof(buf));
m_FileMetadataIndexManager.WriteFiles((char *)m_WriterDataPos.data(),
m_WriterDataPos.size() *
sizeof(uint64_t));
/*std::cout << "Write Index positions = {";
for (size_t i = 0; i < m_WriterDataPos.size(); ++i)
{
std::cout << m_WriterDataPos[i];
if (i < m_WriterDataPos.size() - 1)
buf[2] = FlushPosSizeInfo.size();

uint64_t pos = 3;

for (int writer = 0; writer < m_Comm.Size(); writer++)
{
for (int flushNum = 0; flushNum < FlushPosSizeInfo.size(); flushNum++)
{
buf[pos + (flushNum * 2)] = FlushPosSizeInfo[flushNum][2 * writer];
buf[pos + (flushNum * 2) + 1] =
FlushPosSizeInfo[flushNum][2 * writer + 1];
}
buf[pos + FlushPosSizeInfo.size() * 2] = m_WriterDataPos[writer];
pos += (FlushPosSizeInfo.size() * 2) + 1;
}

m_FileMetadataIndexManager.WriteFiles((char *)buf.data(),
buf.size() * sizeof(uint64_t));

#ifdef DUMPDATALOCINFO
std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl;
std::cout << "Write Index positions = {" << std::endl;

for (size_t i = 0; i < m_Comm.Size(); ++i)
{
std::cout << "Writer " << i << " has data at: " << std::endl;
uint64_t eachWriterSize = FlushPosSizeInfo.size() * 2 + 1;
for (size_t j = 0; j < FlushPosSizeInfo.size(); ++j)
{
std::cout << ", ";
std::cout << "loc:" << buf[3 + eachWriterSize * i + j * 2]
<< " siz:" << buf[3 + eachWriterSize * i + j * 2 + 1]
<< std::endl;
}
std::cout << "loc:" << buf[3 + eachWriterSize * (i + 1) - 1]
<< std::endl;
}
std::cout << "}" << std::endl;*/
std::cout << "}" << std::endl;
#endif
/* reset for next timestep */
FlushPosSizeInfo.clear();
}

void BP5Writer::MarshalAttributes()
Expand Down Expand Up @@ -341,10 +361,11 @@ void BP5Writer::EndStep()

WriteData(TSInfo.DataBuffer);

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();

std::vector<char> MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous(
TSInfo.NewMetaMetaBlocks, TSInfo.MetaEncodeBuffer,
TSInfo.AttributeEncodeBuffer, TSInfo.DataBuffer->Size(),
m_StartDataPos);
TSInfo.AttributeEncodeBuffer, m_ThisTimestepDataSize, m_StartDataPos);

size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts = m_Comm.GatherValues(LocalSize, 0);
Expand Down Expand Up @@ -808,7 +829,28 @@ void BP5Writer::FlushData(const bool isFinal)
m_Parameters.BufferChunkSize));
}

WriteData(DataBuf, false);
WriteData(DataBuf);

m_ThisTimestepDataSize += DataBuf->Size();

if (!isFinal)
{
size_t tmp[2];
// aggregate start pos and data size to rank 0
tmp[0] = m_StartDataPos;
tmp[1] = DataBuf->Size();

std::vector<size_t> RecvBuffer;
if (m_Comm.Rank() == 0)
{
RecvBuffer.resize(m_Comm.Size() * 2);
}
m_Comm.GatherArrays(tmp, 2, RecvBuffer.data(), 0);
if (m_Comm.Rank() == 0)
{
FlushPosSizeInfo.push_back(RecvBuffer);
}
}
delete DataBuf;
}

Expand Down
10 changes: 8 additions & 2 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BP5Writer : public BP5Engine, public core::Engine
const std::vector<format::BufferV::iovec> AttributeBlocks);

/** Write Data to disk, in an aggregator chain */
void WriteData(format::BufferV *Data, bool IsFinal = true);
void WriteData(format::BufferV *Data);
void WriteData_EveryoneWrites(format::BufferV *Data,
bool SerializedWriters);
void WriteData_TwoLevelShm(format::BufferV *Data);
Expand Down Expand Up @@ -169,7 +169,6 @@ class BP5Writer : public BP5Engine, public core::Engine
bool m_IAmWritingData = false;
helper::Comm *DataWritingComm; // processes that write the same data file
bool m_IAmWritingDataHeader = false;
bool m_FirstDataWrite = true;

private:
// updated during WriteMetaData
Expand All @@ -186,6 +185,11 @@ class BP5Writer : public BP5Engine, public core::Engine
*/
uint64_t m_DataPos = 0;

/*
* Total data written this timestep
*/
uint64_t m_ThisTimestepDataSize = 0;

/** rank 0 collects m_StartDataPos in this vector for writing it
* to the index file
*/
Expand All @@ -197,6 +201,8 @@ class BP5Writer : public BP5Engine, public core::Engine
// where each writer rank writes its data, init in InitBPBuffer;
std::vector<uint64_t> m_Assignment;

std::vector<std::vector<size_t>> FlushPosSizeInfo;

void MakeHeader(format::BufferSTL &b, const std::string fileType,
const bool isActive);
};
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
"BP5Serializer:: Marshall without Prior Init");
}
DataOffset =
m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data, ElemSize, Sync);

if (!AlreadyWritten)
Expand Down Expand Up @@ -626,6 +627,7 @@ void BP5Serializer::InitStep(BufferV *DataBuffer)
throw std::logic_error("BP5Serializer:: InitStep without prior close");
}
CurDataBuffer = DataBuffer;
m_PriorDataBufferSizeTotal = 0;
}

BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
Expand All @@ -634,6 +636,9 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
{
throw std::logic_error("BP5Serializer:: ReinitStep without prior Init");
}
m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

BufferV *tmp = CurDataBuffer;
CurDataBuffer = DataBuffer;
return tmp;
Expand Down Expand Up @@ -697,6 +702,8 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

MBase->DataBlockSize += m_PriorDataBufferSizeTotal;

void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat,
MetadataBuf, &MetaDataSize);
BufferFFS *Metadata =
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp5/BP5Serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class BP5Serializer : virtual public BP5Base
BufferV *CurDataBuffer = NULL;
std::vector<MetaMetaInfoBlock> PreviousMetaMetaInfoBlocks;

size_t m_PriorDataBufferSizeTotal = 0;

BP5WriterRec LookupWriterRec(void *Key);
BP5WriterRec CreateWriterRec(void *Variable, const char *Name,
DataType Type, size_t ElemSize,
Expand Down
Loading

0 comments on commit 8b67093

Please sign in to comment.