Add multithreaded parquet reader benchmarks. #15585

Merged
Rework test parameters so that we generally see at least small performance increases with more threads, and so that the benchmarks clearly show the multiple-thread-only performance regressions that we have historically seen.
nvdbaranec committed May 20, 2024
commit 21a6261f0237b0d38a909eb18c3a4f1f56f4dc04
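For context, the reworked sizing logic in this commit amounts to: one read per thread, with each read covering an equal slice of the total data size. A minimal standalone sketch of that arithmetic (plain integers stand in for the nvbench axis lookups; this is an illustration, not the benchmark code itself):

#include <cstddef>
#include <iostream>

// One read per thread; each read covers total_data_size / num_threads bytes.
// Mirrors the intent of get_num_reads() / get_read_size() in the diff below.
int main()
{
  std::size_t const total_data_size = 1024ul * 1024 * 1024;  // 1 GB axis value
  for (std::size_t num_threads : {1ul, 2ul, 4ul, 8ul}) {
    std::size_t const num_reads = num_threads;                  // get_num_reads()
    std::size_t const read_size = total_data_size / num_reads;  // get_read_size()
    std::cout << num_threads << " threads -> " << num_reads << " reads of "
              << (read_size / (1024 * 1024)) << " MB each\n";
  }
  return 0;
}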
195 changes: 113 additions & 82 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -41,23 +41,27 @@ void set_cuio_host_pinned_pool()
using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
static std::shared_ptr<host_pooled_mr> mr = std::make_shared<host_pooled_mr>(
std::make_shared<rmm::mr::pinned_host_memory_resource>().get(), 256ul * 1024 * 1024);

cudf::io::set_host_memory_resource(*mr);
}

size_t get_num_reads(nvbench::state const& state)
{
size_t const data_size = state.get_int64("total_data_size");
size_t const per_file_data_size = state.get_int64("per_file_data_size");
return data_size / per_file_data_size;
return state.get_int64("num_threads");
}

std::string get_label(std::string const& test_name, nvbench::state const& state)
size_t get_read_size(nvbench::state const& state)
{
auto const num_reads = get_num_reads(state);
return state.get_int64("total_data_size") / num_reads;
}

std::string get_label(std::string const& test_name, nvbench::state const& state)
{
auto const num_cols = state.get_int64("num_cols");
return {test_name + ", " + std::to_string(num_cols) + "columns, " + std::to_string(num_reads) +
"reads, " + std::to_string(state.get_int64("num_threads")) + " threads"};
size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
return {test_name + ", " + std::to_string(num_cols) + " columns, " +
std::to_string(state.get_int64("num_threads")) + " threads " +
" (" + std::to_string(read_size_mb) + " MB each)"};
}

std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
@@ -66,14 +70,14 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
cudf::size_type const cardinality = state.get_int64("cardinality");
cudf::size_type const run_length = state.get_int64("run_length");
cudf::size_type const num_cols = state.get_int64("num_cols");
size_t const per_file_data_size = state.get_int64("per_file_data_size");

size_t const num_tables = get_num_reads(state);
size_t const num_files = get_num_reads(state);
size_t const per_file_data_size = get_read_size(state);

std::vector<cuio_source_sink_pair> source_sink_vector;

size_t total_file_size = 0;

for (size_t i = 0; i < num_tables; ++i) {
for (size_t i = 0; i < num_files; ++i) {
cuio_source_sink_pair source_sink{cudf::io::io_type::HOST_BUFFER};

auto const tbl = create_random_table(
@@ -84,19 +88,22 @@

cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(cudf::io::compression_type::SNAPPY);
.compression(cudf::io::compression_type::SNAPPY)
.max_page_size_rows(50000)
.max_page_size_bytes(1024 * 1024);

cudf::io::write_parquet(write_opts);
total_file_size += source_sink.size();

source_sink_vector.push_back(std::move(source_sink));
}

return {std::move(source_sink_vector), total_file_size, num_tables};
return {std::move(source_sink_vector), total_file_size, num_files};
}

void BM_parquet_multithreaded_read_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types)
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
@@ -106,10 +113,11 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_tables] = write_file_data(state, d_types);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);

auto mem_stats_logger = cudf::memory_stats_logger();

nvtxRangePushA(("(read) " + label).c_str());
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
auto read_func = [&](int index) {
@@ -121,7 +129,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
};

threads.paused = true;
for (size_t i = 0; i < num_tables; ++i) {
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
timer.start();
@@ -130,6 +138,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
nvtxRangePop();

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
@@ -140,51 +149,56 @@

void BM_parquet_multithreaded_read_mixed(nvbench::state& state)
{
nvtxRangePushA(get_label("mixed", state).c_str());
auto label = get_label("mixed", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_common(
state,
{cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING, cudf::type_id::LIST});
{cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_fixed_width(nvbench::state& state)
{
nvtxRangePushA(get_label("fixed width", state).c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32});
auto label = get_label("fixed width", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_string(nvbench::state& state)
{
nvtxRangePushA(get_label("string", state).c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING});
auto label = get_label("string", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_list(nvbench::state& state)
{
nvtxRangePushA(get_label("list", state).c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST});
auto label = get_label("list", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types)
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
size_t const input_limit = state.get_int64("input_limit");
size_t const output_limit = state.get_int64("output_limit");

set_cuio_host_pinned_pool();

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_tables] = write_file_data(state, d_types);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);

auto mem_stats_logger = cudf::memory_stats_logger();

nvtxRangePushA(("(read) " + label).c_str());
std::vector<cudf::io::table_with_metadata> chunks;
state.exec(
nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
@@ -193,7 +207,10 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
auto& source_sink = source_sink_vector[index];
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts);
// divide the chunk limits by the number of threads so that the number of chunks
// produced stays the same for all cases. this seems better than the alternative of
// keeping the limits fixed: as the number of threads goes up, the number of chunks
// goes down, so we would no longer be benchmarking the same thing.
auto reader = cudf::io::chunked_parquet_reader(
  output_limit / num_threads, input_limit / num_threads, read_opts, stream);

// read all the chunks
do {
@@ -202,7 +219,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
};

threads.paused = true;
for (size_t i = 0; i < num_tables; ++i) {
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
timer.start();
@@ -211,6 +228,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
nvtxRangePop();

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
@@ -221,103 +239,116 @@

void BM_parquet_multithreaded_read_chunked_mixed(nvbench::state& state)
{
nvtxRangePushA(get_label("mixed", state).c_str());
auto label = get_label("mixed", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_chunked_common(
state, {cudf::type_id::INT32, cudf::type_id::STRING, cudf::type_id::LIST});
state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state)
{
auto label = get_label("fixed width", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_chunked_common(
state, {cudf::type_id::INT32}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_chunked_string(nvbench::state& state)
{
nvtxRangePushA(get_label("string", state).c_str());
BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING});
auto label = get_label("string", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label);
nvtxRangePop();
}

void BM_parquet_multithreaded_read_chunked_list(nvbench::state& state)
{
nvtxRangePushA(get_label("list", state).c_str());
BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST});
auto label = get_label("list", state);
nvtxRangePushA(label.c_str());
BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label);
nvtxRangePop();
}

// mixed data types, covering the 3 main families : fixed width, strings, and lists
// mixed data types: fixed width and strings
NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
.set_name("parquet_multithreaded_read_decode_mixed")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {4, 16})
.add_int64_axis("run_length", {8})
.add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
.add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8});
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
.set_name("parquet_multithreaded_read_decode_fixed_width")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {4, 16})
.add_int64_axis("run_length", {8})
.add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
.add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8});
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
.set_name("parquet_multithreaded_read_decode_string")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {4, 16})
.add_int64_axis("run_length", {8})
.add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
.add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8});
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
.set_name("parquet_multithreaded_read_decode_list")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {4, 16})
.add_int64_axis("run_length", {8})
.add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
.add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8});
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

// mixed data types, covering the 3 main families : fixed width, strings, and lists
// mixed data types: fixed width, strings
NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
.set_name("parquet_multithreaded_read_decode_chunked_mixed")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {6})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
// divides into 10 GB exactly 8 times
.add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
.add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8})
.add_int64_axis("input_limit", {768 * 1024 * 1024})
.add_int64_axis("output_limit", {512 * 1024 * 1024});
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
.set_name("parquet_multithreaded_read_decode_chunked_fixed_width")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
.set_name("parquet_multithreaded_read_decode_chunked_mixed")
.set_name("parquet_multithreaded_read_decode_chunked_string")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {6})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
// divides into 10 GB exactly 8 times
.add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
.add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8})
.add_int64_axis("input_limit", {768 * 1024 * 1024})
.add_int64_axis("output_limit", {512 * 1024 * 1024});
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
.set_name("parquet_multithreaded_read_decode_chunked_mixed")
.set_name("parquet_multithreaded_read_decode_chunked_list")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("num_cols", {6})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
// divides into 10 GB exactly 8 times
.add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
.add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {2, 4, 8})
.add_int64_axis("input_limit", {768 * 1024 * 1024})
.add_int64_axis("output_limit", {512 * 1024 * 1024});
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});