Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP5 flush data #2807

Merged
merged 1 commit into from
Jul 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,18 @@ class BP5Engine
* BP5 header for "Index Table" (64 bytes)
* for each Writer, what aggregator writes its data
* uint16_t [ WriterCount]
* for each timestep:
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* for each Writer
* uint64_t DataPos (in the file above)
* for each timestep: (size (WriterCount + 2 ) 64-bit ints
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* uint64_t 2 : FlushCount
* for each Writer
* for each flush before the last:
* uint64_t DataPos (in the file above)
* uint64_t DataSize
* for the final flush:
* uint64_t DataPos (in the file above)
* So, each timestep takes sizeof(uint64_t)* (3 + ((FlushCount-1)*2 +
*1) * WriterCount) bytes
*
* MetaMetadata file (mmd.0) contains FFS format information
* for each meta metadata item:
Expand Down
56 changes: 49 additions & 7 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination)
{
size_t DataStartPos = m_MetadataIndexTable[Timestep][2];
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
DataStartPos += WriterRank * sizeof(uint64_t);
size_t DataStart = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, DataStartPos, m_Minifooter.IsLittleEndian);

// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
{
Expand All @@ -152,7 +151,35 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
m_DataFileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
}
m_DataFileManager.ReadFile(Destination, Length, DataStart + StartOffset,

size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
size_t RemainingLength = Length;
size_t ThisDataPos;
size_t Offset = StartOffset;
for (int flush = 0; flush < FlushCount; flush++)
{

ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
size_t ThisDataSize =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
if (ThisDataSize > RemainingLength)
ThisDataSize = RemainingLength;
m_DataFileManager.ReadFile(Destination, ThisDataSize,
ThisDataPos + Offset, SubfileNum);
Destination += ThisDataSize;
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)
return;
}
ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian);
m_DataFileManager.ReadFile(Destination, RemainingLength, ThisDataPos,
SubfileNum);
}

Expand Down Expand Up @@ -601,19 +628,34 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
m_MetadataIndexTable[currentStep] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[2] + sizeof(uint64_t) * i;
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize << "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif

position += sizeof(uint64_t) * m_WriterCount;
position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
} while (!oneStepOnly && position < buffer.size());
Expand Down
114 changes: 78 additions & 36 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
false /* always copy */,
m_Parameters.BufferChunkSize));
}
m_ThisTimestepDataSize = 0;

return StepStatus::OK;
}

Expand Down Expand Up @@ -242,23 +244,50 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,

m_FileMetadataManager.FlushFiles();

uint64_t buf[2];
std::vector<uint64_t> buf;
buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size());
buf[0] = MetaDataPos;
buf[1] = MetaDataSize;
m_FileMetadataIndexManager.WriteFiles((char *)buf, sizeof(buf));
m_FileMetadataIndexManager.WriteFiles((char *)m_WriterDataPos.data(),
m_WriterDataPos.size() *
sizeof(uint64_t));
/*std::cout << "Write Index positions = {";
for (size_t i = 0; i < m_WriterDataPos.size(); ++i)
{
std::cout << m_WriterDataPos[i];
if (i < m_WriterDataPos.size() - 1)
buf[2] = FlushPosSizeInfo.size();

uint64_t pos = 3;

for (int writer = 0; writer < m_Comm.Size(); writer++)
{
for (int flushNum = 0; flushNum < FlushPosSizeInfo.size(); flushNum++)
{
std::cout << ", ";
buf[pos + (flushNum * 2)] = FlushPosSizeInfo[flushNum][2 * writer];
buf[pos + (flushNum * 2) + 1] =
FlushPosSizeInfo[flushNum][2 * writer + 1];
}
buf[pos + FlushPosSizeInfo.size() * 2] = m_WriterDataPos[writer];
pos += (FlushPosSizeInfo.size() * 2) + 1;
}
std::cout << "}" << std::endl;*/

m_FileMetadataIndexManager.WriteFiles((char *)buf.data(),
buf.size() * sizeof(uint64_t));

#ifdef DUMPDATALOCINFO
std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl;
std::cout << "Write Index positions = {" << std::endl;

for (size_t i = 0; i < m_Comm.Size(); ++i)
{
std::cout << "Writer " << i << " has data at: " << std::endl;
uint64_t eachWriterSize = FlushPosSizeInfo.size() * 2 + 1;
for (size_t j = 0; j < FlushPosSizeInfo.size(); ++j)
{
std::cout << "loc:" << buf[3 + eachWriterSize * i + j * 2]
<< " siz:" << buf[3 + eachWriterSize * i + j * 2 + 1]
<< std::endl;
}
std::cout << "loc:" << buf[3 + eachWriterSize * (i + 1) - 1]
<< std::endl;
}
std::cout << "}" << std::endl;
#endif
/* reset for next timestep */
FlushPosSizeInfo.clear();
}

void BP5Writer::MarshalAttributes()
Expand Down Expand Up @@ -332,10 +361,11 @@ void BP5Writer::EndStep()

WriteData(TSInfo.DataBuffer);

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();

std::vector<char> MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous(
TSInfo.NewMetaMetaBlocks, TSInfo.MetaEncodeBuffer,
TSInfo.AttributeEncodeBuffer, TSInfo.DataBuffer->Size(),
m_StartDataPos);
TSInfo.AttributeEncodeBuffer, m_ThisTimestepDataSize, m_StartDataPos);

