Support DurationType in cudf parquet reader via arrow:schema #15617

Merged

Changes from 1 commit

61 commits
053f7da
Read duration type in cudf parquet via arrow:schema
mhaseeb123 Apr 30, 2024
aa4e9bb
reverting an inadvertently removed code line.
mhaseeb123 Apr 30, 2024
6c67c28
clang-format changes
mhaseeb123 Apr 30, 2024
0e6fc4a
Update cpp/src/io/parquet/reader_impl_helpers.cpp
mhaseeb123 Apr 30, 2024
a6eca13
Co-walk arrow and parquet schema
mhaseeb123 May 1, 2024
ced5dd9
fixing copyrights
mhaseeb123 May 1, 2024
b192352
fix the hardcoded if conditions for duration type
mhaseeb123 May 1, 2024
18d5e6c
add boolean check for arrow type columns
mhaseeb123 May 1, 2024
8f55983
add basic testing for duration type
mhaseeb123 May 1, 2024
6883c7e
revert clangd induced formatting
mhaseeb123 May 1, 2024
ab5cacd
more reverting clangd
mhaseeb123 May 1, 2024
649148c
remove raw for loops, verify equal fields at each schema level
mhaseeb123 May 2, 2024
416dbbd
Remove flatbuffer files. Add flatbuffers via CMake
mhaseeb123 May 2, 2024
c5a7b0e
Make arrow schema use in PQ reader optional. Add tests.
mhaseeb123 May 2, 2024
6f18766
minor updates for better readability
mhaseeb123 May 2, 2024
e4b9e74
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
mhaseeb123 May 2, 2024
dc7564a
fix arrow schema walk to handle list type columns. Add more pytests
mhaseeb123 May 3, 2024
0c4e7c4
add comments for the dummy node hack
mhaseeb123 May 3, 2024
0514b5c
Adding `map` type to parquet testing.
mhaseeb123 May 3, 2024
a1f8fe7
relocate files, fix copyrights and ruff checks
mhaseeb123 May 6, 2024
a36c1c6
minor fix for verify copyright hook
mhaseeb123 May 6, 2024
59d84f4
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
mhaseeb123 May 6, 2024
6b9bde5
update copyright messages
mhaseeb123 May 6, 2024
041ff76
Merge branch 'arrow-schema-support-pq-reader' of https://github.com/m…
mhaseeb123 May 6, 2024
cb691dd
segfault-proof the `validate_schemas` method
mhaseeb123 May 6, 2024
59610cd
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
mhaseeb123 May 6, 2024
ed83908
C++ friendly base64 encoder/decoder implementations
mhaseeb123 May 7, 2024
fbd3356
minor updates
mhaseeb123 May 7, 2024
b93c2c0
fix the erroneous inequality check to equality
mhaseeb123 May 7, 2024
d01f94c
use string find instead of custom function for better speed
mhaseeb123 May 7, 2024
b8c338b
optimize base64 encode
mhaseeb123 May 7, 2024
e47bbfb
fix minor signed comparison error
mhaseeb123 May 7, 2024
0b5ec61
speed optimization for decoder
mhaseeb123 May 7, 2024
83a13a7
Apply suggestions from code review
mhaseeb123 May 8, 2024
69be7db
applying suggestions from reviewers
mhaseeb123 May 8, 2024
0d41d99
minor updates from reviewer suggestions
mhaseeb123 May 8, 2024
56bbc15
add ctests for base64 encoder and decoder
mhaseeb123 May 8, 2024
bd54430
minor comments update
mhaseeb123 May 9, 2024
e954b45
Apply styling suggestions from code review
mhaseeb123 May 9, 2024
b870359
minor updates and better styling
mhaseeb123 May 9, 2024
c34c248
adding const to decode_ipc_message fn
mhaseeb123 May 9, 2024
dda87d1
avoid returning raw pointer in decode_ipc_message
mhaseeb123 May 9, 2024
e9f441d
move base64 definitions to a source file and add it to cmake
mhaseeb123 May 10, 2024
ac85ecc
apply suggestions from the reviews
mhaseeb123 May 10, 2024
45261f1
Apply suggestions from code review
mhaseeb123 May 10, 2024
f92fcc8
improve round trip tests for thorough arrow schema testing plus minor…
mhaseeb123 May 10, 2024
1c36d36
Update cpp/src/io/parquet/reader_impl_helpers.cpp
mhaseeb123 May 10, 2024
336574a
minor syntactical updates to tests
mhaseeb123 May 10, 2024
b0289b8
Apply suggestions from code review
mhaseeb123 May 13, 2024
3a602cc
small improvements and using zip iterator instead of counting iterato…
mhaseeb123 May 13, 2024
63b4df3
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
vuule May 13, 2024
7fbbea0
Remove explicit check for dtypes as already being done
mhaseeb123 May 13, 2024
6ab3b17
move `use_arrow_schema` to the end of parameters
mhaseeb123 May 14, 2024
4d74b24
Update tests to construct `expected` and use `assert_eq` for dtypes
mhaseeb123 May 14, 2024
a80f562
Remove `use_arrow_schema` from public Python APIs.
mhaseeb123 May 14, 2024
4e368d8
Remove `use_arrow_schema` from Cython API args as well
mhaseeb123 May 14, 2024
93ec789
Throw some Nulls in python tests
mhaseeb123 May 14, 2024
09eadcf
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
galipremsagar May 14, 2024
1d94cc8
Merge remote-tracking branch 'upstream/branch-24.06' into arrow-schem…
mhaseeb123 May 14, 2024
50d0b77
Update .pre-commit-config.yaml
galipremsagar May 14, 2024
56b2edc
Merge branch 'branch-24.06' into arrow-schema-support-pq-reader
mhaseeb123 May 15, 2024
Co-walk arrow and parquet schema
mhaseeb123 committed May 1, 2024
commit a6eca13089f6058affb4394e2aaa76607505c321
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
@@ -183,6 +183,9 @@ struct SchemaElement {
// extra cudf specific fields
bool output_as_byte_array = false;

// cudf type determined from arrow:schema
thrust::optional<type_id> arrow_type;

// The following fields are filled in later during schema initialization
int max_definition_level = 0;
int max_repetition_level = 0;
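For context on the new `arrow_type` field: its value is derived from the Arrow flatbuffer `Duration` type's `TimeUnit`, mapped to the corresponding cudf duration `type_id`. A minimal sketch of that mapping, with a local `TimeUnit` enum standing in for the vendored flatbuffer header (names mirror Arrow's Schema.fbs but are assumptions; the PR's actual `duration_from_flatbuffer` lambda is elided from this diff view):

```cpp
#include <cudf/types.hpp>

// Stand-in for Arrow's flatbuf::TimeUnit enum (assumption, not the vendored header)
enum class TimeUnit { SECOND, MILLISECOND, MICROSECOND, NANOSECOND };

// Map an Arrow Duration time unit to the cudf duration type that
// `SchemaElement::arrow_type` would record for an INT64 column
cudf::data_type duration_from_time_unit(TimeUnit unit)
{
  using cudf::type_id;
  switch (unit) {
    case TimeUnit::SECOND: return cudf::data_type{type_id::DURATION_SECONDS};
    case TimeUnit::MILLISECOND: return cudf::data_type{type_id::DURATION_MILLISECONDS};
    case TimeUnit::MICROSECOND: return cudf::data_type{type_id::DURATION_MICROSECONDS};
    case TimeUnit::NANOSECOND: return cudf::data_type{type_id::DURATION_NANOSECONDS};
    default: return cudf::data_type{};  // EMPTY: leaves arrow_type unset
  }
}
```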
3 changes: 1 addition & 2 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -452,8 +452,7 @@ void reader::impl::populate_metadata(table_metadata& out_metadata)
table_with_metadata reader::impl::read_chunk_internal(
bool uses_custom_row_bounds, std::optional<std::reference_wrapper<ast::expression const>> filter)
{
// If `_output_metadata` has been constructed, just
// copy it over.
// If `_output_metadata` has been constructed, just copy it over.
auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{};
out_metadata.schema_info.resize(_output_buffers.size());

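One step that precedes everything in the next file: the `ARROW:schema` value stored in the Parquet key-value metadata is base64-encoded, which is why the diff below pulls in `cudf/detail/utilities/base64_utils.hpp`. A self-contained sketch of the decode step, assuming standard (non-URL-safe) base64; the PR's actual helper signatures are not visible in this commit:

```cpp
#include <cstdint>
#include <string>
#include <string_view>

// Decode a standard base64 string (e.g. the "ARROW:schema" value) into raw bytes
std::string base64_decode(std::string_view input)
{
  static constexpr std::string_view alphabet =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  std::string output;
  output.reserve(input.size() / 4 * 3);
  uint32_t accum = 0;  // rolling bit buffer
  int bits       = 0;  // number of valid bits currently in accum
  for (char c : input) {
    if (c == '=') break;                          // padding ends the payload
    auto pos = alphabet.find(c);
    if (pos == std::string_view::npos) continue;  // skip whitespace/invalid bytes
    accum = (accum << 6) | static_cast<uint32_t>(pos);
    bits += 6;
    if (bits >= 8) {                              // a full byte is available
      bits -= 8;
      output.push_back(static_cast<char>((accum >> bits) & 0xFF));
    }
  }
  return output;
}
```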
213 changes: 124 additions & 89 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -16,27 +16,14 @@

#include "reader_impl_helpers.hpp"

#include "cudf/detail/utilities/assert.cuh"
#include "cudf/types.hpp"
#include "cudf/utilities/error.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_common.hpp"
#include "io/utilities/row_selection.hpp"

#include <cudf/detail/utilities/base64_utils.hpp>
// flatbuffer headers
#include <cudf/io/ipc/detail/Message_generated.h>
#include <cudf/io/ipc/detail/Schema_generated.h>

#include <arrow/scalar.h>
#include <arrow/type_fwd.h>

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <memory>
#include <numeric>
#include <optional>
#include <regex>

namespace cudf::io::parquet::detail {
@@ -83,10 +70,10 @@ thrust::optional<LogicalType> converted_to_logical_type(SchemaElement const& sch
*/
type_id to_type_id(SchemaElement const& schema,
bool strings_to_categorical,
type_id timestamp_type_id,
cudf::data_type duration_type)
type_id timestamp_type_id)
{
auto const physical = schema.type;
auto const arrow = schema.arrow_type;
auto logical_type = schema.logical_type;
// sanity check, but not worth failing over
if (schema.converted_type.has_value() and not logical_type.has_value()) {
Expand Down Expand Up @@ -180,8 +167,7 @@ type_id to_type_id(SchemaElement const& schema,
switch (physical) {
case BOOLEAN: return type_id::BOOL8;
case INT32: return type_id::INT32;
case INT64:
return (duration_type.id() == type_id::EMPTY) ? type_id::INT64 : duration_type.id();
case INT64: return arrow.value_or(type_id::INT64);
case FLOAT: return type_id::FLOAT32;
case DOUBLE: return type_id::FLOAT64;
case BYTE_ARRAY:
@@ -560,25 +546,28 @@ aggregate_reader_metadata::aggregate_reader_metadata(
}

if (arrow_schema.has_value()) {
// erase "ARROW:schema" from the output pfm.
std::for_each(
keyval_maps.begin(), keyval_maps.end(), [](auto& pfm) { pfm.erase("ARROW:schema"); });
// consume arrow schema into the parquet schema
consume_arrow_schema();
// we no longer keep arrow schema alive
arrow_schema.reset();
}

// erase "ARROW:schema" from the output pfm if exists
std::for_each(
keyval_maps.begin(), keyval_maps.end(), [](auto& pfm) { pfm.erase("ARROW:schema"); });
}

[[nodiscard]] std::optional<arrow_schema_t> aggregate_reader_metadata::collect_arrow_schema() const
[[nodiscard]] std::optional<arrow_schema_data_types>
aggregate_reader_metadata::collect_arrow_schema() const
{
// Check if the key_value metadata contains an ARROW:schema
// If yes, read and decode the flatbuffer schema

// TODO: Should we check if any file has the "ARROW:schema" key or
// Question: Should we check if any file has the "ARROW:schema" key or
// Or if all files have the same "ARROW:schema"?
auto it = keyval_maps[0].find("ARROW:schema");
if (it == keyval_maps[0].end()) { return std::nullopt; }

// Local arrow::schema object
arrow_schema_t schema;

// Read arrow schema from flatbuffers
std::string encoded_serialized_message = it->second;

@@ -642,66 +631,123 @@ aggregate_reader_metadata::aggregate_reader_metadata(
return cudf::data_type{};
};

// Question: Should we not walk the schema here and return flatbuf:Fields *
// and walk fields in `select_columns` function where we actually
// use the to_type_id function?

// Lambda function to walk a field and its children in DFS manner and
// return boolean walk success status
std::function<bool(const flatbuf::Field*)> walk_field =
[&schema, &duration_from_flatbuffer, &walk_field](const flatbuf::Field* field) -> bool {
// DFS: recursively walk over its children first
const auto& children = field->children();
if (children != nullptr) {
auto iter = std::find_if_not(children->cbegin(),
children->cend(),
[&walk_field](const auto& child) { return walk_field(child); });
if (iter != children->end()) { return false; }
}
std::function<bool(const flatbuf::Field*, arrow_schema_data_types&)> walk_field =
[&walk_field, &duration_from_flatbuffer](const flatbuf::Field* field,
arrow_schema_data_types& schema_elem) {
// DFS: recursively walk over the children first
auto const& field_children = field->children();

if (field_children != nullptr) {
auto schema_children = std::vector<arrow_schema_data_types>(field->children()->size());
// TODO: raw loop. Should try and change to STL.
for (uint32_t idx = 0; idx < field_children->size(); idx++) {
if (not walk_field(*(field_children->begin() + idx), schema_children[idx])) {
return false;
}
}
schema_elem.children = std::move(schema_children);
}

// Walk the field itself
// TODO: is the 'name' a good key? What if cols at different levels have the same name.
// Solution: build strings like: "parent.child.grandchild.greatgrandchild" to ensure
// uniqueness?
auto name = (field->name() != nullptr) ? field->name()->str() : "";

if (field->type_type() == flatbuf::Type::Type_Duration) {
auto type_data = field->type_as_Duration();
if (type_data != nullptr) {
// add an entry to the unordered_map
schema[name] = duration_from_flatbuffer(type_data);
} else {
CUDF_LOG_ERROR("Parquet reader encountered an invalid type_data pointer.",
"arrow:schema not processed.");
return false;
// Walk the field itself
if (field->type_type() == flatbuf::Type::Type_Duration) {
auto type_data = field->type_as_Duration();
if (type_data != nullptr) {
auto name = (field->name()) ? field->name()->str() : "";
// set the schema_elem type to duration type
schema_elem.type = duration_from_flatbuffer(type_data);
} else {
CUDF_LOG_ERROR("Parquet reader encountered an invalid type_data pointer.",
"arrow:schema not processed.");
return false;
}
}
return true;
};

// arrow schema structure to return
arrow_schema_data_types schema;

// Recursively walk the arrow schema and set cudf::data_type
// for all duration columns
if (fields->size() > 0) {
schema.children = std::vector<arrow_schema_data_types>(fields->size());
// TODO: raw loop. Should try and change to STL.
for (uint32_t idx = 0; idx < fields->size(); idx++) {
if (not walk_field(*(fields->begin() + idx), schema.children[idx])) { return std::nullopt; }
}
return true;
}

return std::make_optional(std::move(schema));
}

void aggregate_reader_metadata::consume_arrow_schema()
{
auto schema_root = get_schema(0);
auto arrow_schema_root = arrow_schema.value();

/*
* Recursively verify that the number of columns at each level in
* Parquet schema and arrow schema are the same. If yes, do the
* co-walk between them, else skip it
*/

// Should verify at each level rather than total number of fields
auto num_fields = arrow_schema_root.children.size();
auto num_schema_elems = schema_root.children_idx.size();

// lambda to compute total number of fields in arrow schema
std::function<int32_t(const arrow_schema_data_types&)> calc_num_fields =
[&calc_num_fields](const arrow_schema_data_types& arrow_schema_elem) -> int32_t {
int32_t num_fields = arrow_schema_elem.children.size();
std::for_each(arrow_schema_elem.children.cbegin(),
arrow_schema_elem.children.cend(),
[&](auto const& schema_elem) { num_fields += calc_num_fields(schema_elem); });

return num_fields;
};

// Walk all fields and extract all DurationType columns
auto iter = std::find_if_not(fields->cbegin(), fields->cend(), [&walk_field](const auto& field) {
return walk_field(field);
});
// calculate the total number of fields.
std::for_each(arrow_schema_root.children.cbegin(),
arrow_schema_root.children.cend(),
[&](auto const& schema_elem) { num_fields += calc_num_fields(schema_elem); });

if (iter != fields->end()) { return std::nullopt; }
// check if total number of fields are equal
if (num_fields != num_schema_elems) {
CUDF_LOG_ERROR("Parquet reader encountered a mismatch between Parquet and arrow schema.",
"arrow:schema not processed.");
return;
}

// TODO: Get endianness from arrow:schema object - We aren't using this for now
[[maybe_unused]] auto endianness = fb_schema->endianness();
// All good, now co-walk schemas
std::function<void(arrow_schema_data_types&, int)> co_walk_schemas =
[&](arrow_schema_data_types& arrow_schema, int schema_idx) {
auto& schema_elem = per_file_metadata[0].schema[schema_idx];
// TODO: raw loop. Should try and change to STL.
for (int32_t idx = 0; idx < static_cast<int32_t>(arrow_schema.children.size()); idx++) {
co_walk_schemas(arrow_schema.children[idx], schema_elem.children_idx[idx]);
}

// TODO: Get an arrow:KeyValue map of custom_metadata from arrow:schema object -
// We aren't using this for now
[[maybe_unused]] auto custom_metadata = fb_schema->custom_metadata();
if (arrow_schema.type.id() != type_id::EMPTY and schema_elem.type == Type::INT64 and
not schema_elem.logical_type.has_value() and not schema_elem.converted_type.has_value()) {
schema_elem.arrow_type = arrow_schema.type.id();
}
};
// TODO: raw loop. Should try and change to STL.
for (int32_t idx = 0; idx < static_cast<int32_t>(arrow_schema_root.children.size()); idx++) {
co_walk_schemas(arrow_schema_root.children[idx], schema_root.children_idx[idx]);
}

return std::make_optional(std::move(schema));
return;
}

const void* aggregate_reader_metadata::decode_ipc_message(std::string& serialized_message) const
{
// Constants copied from arrow source and renamed to match the case
constexpr auto message_decoder_next_required_size_initial = sizeof(int32_t);
constexpr auto MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL = sizeof(int32_t);
constexpr auto message_decoder_next_required_size_metadata_length = sizeof(int32_t);
constexpr int32_t ipc_continuation_token = -1;
constexpr int32_t IPC_CONTINUATION_TOKEN = -1;

// message buffer
auto message_buf = serialized_message.data();
Expand All @@ -715,24 +761,24 @@ const void* aggregate_reader_metadata::decode_ipc_message(std::string& serialize
return static_cast<const void*>(nullptr);
}
// Check for improper message.
if (message_size - message_decoder_next_required_size_initial < 0) {
if (message_size - MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL < 0) {
CUDF_LOG_ERROR("Parquet reader encountered unexpected arrow:schema message length.",
"arrow:schema not processed.");
}

// Get the first 4 bytes (continuation) of the ipc message
int32_t continuation;
std::memcpy(&continuation, message_buf, message_decoder_next_required_size_initial);
std::memcpy(&continuation, message_buf, MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL);

// Check if the continuation matches the expected token
if (continuation != ipc_continuation_token) {
if (continuation != IPC_CONTINUATION_TOKEN) {
CUDF_LOG_ERROR("Parquet reader encountered unexpected IPC continuation token.",
"arrow:schema not processed.");
return static_cast<const void*>(nullptr);
} else {
// Offset the message buf and reduce remaining size
message_buf += message_decoder_next_required_size_initial;
message_size -= message_decoder_next_required_size_initial;
message_buf += MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL;
message_size -= MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL;
}

// Check for improper message.
@@ -967,17 +1013,11 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>

auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx));

auto duration_type = cudf::data_type{};
if (arrow_schema.has_value() and
arrow_schema.value().find(schema_elem.name) != arrow_schema.value().end()) {
duration_type = arrow_schema.value().at(schema_elem.name);
}
// if we're at the root, this is a new output column
auto const col_type =
one_level_list
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, duration_type);
auto const dtype = to_data_type(col_type, schema_elem);
auto const col_type = one_level_list
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);

cudf::io::detail::inline_column_buffer output_col(dtype,
schema_elem.repetition_type == OPTIONAL);
Expand Down Expand Up @@ -1016,13 +1056,8 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
// set up child output column for one-level encoding list
if (one_level_list) {
// determine the element data type
auto duration_type = cudf::data_type{};
if (arrow_schema.has_value() and
arrow_schema.value().find(schema_elem.name) != arrow_schema.value().end()) {
duration_type = arrow_schema.value().at(schema_elem.name);
}
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, duration_type);
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const element_dtype = to_data_type(element_type, schema_elem);

cudf::io::detail::inline_column_buffer element_col(
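For orientation: `decode_ipc_message` above strips Arrow's IPC encapsulation before handing the bytes to flatbuffers. An encapsulated message starts with a 4-byte continuation marker (`0xFFFFFFFF`, i.e. `-1`) followed by a 4-byte little-endian metadata length, then the flatbuffer metadata itself. A standalone sketch of that framing check (the helper name, return convention, and simplified bounds check are illustrative only):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string_view>

// Strip the continuation marker and length prefix from an encapsulated
// Arrow IPC message, returning a view over the flatbuffer metadata
std::optional<std::string_view> strip_ipc_framing(std::string_view msg)
{
  constexpr int32_t ipc_continuation_token = -1;
  constexpr std::size_t prefix_size        = 2 * sizeof(int32_t);
  if (msg.size() < prefix_size) { return std::nullopt; }

  int32_t continuation = 0;
  std::memcpy(&continuation, msg.data(), sizeof(int32_t));
  if (continuation != ipc_continuation_token) { return std::nullopt; }

  // Assumes a little-endian host, as the reader implementation does
  int32_t metadata_len = 0;
  std::memcpy(&metadata_len, msg.data() + sizeof(int32_t), sizeof(int32_t));
  if (metadata_len <= 0 or
      static_cast<std::size_t>(metadata_len) > msg.size() - prefix_size) {
    return std::nullopt;
  }
  return msg.substr(prefix_size, static_cast<std::size_t>(metadata_len));
}
```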
21 changes: 13 additions & 8 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -103,8 +103,7 @@ struct row_group_info {
*/
[[nodiscard]] type_id to_type_id(SchemaElement const& schema,
bool strings_to_categorical,
type_id timestamp_type_id,
cudf::data_type duration_type = cudf::data_type{});
type_id timestamp_type_id);

/**
* @brief Converts cuDF type enum to column logical type
@@ -124,12 +123,15 @@ struct metadata : public FileMetaData {
void sanitize_schema();
};

using arrow_schema_t = std::unordered_map<std::string, cudf::data_type>;
struct arrow_schema_data_types {
std::vector<arrow_schema_data_types> children;
data_type type{type_id::EMPTY};
};

class aggregate_reader_metadata {
std::vector<metadata> per_file_metadata;
std::vector<std::unordered_map<std::string, std::string>> keyval_maps;
std::optional<arrow_schema_t> arrow_schema;
std::optional<arrow_schema_data_types> arrow_schema;

int64_t num_rows;
size_type num_row_groups;
@@ -147,13 +149,16 @@
const;

/**
* @brief Walks over an "ARROW:schema" flatbuffer and collects type information for DurationType
* columns into an unordered map.
* @brief Decodes and walks over "ARROW:schema" from the Parquet key-value
* metadata section and returns it.
*/
[[nodiscard]] std::optional<arrow_schema_t> collect_arrow_schema() const;
[[nodiscard]] std::optional<arrow_schema_data_types> collect_arrow_schema() const;

void consume_arrow_schema();

/**
* @brief Decode an arrow:IPC message and returns a const void pointer to its metadata header
* @brief Decode an arrow:IPC message and returns a const void pointer
* to its metadata header
*/
[[nodiscard]] const void* decode_ipc_message(std::string& serialized_message) const;

Expand Down
Loading