Add multithreaded parquet reader benchmarks. #15585

Merged
Formatting.
nvdbaranec committed May 20, 2024
commit ad747cbf65b5c4766e516a7b96f709a96844e708
85 changes: 41 additions & 44 deletions in cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -44,10 +44,7 @@ void set_cuio_host_pinned_pool()
   cudf::io::set_host_memory_resource(*mr);
 }
 
-size_t get_num_reads(nvbench::state const& state)
-{
-  return state.get_int64("num_threads");
-}
+size_t get_num_reads(nvbench::state const& state) { return state.get_int64("num_threads"); }
 
 size_t get_read_size(nvbench::state const& state)
 {
@@ -57,11 +54,11 @@ size_t get_read_size(nvbench::state const& state)
 
 std::string get_label(std::string const& test_name, nvbench::state const& state)
 {
-  auto const num_cols = state.get_int64("num_cols");
+  auto const num_cols       = state.get_int64("num_cols");
   size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
   return {test_name + ", " + std::to_string(num_cols) + " columns, " +
-          std::to_string(state.get_int64("num_threads")) + " threads " +
-          " (" + std::to_string(read_size_mb) + " MB each)"};
+          std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
+          std::to_string(read_size_mb) + " MB each)"};
 }
 
 std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
@@ -70,9 +67,9 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
   cudf::size_type const cardinality = state.get_int64("cardinality");
   cudf::size_type const run_length  = state.get_int64("run_length");
   cudf::size_type const num_cols    = state.get_int64("num_cols");
-  size_t const num_files = get_num_reads(state);
-  size_t const per_file_data_size = get_read_size(state);
+  size_t const num_files            = get_num_reads(state);
+  size_t const per_file_data_size   = get_read_size(state);
 
   std::vector<cuio_source_sink_pair> source_sink_vector;
 
   size_t total_file_size = 0;
@@ -152,8 +149,7 @@ void BM_parquet_multithreaded_read_mixed(nvbench::state& state)
   auto label = get_label("mixed", state);
   nvtxRangePushA(label.c_str());
   BM_parquet_multithreaded_read_common(
-    state,
-    {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
   nvtxRangePop();
 }
 
Expand Down Expand Up @@ -185,8 +181,8 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
size_t const input_limit = state.get_int64("input_limit");
size_t const output_limit = state.get_int64("output_limit");

@@ -200,34 +196,36 @@
 
   nvtxRangePushA(("(read) " + label).c_str());
   std::vector<cudf::io::table_with_metadata> chunks;
-  state.exec(
-    nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
-      auto read_func = [&](int index) {
-        auto const stream = streams[index % num_threads];
-        auto& source_sink = source_sink_vector[index];
-        cudf::io::parquet_reader_options read_opts =
-          cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
-        // divide chunk limits by number of threads so the number of chunks produced is the same for all cases.
-        // this seems better than the alternative, which is to keep the limits the same. if we do that, as the
-        // number of threads goes up, the number of chunks goes down - so are actually benchmarking the same thing in that case?
-        auto reader = cudf::io::chunked_parquet_reader(output_limit / num_threads, input_limit / num_threads, read_opts, stream);
-
-        // read all the chunks
-        do {
-          auto table = reader.read_chunk();
-        } while (reader.has_next());
-      };
-
-      threads.paused = true;
-      for (size_t i = 0; i < num_files; ++i) {
-        threads.submit(read_func, i);
-      }
-      timer.start();
-      threads.paused = false;
-      threads.wait_for_tasks();
-      cudf::detail::join_streams(streams, cudf::get_default_stream());
-      timer.stop();
-    });
+  state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+             [&](nvbench::launch& launch, auto& timer) {
+               auto read_func = [&](int index) {
+                 auto const stream = streams[index % num_threads];
+                 auto& source_sink = source_sink_vector[index];
+                 cudf::io::parquet_reader_options read_opts =
+                   cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
+                 // divide chunk limits by number of threads so the number of chunks produced is the
+                 // same for all cases. this seems better than the alternative, which is to keep the
+                 // limits the same. if we do that, as the number of threads goes up, the number of
+                 // chunks goes down - so are actually benchmarking the same thing in that case?
+                 auto reader = cudf::io::chunked_parquet_reader(
+                   output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                 // read all the chunks
+                 do {
+                   auto table = reader.read_chunk();
+                 } while (reader.has_next());
+               };
+
+               threads.paused = true;
+               for (size_t i = 0; i < num_files; ++i) {
+                 threads.submit(read_func, i);
+               }
+               timer.start();
+               threads.paused = false;
+               threads.wait_for_tasks();
+               cudf::detail::join_streams(streams, cudf::get_default_stream());
+               timer.stop();
+             });
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
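
On the comment above about dividing the chunk limits by the thread count: with, for example, an 8 GB input limit and 4 threads, each per-file reader gets a 2 GB limit, and since each file also holds a quarter of the total data, every file still decomposes into roughly the same number of chunks as in the single-threaded run. A minimal standalone sketch of that per-thread scaling, assuming only the public chunked reader API used above (the function name, path, and limit values are placeholders, not part of the benchmark):

// Sketch: read one parquet file with cudf's chunked reader, scaling the memory
// limits by the number of worker threads as the benchmark comment describes.
// The path and limit values are placeholders.
#include <cudf/io/parquet.hpp>

#include <cstddef>
#include <string>

void read_file_in_chunks(std::string const& path,
                         std::size_t output_limit,  // total output (chunk) budget
                         std::size_t input_limit,   // total input (pass) budget
                         std::size_t num_threads)
{
  auto const opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();

  // Per-thread limits keep the number of chunks per file roughly constant as
  // the thread count grows.
  auto reader = cudf::io::chunked_parquet_reader(
    output_limit / num_threads, input_limit / num_threads, opts);

  do {
    auto chunk = reader.read_chunk();  // cudf::io::table_with_metadata
    // ... consume chunk.tbl here ...
  } while (reader.has_next());
}
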
@@ -250,8 +248,7 @@ void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state)
 {
   auto label = get_label("mixed", state);
   nvtxRangePushA(label.c_str());
-  BM_parquet_multithreaded_read_chunked_common(
-    state, {cudf::type_id::INT32}, label);
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label);
   nvtxRangePop();
 }
 
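The timed region in the chunked benchmark fans the per-file reads out over a thread pool, hands each task a CUDA stream, and stops the timer only after every stream has been joined back to the default stream. Below is a rough standalone illustration of that shape, using std::async in place of the benchmark's thread pool, one stream per file, and plain stream synchronization instead of cudf::detail::join_streams; it assumes the stream-accepting read_parquet overload available in recent libcudf.

// Sketch: fan out parquet reads over worker threads, one CUDA stream per read,
// then wait for all host- and device-side work before declaring the batch done.
#include <cudf/io/parquet.hpp>
#include <rmm/cuda_stream.hpp>

#include <cstddef>
#include <future>
#include <string>
#include <vector>

void read_files_in_parallel(std::vector<std::string> const& paths)
{
  std::vector<rmm::cuda_stream> streams(paths.size());  // one owning stream per read
  std::vector<std::future<void>> tasks;

  for (std::size_t i = 0; i < paths.size(); ++i) {
    tasks.push_back(std::async(std::launch::async, [&, i] {
      auto const opts =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info{paths[i]}).build();
      // Each read is issued on its own stream so the decodes can overlap.
      auto result = cudf::io::read_parquet(opts, streams[i].view());
    }));
  }

  for (auto& t : tasks) { t.get(); }            // join host-side work
  for (auto& s : streams) { s.synchronize(); }  // drain device-side work
}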