Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmarking JSON reader for compressed inputs #17219

Merged
merged 38 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
3517b26
partial work
shrshi Oct 18, 2024
1cc6f46
compressed input datasource
shrshi Oct 23, 2024
1f46223
formatting
shrshi Oct 23, 2024
334ef06
improving the datasoruce
shrshi Oct 24, 2024
839bdda
cleanup
shrshi Oct 24, 2024
42a4b1b
slow path for some compression formats
shrshi Oct 24, 2024
cff583b
merge
shrshi Oct 24, 2024
c3b6cb3
cleanup
shrshi Oct 24, 2024
e116fa7
remove include
shrshi Oct 24, 2024
3cd7c1d
pr feedback
shrshi Oct 25, 2024
dc7471c
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Oct 25, 2024
9c86d77
gzip impl; snappy still being figured out
shrshi Oct 31, 2024
e9272a1
added benchmark
shrshi Nov 4, 2024
00ff602
added writer test
shrshi Nov 4, 2024
09cad15
partial
shrshi Nov 4, 2024
0872f7c
merge
shrshi Nov 4, 2024
4dbf16a
added test
shrshi Nov 4, 2024
fd69384
removing unnecessary checks
shrshi Nov 4, 2024
3e3c8c5
formatting
shrshi Nov 4, 2024
c314992
added benchmark
shrshi Nov 5, 2024
a00bf62
added snappy support
shrshi Nov 5, 2024
b82c1cf
added data size axis
shrshi Nov 6, 2024
1177f4a
formatting
shrshi Nov 6, 2024
d198300
cleanup; snappy still not working though
shrshi Nov 6, 2024
cb2e5a6
removing unrelated changes
shrshi Nov 6, 2024
935ed7e
small fix
shrshi Nov 6, 2024
bd2820f
copying data to device
shrshi Nov 7, 2024
9fb0e68
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
shrshi Nov 7, 2024
1fbf85f
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
shrshi Nov 12, 2024
7f1b834
Merge branch 'jsonl-gzip-benchmark' of github.com:shrshi/cudf into js…
shrshi Nov 12, 2024
f9195ed
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
shrshi Nov 15, 2024
f2ef4e7
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
shrshi Nov 19, 2024
e508065
fixing merge
shrshi Nov 19, 2024
11ea251
more merge fixes
shrshi Nov 19, 2024
d14601c
pr reviews
shrshi Nov 19, 2024
75246d9
pr reviews
shrshi Nov 20, 2024
0afd290
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
mhaseeb123 Nov 20, 2024
2ca8943
Merge branch 'branch-24.12' into jsonl-gzip-benchmark
shrshi Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ add_library(
src/io/comp/snap.cu
src/io/comp/statistics.cu
src/io/comp/uncomp.cpp
src/io/comp/comp.cu
src/io/comp/unsnap.cu
src/io/csv/csv_gpu.cu
src/io/csv/durations.cu
Expand Down
56 changes: 46 additions & 10 deletions cpp/benchmarks/io/json/json_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@

// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to
// run on most GPUs, but large enough to allow highest throughput
constexpr size_t data_size = 512 << 20;
shrshi marked this conversation as resolved.
Show resolved Hide resolved
// constexpr size_t data_size = 512 << 20;
constexpr cudf::size_type num_cols = 64;

void json_read_common(cuio_source_sink_pair& source_sink,
cudf::size_type num_rows_to_read,
nvbench::state& state)
nvbench::state& state,
cudf::io::compression_type comptype = cudf::io::compression_type::NONE,
size_t datasize = 512 << 20)
{
cudf::io::json_reader_options read_opts =
cudf::io::json_reader_options::builder(source_sink.make_source_info());
cudf::io::json_reader_options::builder(source_sink.make_source_info()).compression(comptype);

auto mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
Expand All @@ -51,21 +53,27 @@ void json_read_common(cuio_source_sink_pair& source_sink,
});

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
state.add_element_count(static_cast<double>(datasize) / time, "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

cudf::size_type json_write_bm_data(cudf::io::sink_info sink,
std::vector<cudf::type_id> const& dtypes)
/**
 * @brief Generates a random table, writes it as (optionally compressed) JSON to the given
 * sink, and reports how many rows were written.
 *
 * @param sink Destination for the JSON writer output
 * @param dtypes Column types to cycle through when generating the table
 * @param comptype Compression applied by the writer (NONE by default)
 * @param datasize Approximate size, in bytes, of the generated table (512 MiB by default)
 * @return Number of rows in the generated table
 */
cudf::size_type json_write_bm_data(
  cudf::io::sink_info sink,
  std::vector<cudf::type_id> const& dtypes,
  cudf::io::compression_type comptype = cudf::io::compression_type::NONE,
  size_t datasize = 512 << 20)
{
  auto const input_table = create_random_table(
    cycle_dtypes(dtypes, num_cols), table_size_bytes{datasize}, data_profile_builder());
  auto const input_view = input_table->view();

  // Configure the writer: nulls rendered as "null", chunked output, requested compression
  auto write_builder = cudf::io::json_writer_options::builder(sink, input_view);
  write_builder.na_rep("null");
  write_builder.rows_per_chunk(100'000);
  write_builder.compression(comptype);

  cudf::io::write_json(write_builder.build());
  return input_view.num_rows();
}
Expand All @@ -87,6 +95,26 @@ void BM_json_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_typ
json_read_common(source_sink, num_rows, state);
}

template <cudf::io::compression_type comptype, io_type IO>
void BM_json_read_compressed_io(
  nvbench::state& state, nvbench::type_list<nvbench::enum_type<comptype>, nvbench::enum_type<IO>>)
{
  // Input size comes from the benchmark's "data_size" axis, not the global default
  size_t const datasize = state.get_int64("data_size");
  cuio_source_sink_pair source_sink(IO);
  auto const d_type = get_type_or_group({static_cast<int32_t>(data_type::INTEGRAL),
                                         static_cast<int32_t>(data_type::FLOAT),
                                         static_cast<int32_t>(data_type::DECIMAL),
                                         static_cast<int32_t>(data_type::TIMESTAMP),
                                         static_cast<int32_t>(data_type::DURATION),
                                         static_cast<int32_t>(data_type::STRING),
                                         static_cast<int32_t>(data_type::LIST),
                                         static_cast<int32_t>(data_type::STRUCT)});
  auto const num_rows =
    json_write_bm_data(source_sink.make_sink_info(), d_type, comptype, datasize);

  // Forward datasize so that bytes_per_second reflects the axis-driven input size;
  // omitting it made json_read_common compute throughput against its 512 MiB default
  json_read_common(source_sink, num_rows, state, comptype, datasize);
}

template <data_type DataType, io_type IO>
void BM_json_read_data_type(
nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>, nvbench::enum_type<IO>>)
Expand All @@ -110,8 +138,9 @@ using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
using io_list =
nvbench::enum_type_list<io_type::FILEPATH, io_type::HOST_BUFFER, io_type::DEVICE_BUFFER>;

using compression_list =
nvbench::enum_type_list<cudf::io::compression_type::SNAPPY, cudf::io::compression_type::NONE>;
using compression_list = nvbench::enum_type_list<cudf::io::compression_type::GZIP,
cudf::io::compression_type::SNAPPY,
cudf::io::compression_type::NONE>;

NVBENCH_BENCH_TYPES(BM_json_read_data_type,
NVBENCH_TYPE_AXES(d_type_list, nvbench::enum_type_list<io_type::DEVICE_BUFFER>))
Expand All @@ -123,3 +152,10 @@ NVBENCH_BENCH_TYPES(BM_json_read_io, NVBENCH_TYPE_AXES(io_list))
.set_name("json_read_io")
.set_type_axes_names({"io"})
.set_min_samples(4);

NVBENCH_BENCH_TYPES(BM_json_read_compressed_io,
NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list<io_type::FILEPATH>))
.set_name("json_read_compressed_io")
.set_type_axes_names({"compression_type", "io"})
.add_int64_power_of_two_axis("data_size", nvbench::range(20, 29, 1))
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
.set_min_samples(4);
1 change: 1 addition & 0 deletions cpp/benchmarks/io/nvbench_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
[](auto value) {
switch (value) {
case cudf::io::compression_type::SNAPPY: return "SNAPPY";
case cudf::io::compression_type::GZIP: return "GZIP";
case cudf::io::compression_type::NONE: return "NONE";
default: return "Unknown";
}
Expand Down
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,8 @@ class json_writer_options_builder;
class json_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format of the sink
compression_type _compression = compression_type::NONE;
// maximum number of rows to write in each chunk (limits memory use)
size_type _rows_per_chunk = std::numeric_limits<size_type>::max();
// Set of columns to output
Expand Down Expand Up @@ -993,6 +995,13 @@ class json_writer_options {
*/
[[nodiscard]] std::string const& get_na_rep() const { return _na_rep; }

/**
 * @brief Returns the compression type applied to the writer's output sink.
 *
 * @return Compression type for the sink (compression_type::NONE by default)
 */
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether to output nulls as 'null'.
*
Expand Down Expand Up @@ -1037,6 +1046,13 @@ class json_writer_options {
*/
void set_table(table_view tbl) { _table = tbl; }

/**
 * @brief Sets the compression type applied to the writer's output sink.
 *
 * @param comptype Compression type for the sink
 */
void set_compression(compression_type comptype) { _compression = comptype; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1124,6 +1140,18 @@ class json_writer_options_builder {
return *this;
}

/**
 * @brief Sets the compression type applied to the output sink.
 *
 * @param comptype Compression type used
 * @return this for chaining
 */
json_writer_options_builder& compression(compression_type comptype)
{
  options.set_compression(comptype);
  return *this;
}

/**
* @brief Sets optional metadata (with column names).
*
Expand Down
109 changes: 109 additions & 0 deletions cpp/src/io/comp/comp.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "comp.hpp"
#include "gpuinflate.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress

#include <cstring> // memset

namespace cudf {
namespace io {

/**
 * @brief GZIP host compressor (includes header)
 *
 * Compresses the input on the host using zlib's DEFLATE with a GZIP wrapper
 * (header and trailer included in the output).
 *
 * @param src Uncompressed host buffer
 * @param stream CUDA stream (unused; kept for interface uniformity with device compressors)
 * @return Vector containing the compressed output
 */
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src, rmm::cuda_stream_view stream)
{
  z_stream zs{};  // zero-init: zalloc/zfree/opaque default to Z_NULL
  zs.avail_in = src.size();
  // zlib's API is not const-correct; the input buffer is never modified
  zs.next_in = const_cast<unsigned char*>(src.data());

  constexpr int windowbits    = 15;
  constexpr int gzip_encoding = 16;  // adding 16 to windowBits selects the GZIP wrapper
  int ret = deflateInit2(
    &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed.");

  // deflateBound() (valid after deflateInit2) is an upper bound on the compressed size;
  // sizing the output as src.size() could truncate incompressible inputs, since GZIP
  // output can exceed the input size
  std::vector<uint8_t> dst(deflateBound(&zs, src.size()));
  zs.avail_out = dst.size();
  zs.next_out  = dst.data();

  // With a sufficiently sized output buffer, a single Z_FINISH call must end the stream
  ret = deflate(&zs, Z_FINISH);
  CUDF_EXPECTS(ret == Z_STREAM_END, "GZIP DEFLATE compression failed.");
  deflateEnd(&zs);

  dst.resize(dst.size() - zs.avail_out);  // trim to the bytes actually produced
  return dst;
}

/**
 * @brief SNAPPY device compressor
 *
 * Copies the input to the device, compresses it with nvCOMP's batched SNAPPY
 * compressor, and copies the compressed result back into a host vector.
 *
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @return Vector containing the compressed output
 */
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
                                          rmm::cuda_stream_view stream)
{
  rmm::device_uvector<uint8_t> d_src(src.size(), stream);
  rmm::device_uvector<uint8_t> d_dst(src.size(), stream);
  // Copy the input to the device; without this, d_src holds uninitialized memory and
  // the compressor runs on garbage instead of the input data
  cudf::detail::cuda_memcpy_async(
    device_span<uint8_t>{d_src}, host_span<uint8_t const>{src}, stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
  inputs[0] = d_src;
  inputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
  outputs[0] = d_dst;
  outputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
  hd_status[0] = {};
  hd_status.host_to_device_async(stream);

  nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

  // device_to_host_sync() synchronizes the stream; no separate synchronize() needed
  hd_status.device_to_host_sync(stream);
  CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
               "snappy compression failed");

  // Only bytes_written bytes of d_dst hold compressed data; returning the whole
  // buffer would append trailing garbage to the output
  std::vector<uint8_t> dst(hd_status[0].bytes_written);
  cudf::detail::cuda_memcpy(
    host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst.data(), dst.size()}, stream);
  return dst;
}

/**
 * @brief Compresses a host buffer with the requested compression format.
 *
 * @param compression Compression format to apply (GZIP and SNAPPY supported)
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @return Vector containing the compressed output
 */
std::vector<std::uint8_t> compress(compression_type compression,
                                   host_span<uint8_t const> src,
                                   rmm::cuda_stream_view stream)
{
  if (compression == compression_type::GZIP) { return compress_gzip(src, stream); }
  if (compression == compression_type::SNAPPY) { return compress_snappy(src, stream); }
  CUDF_FAIL("Unsupported compression type");
}

} // namespace io
} // namespace cudf
42 changes: 42 additions & 0 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

// Needed for rmm::cuda_stream_view in the declaration below; previously this header
// relied on a transitive include
#include <rmm/cuda_stream_view.hpp>

#include <memory>
#include <string>
#include <vector>

namespace cudf {
namespace io {

/**
 * @brief Compresses a system memory buffer.
 *
 * @param compression Type of compression to apply to the input data
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector containing the compressed output
 */
std::vector<uint8_t> compress(compression_type compression,
                              host_span<uint8_t const> src,
                              rmm::cuda_stream_view stream);

}  // namespace io
}  // namespace cudf
7 changes: 4 additions & 3 deletions cpp/src/io/comp/io_uncomp.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@

using cudf::host_span;

namespace cudf {
namespace CUDF_EXPORT cudf {
namespace io {

/**
Expand All @@ -43,6 +43,7 @@ size_t decompress(compression_type compression,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream);

size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
/**
* @brief GZIP header flags
* See https://tools.ietf.org/html/rfc1952
Expand All @@ -56,4 +57,4 @@ constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
} // namespace CUDF_EXPORT cudf
Loading
Loading