size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts = m_Comm.GatherValues(LocalSize, 0);
Expand Down Expand Up @@ -765,7 +795,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType,
(helper::IsRowMajor(m_IO.m_HostLanguage) == false) ? 'y' : 'n';
helper::CopyToBuffer(buffer, position, &columnMajor);

// byte 45-63: unused
// byte 49-63: unused
position += 15;
absolutePosition = position;
}
Expand Down Expand Up @@ -807,41 +837,53 @@ void BP5Writer::InitBPBuffer()
}
}

void BP5Writer::DoFlush(const bool isFinal, const int transportIndex)
void BP5Writer::FlushData(const bool isFinal)
{
m_FileMetadataManager.FlushFiles();
m_FileMetaMetadataManager.FlushFiles();
m_FileDataManager.FlushFiles();

// true: advances step
// BufferV *DataBuf;
BufferV *DataBuf;
if (m_Parameters.BufferVType == (int)BufferVType::MallocVType)
{
// DataBuf = m_BP5Serializer.ReinitStepData(new
// MallocV("BP5Writer", false,
// m_Parameters.InitialBufferSize,
// m_Parameters.GrowthFactor));
DataBuf = m_BP5Serializer.ReinitStepData(
new MallocV("BP5Writer", false, m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
}
else
{
// DataBuf = m_BP5Serializer.ReinitStepData(new
// ChunkV("BP5Writer",
// false /* always
// copy
//*/,
// m_Parameters.BufferChunkSize));
DataBuf = m_BP5Serializer.ReinitStepData(
new ChunkV("BP5Writer", false /* always copy */,
m_Parameters.BufferChunkSize));
}

// WriteData(DataBuf);
// delete DataBuf;
WriteData(DataBuf);

m_ThisTimestepDataSize += DataBuf->Size();

if (!isFinal)
{
size_t tmp[2];
// aggregate start pos and data size to rank 0
tmp[0] = m_StartDataPos;
tmp[1] = DataBuf->Size();

std::vector<size_t> RecvBuffer;
if (m_Comm.Rank() == 0)
{
RecvBuffer.resize(m_Comm.Size() * 2);
}
m_Comm.GatherArrays(tmp, 2, RecvBuffer.data(), 0);
if (m_Comm.Rank() == 0)
{
FlushPosSizeInfo.push_back(RecvBuffer);
}
}
delete DataBuf;
}

void BP5Writer::Flush(const int transportIndex) { FlushData(false); }

void BP5Writer::DoClose(const int transportIndex)
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::Close");

DoFlush(true, transportIndex);

m_FileDataManager.CloseFiles(transportIndex);
// Delete files from temporary storage if draining was on

Expand Down
10 changes: 9 additions & 1 deletion source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class BP5Writer : public BP5Engine, public core::Engine
size_t CurrentStep() const final;
void PerformPuts() final;
void EndStep() final;
void Flush(const int transportIndex = -1) final;

private:
/** Single object controlling BP buffering */
Expand Down Expand Up @@ -131,7 +132,7 @@ class BP5Writer : public BP5Engine, public core::Engine
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

void DoFlush(const bool isFinal = false, const int transportIndex = -1);
void FlushData(const bool isFinal = false);

void DoClose(const int transportIndex = -1) final;

Expand Down Expand Up @@ -199,6 +200,11 @@ class BP5Writer : public BP5Engine, public core::Engine
*/
uint64_t m_DataPos = 0;

/*
* Total data written this timestep
*/
uint64_t m_ThisTimestepDataSize = 0;

/** rank 0 collects m_StartDataPos in this vector for writing it
* to the index file
*/
Expand All @@ -210,6 +216,8 @@ class BP5Writer : public BP5Engine, public core::Engine
// where each writer rank writes its data, init in InitBPBuffer;
std::vector<uint64_t> m_Assignment;

std::vector<std::vector<size_t>> FlushPosSizeInfo;

void MakeHeader(format::BufferSTL &b, const std::string fileType,
const bool isActive);
};
Expand Down
22 changes: 20 additions & 2 deletions source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,14 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,

if (span == nullptr)
{
DataOffset = CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
}
else
{
*span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = span->globalPos;
DataOffset = m_PriorDataBufferSizeTotal + span->globalPos;
}

if (!AlreadyWritten)
Expand Down Expand Up @@ -636,6 +637,21 @@ void BP5Serializer::InitStep(BufferV *DataBuffer)
throw std::logic_error("BP5Serializer:: InitStep without prior close");
}
CurDataBuffer = DataBuffer;
m_PriorDataBufferSizeTotal = 0;
}

BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
{
if (CurDataBuffer == NULL)
{
throw std::logic_error("BP5Serializer:: ReinitStep without prior Init");
}
m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

BufferV *tmp = CurDataBuffer;
CurDataBuffer = DataBuffer;
return tmp;
}

BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
Expand Down Expand Up @@ -696,6 +712,8 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

MBase->DataBlockSize += m_PriorDataBufferSizeTotal;

void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat,
MetadataBuf, &MetaDataSize);
BufferFFS *Metadata =
Expand Down
Loading