Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADIOS2: Flush to disk within a step #1207

Merged
merged 8 commits into from
Jul 28, 2022
Prev Previous commit
Next Next commit
Testing
  • Loading branch information
franzpoeschel committed Jul 19, 2022
commit a22d25bee8b988a97444506a5a427c2e32ceecfc
215 changes: 215 additions & 0 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
#include <tuple>
#include <vector>

#ifdef __unix__
#include <dirent.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif

using namespace openPMD;

struct BackendSelection
Expand Down Expand Up @@ -3945,6 +3953,213 @@ TEST_CASE("git_adios2_early_chunk_query", "[serial][adios2]")
R"({"backend": "adios2"})");
}

/*
* Require Gnu compiler 11 or younger because this test needs the
* <filesystem> header.
*/
#if defined(__unix__) && defined(ADIOS2_HAVE_BP5)
void adios2_bp5_flush(std::string const &cfg, bool flushDuringStep)
{
constexpr size_t size = 1024 * 1024;

auto getsize = []() {
off_t res = 0;
int dirfd = open("../samples/bp5_flush.bp", O_RDONLY);
if (dirfd < 0)
{
throw std::system_error(
std::error_code(errno, std::system_category()));
}
DIR *directory = fdopendir(dirfd);
if (!directory)
{
close(dirfd);
throw std::system_error(
std::error_code(errno, std::system_category()));
}
dirent *entry;
struct stat statbuf;
while ((entry = readdir(directory)) != nullptr)
{
if (strcmp(entry->d_name, ".") == 0 ||
strcmp(entry->d_name, "..") == 0)
{
continue;
}
int err = fstatat(dirfd, entry->d_name, &statbuf, 0);
if (err != 0)
{
closedir(directory);
close(dirfd);
throw std::system_error(
std::error_code(errno, std::system_category()));
}
res += statbuf.st_size;
}
closedir(directory);
close(dirfd);
return res;
};
std::vector<int32_t> data(size, 10);
{
Series write("../samples/bp5_flush.bp", Access::CREATE, cfg);

{
auto component =
write.writeIterations()[0]
.meshes["e_chargeDensity"][RecordComponent::SCALAR];
component.resetDataset({Datatype::INT, {size}});
component.storeChunk(data, {0}, {size});
// component.seriesFlush(FlushMode::NonCollective);
component.seriesFlush();
}

auto currentSize = getsize();
if (flushDuringStep)
{
// should be roughly within 1% of 4Gb
REQUIRE(std::abs(1 - double(currentSize) / (4 * size)) <= 0.01);
}
else
{
// should be roughly zero
REQUIRE(currentSize <= 4096);
}

{
auto component =
write.writeIterations()[0]
.meshes["i_chargeDensity"][RecordComponent::SCALAR];
component.resetDataset({Datatype::INT, {size}});
component.storeChunk(data, {0}, {size});
}

currentSize = getsize();
if (flushDuringStep)
{
// should still be roughly within 1% of 4Gb
REQUIRE(std::abs(1 - double(currentSize) / (4 * size)) <= 0.01);
}
else
{
// should be roughly zero
REQUIRE(currentSize <= 4096);
}

write.flush();
currentSize = getsize();
if (flushDuringStep)
{
// should now be roughly within 1% of 8Gb
REQUIRE(std::abs(1 - double(currentSize) / (8 * size)) <= 0.01);
}
else
{
// should be roughly zero
REQUIRE(currentSize <= 4096);
}

{
auto component =
write.writeIterations()[0]
.meshes["temperature"][RecordComponent::SCALAR];
component.resetDataset({Datatype::INT, {size}});
component.storeChunk(data, {0}, {size});
// component.seriesFlush(FlushMode::NonCollective);
component.seriesFlush(
"adios2.engine.preferred_flush_target = \"buffer\"");
}
// everything should be the same as before
currentSize = getsize();
if (flushDuringStep)
{
// should now be roughly within 1% of 8Gb
REQUIRE(std::abs(1 - double(currentSize) / (8 * size)) <= 0.01);
}
else
{
// should be roughly zero
REQUIRE(currentSize <= 4096);
}

{
auto component =
write.writeIterations()[0]
.meshes["temperature"][RecordComponent::SCALAR];
component.resetDataset({Datatype::INT, {size}});
component.storeChunk(data, {0}, {size});
// component.seriesFlush(FlushMode::NonCollective);
component.seriesFlush(
"adios2.engine.preferred_flush_target = \"disk\"");
}
currentSize = getsize();
// should now indiscriminately be roughly within 1% of 16Gb
REQUIRE(std::abs(1 - double(currentSize) / (16 * size)) <= 0.01);
}
auto currentSize = getsize();
// should still be indiscriminately be roughly within 1% of 8Gb after
// closing the Series
REQUIRE(std::abs(1 - double(currentSize) / (16 * size)) <= 0.01);
}

TEST_CASE("adios2_bp5_flush", "[serial][adios2]")
{
std::string cfg1 = R"(
[adios2]

[adios2.engine]
usesteps = true
type = "bp5"
preferred_flush_target = "disk"

[adios2.engine.parameters]
AggregationType = "TwoLevelShm"
MaxShmSize = 3221225472
NumSubFiles = 1
NumAggregators = 1
BufferChunkSize = 2147483646 # 2^31 - 2
)";

adios2_bp5_flush(cfg1, /* flushDuringStep = */ true);

std::string cfg2 = R"(
[adios2]

[adios2.engine]
usesteps = true
type = "bp5"
preferred_flush_target = "buffer"

[adios2.engine.parameters]
AggregationType = "TwoLevelShm"
MaxShmSize = 3221225472
NumSubFiles = 1
NumAggregators = 1
BufferChunkSize = 2147483646 # 2^31 - 2
)";

adios2_bp5_flush(cfg2, /* flushDuringStep = */ false);

std::string cfg3 = R"(
[adios2]

[adios2.engine]
usesteps = true
type = "bp5"
# preferred_flush_target = <default>

[adios2.engine.parameters]
AggregationType = "TwoLevelShm"
MaxShmSize = 3221225472
NumSubFiles = 1
NumAggregators = 1
BufferChunkSize = 2147483646 # 2^31 - 2
)";

adios2_bp5_flush(cfg3, /* flushDuringStep = */ true);
}
#endif

TEST_CASE("serial_adios2_backend_config", "[serial][adios2]")
{
if (auxiliary::getEnvString("OPENPMD_BP_BACKEND", "NOT_SET") == "ADIOS1")
Expand Down