
Writing compressed output using JSON writer #17323

Merged 10 commits on Nov 19, 2024
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/json.hpp
@@ -946,6 +946,8 @@ class json_writer_options_builder;
class json_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format of the sink
compression_type _compression = compression_type::NONE;
// maximum number of rows to write in each chunk (limits memory use)
size_type _rows_per_chunk = std::numeric_limits<size_type>::max();
// Set of columns to output
@@ -1022,6 +1024,13 @@ class json_writer_options {
*/
[[nodiscard]] std::string const& get_na_rep() const { return _na_rep; }

/**
* @brief Returns compression type used for sink
*
* @return compression type for sink
*/
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether to output nulls as 'null'.
*
@@ -1066,6 +1075,13 @@ class json_writer_options {
*/
void set_table(table_view tbl) { _table = tbl; }

/**
* @brief Sets compression type to be used
*
* @param comptype Compression type for sink
*/
void set_compression(compression_type comptype) { _compression = comptype; }

/**
* @brief Sets metadata.
*
@@ -1153,6 +1169,18 @@ class json_writer_options_builder {
return *this;
}

/**
* @brief Sets compression type of output sink
*
* @param comptype Compression type used
* @return this for chaining
*/
json_writer_options_builder& compression(compression_type comptype)
{
options._compression = comptype;
return *this;
}

/**
* @brief Sets optional metadata (with column names).
*
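Taken together, these additions expose compression on the JSON writer's public options and builder. A minimal usage sketch follows; the output path, `lines(true)`, and the choice of GZIP are illustrative assumptions, not part of this diff:

```cpp
#include <cudf/io/json.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

// Sketch: write `table` as GZIP-compressed JSON Lines using the new builder option.
void write_compressed_json(cudf::table_view const& table)
{
  auto sink = cudf::io::sink_info{"out.jsonl.gz"};  // hypothetical output path
  auto opts = cudf::io::json_writer_options::builder(sink, table)
                .lines(true)                                    // one JSON object per row
                .compression(cudf::io::compression_type::GZIP)  // option added in this PR
                .build();
  cudf::io::write_json(opts);
}
```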
11 changes: 4 additions & 7 deletions cpp/src/io/comp/comp.cpp
@@ -26,7 +26,7 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress
#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

@@ -77,12 +77,12 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
{
auto const d_src =
cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
inputs[0] = d_src;
inputs.host_to_device_async(stream);

auto dst_size = compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size());
rmm::device_uvector<uint8_t> d_dst(dst_size, stream);
cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
outputs[0] = d_dst;
outputs.host_to_device_async(stream);
@@ -93,13 +93,10 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,

nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

stream.synchronize();
hd_status.device_to_host_sync(stream);
CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
"snappy compression failed");
std::vector<uint8_t> dst(d_dst.size());
cudf::detail::cuda_memcpy(host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst}, stream);
return dst;
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

} // namespace
29 changes: 25 additions & 4 deletions cpp/src/io/json/write_json.cu
@@ -19,6 +19,7 @@
* @brief cuDF-IO JSON writer implementation
*/

#include "io/comp/comp.hpp"
#include "io/csv/durations.hpp"
#include "io/utilities/parsing_utils.cuh"
#include "lists/utilities.hpp"
@@ -828,10 +829,10 @@ void write_chunked(data_sink* out_sink,
}
}

void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
void write_json_uncompressed(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
std::vector<column_name_info> user_column_names = [&]() {
@@ -934,4 +935,24 @@ void write_json(data_sink* out_sink,
}
}

void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
if (options.get_compression() != compression_type::NONE) {
std::vector<char> hbuf;
auto hbuf_sink_ptr = data_sink::create(&hbuf);
write_json_uncompressed(hbuf_sink_ptr.get(), table, options, stream);
stream.synchronize();
auto comp_hbuf = cudf::io::detail::compress(
options.get_compression(),
host_span<uint8_t>(reinterpret_cast<uint8_t*>(hbuf.data()), hbuf.size()),
stream);
out_sink->host_write(comp_hbuf.data(), comp_hbuf.size());
return;
}
write_json_uncompressed(out_sink, table, options, stream);
}

} // namespace cudf::io::json::detail
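The compressed path above buffers the uncompressed JSON on the host, compresses it with `cudf::io::detail::compress`, and writes the result through a single `host_write` call. For round-tripping, here is a sketch of reading such output back through the JSON reader's existing `compression` option; the path and the assumption that the reader handles the chosen format are illustrative, not asserted by this diff:

```cpp
#include <cudf/io/json.hpp>
#include <cudf/io/types.hpp>

// Sketch: read back GZIP-compressed JSON Lines produced by the writer example above.
cudf::io::table_with_metadata read_compressed_json()
{
  auto src  = cudf::io::source_info{"out.jsonl.gz"};  // hypothetical input path
  auto opts = cudf::io::json_reader_options::builder(src)
                .lines(true)
                .compression(cudf::io::compression_type::GZIP)
                .build();
  return cudf::io::read_json(opts);
}
```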