Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Add SchemaInferenceOptions options to infer_schema and option to configure int96 inference #1533

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 101 additions & 25 deletions src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This module has a single entry point, [`parquet_to_arrow_schema`].
//! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
use parquet2::schema::{
types::{
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
Expand All @@ -8,11 +8,23 @@
};

use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use crate::io::parquet::read::schema::SchemaInferenceOptions;

/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
/// any physical column.
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
fields.iter().filter_map(to_field).collect::<Vec<_>>()
parquet_to_arrow_schema_with_options(fields, &None)
}

/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
pub fn parquet_to_arrow_schema_with_options(
fields: &[ParquetType],
options: &Option<SchemaInferenceOptions>,
) -> Vec<Field> {
fields
.iter()
.filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
.collect::<Vec<_>>()
}

fn from_int32(
Expand Down Expand Up @@ -169,7 +181,10 @@
}

/// Maps a [`PhysicalType`] with optional metadata to a [`DataType`]
fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
fn to_primitive_type_inner(
primitive_type: &PrimitiveType,
options: &SchemaInferenceOptions,
) -> DataType {
match primitive_type.physical_type {
PhysicalType::Boolean => DataType::Boolean,
PhysicalType::Int32 => {
Expand All @@ -178,7 +193,7 @@
PhysicalType::Int64 => {
from_int64(primitive_type.logical_type, primitive_type.converted_type)
}
PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None),
PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None),
PhysicalType::Float => DataType::Float32,
PhysicalType::Double => DataType::Float64,
PhysicalType::ByteArray => {
Expand All @@ -195,8 +210,8 @@
/// Entry point for converting parquet primitive type to arrow type.
///
/// This function takes care of repetition.
fn to_primitive_type(primitive_type: &PrimitiveType) -> DataType {
let base_type = to_primitive_type_inner(primitive_type);
fn to_primitive_type(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType {
let base_type = to_primitive_type_inner(primitive_type, options);

if primitive_type.field_info.repetition == Repetition::Repeated {
DataType::List(Box::new(Field::new(
Expand All @@ -214,23 +229,27 @@
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
match (logical_type, converted_type) {
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name),
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options),

Check warning on line 236 in src/io/parquet/read/schema/convert.rs

View check run for this annotation

Codecov / codecov/patch

src/io/parquet/read/schema/convert.rs#L236

Added line #L236 was not covered by tests
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options),

Check warning on line 238 in src/io/parquet/read/schema/convert.rs

View check run for this annotation

Codecov / codecov/patch

src/io/parquet/read/schema/convert.rs#L238

Added line #L238 was not covered by tests
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
to_map(fields)
to_map(fields, options)

Check warning on line 240 in src/io/parquet/read/schema/convert.rs

View check run for this annotation

Codecov / codecov/patch

src/io/parquet/read/schema/convert.rs#L240

Added line #L240 was not covered by tests
}
_ => to_struct(fields),
_ => to_struct(fields, options),
}
}

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
let fields = fields.iter().filter_map(to_field).collect::<Vec<Field>>();
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let fields = fields
.iter()
.filter_map(|f| to_field(f, options))
.collect::<Vec<Field>>();
if fields.is_empty() {
None
} else {
Expand All @@ -240,8 +259,8 @@

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_map(fields: &[ParquetType]) -> Option<DataType> {
let inner = to_field(&fields[0])?;
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let inner = to_field(&fields[0], options)?;

Check warning on line 263 in src/io/parquet/read/schema/convert.rs

View check run for this annotation

Codecov / codecov/patch

src/io/parquet/read/schema/convert.rs#L262-L263

Added lines #L262 - L263 were not covered by tests
Some(DataType::Map(Box::new(inner), false))
}

Expand All @@ -254,16 +273,17 @@
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
if field_info.repetition == Repetition::Repeated {
Some(DataType::List(Box::new(Field::new(
&field_info.name,
to_struct(fields)?,
to_struct(fields, options)?,
is_nullable(field_info),
))))
} else {
non_repeated_group(logical_type, converted_type, fields, parent_name)
non_repeated_group(logical_type, converted_type, fields, parent_name, options)
}
}

Expand All @@ -279,10 +299,10 @@
/// Converts parquet schema to arrow field.
/// Returns `None` iff the parquet type has no associated primitive types,
/// i.e. if it is a column-less group type.
fn to_field(type_: &ParquetType) -> Option<Field> {
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
Some(Field::new(
&type_.get_field_info().name,
to_data_type(type_)?,
to_data_type(type_, options)?,
is_nullable(type_.get_field_info()),
))
}
Expand All @@ -291,21 +311,25 @@
///
/// To fully understand this algorithm, please refer to
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
fn to_list(
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
let item = fields.first().unwrap();

let item_type = match item {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
ParquetType::GroupType { fields, .. } => {
if fields.len() == 1
&& item.name() != "array"
&& item.name() != format!("{parent_name}_tuple")
{
// extract the repetition field
let nested_item = fields.first().unwrap();
to_data_type(nested_item)
to_data_type(nested_item, options)
} else {
to_struct(fields)
to_struct(fields, options)
}
}
}?;
Expand Down Expand Up @@ -346,9 +370,12 @@
///
/// If this schema is a group type and none of its children is reserved in the
/// conversion, the result is Ok(None).
pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
pub(crate) fn to_data_type(
type_: &ParquetType,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
match type_ {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
ParquetType::GroupType {
field_info,
logical_type,
Expand All @@ -364,6 +391,7 @@
converted_type,
fields,
&field_info.name,
options,
)
}
}
Expand Down Expand Up @@ -973,4 +1001,52 @@
assert_eq!(arrow_fields, fields);
Ok(())
}

#[test]
fn test_int96_options() -> Result<()> {
for tu in [
TimeUnit::Second,
TimeUnit::Microsecond,
TimeUnit::Millisecond,
TimeUnit::Nanosecond,
] {
let message_type = "
message arrow_schema {
REQUIRED INT96 int96_field;
OPTIONAL GROUP int96_list (LIST) {
REPEATED GROUP list {
OPTIONAL INT96 element;
}
}
REQUIRED GROUP int96_struct {
REQUIRED INT96 int96_field;
}
}
";
let coerced_to = DataType::Timestamp(tu, None);
let arrow_fields = vec![
Field::new("int96_field", coerced_to.clone(), false),
Field::new(
"int96_list",
DataType::List(Box::new(Field::new("element", coerced_to.clone(), true))),
true,
),
Field::new(
"int96_struct",
DataType::Struct(vec![Field::new("int96_field", coerced_to.clone(), false)]),
false,
),
];

let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
let fields = parquet_to_arrow_schema_with_options(
parquet_schema.fields(),
&Some(SchemaInferenceOptions {
int96_coerce_to_timeunit: tu,
}),
);
assert_eq!(arrow_fields, fields);
}
Ok(())
}
}
34 changes: 31 additions & 3 deletions src/io/parquet/read/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! APIs to handle Parquet <-> Arrow schemas.
use crate::datatypes::Schema;
use crate::datatypes::{Schema, TimeUnit};
use crate::error::Result;

mod convert;
mod metadata;

pub use convert::parquet_to_arrow_schema;
pub use convert::{parquet_to_arrow_schema, parquet_to_arrow_schema_with_options};
pub use metadata::read_schema_from_metadata;
pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor};
pub use parquet2::schema::types::ParquetType;
Expand All @@ -14,18 +14,46 @@ pub(crate) use convert::*;

use self::metadata::parse_key_value_metadata;

/// Options when inferring schemas from Parquet
pub struct SchemaInferenceOptions {
/// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit
/// in the inferred Arrow Timestamp type.
///
/// This defaults to `TimeUnit::Nanosecond`, but INT96 timestamps outside of the range of years 1678-2262,
/// will overflow when parsed as `Timestamp(TimeUnit::Nanosecond)`. Setting this to a lower resolution
/// (e.g. TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates
/// without overflowing when parsing the data.
pub int96_coerce_to_timeunit: TimeUnit,
}

impl Default for SchemaInferenceOptions {
fn default() -> Self {
SchemaInferenceOptions {
int96_coerce_to_timeunit: TimeUnit::Nanosecond,
}
}
}

/// Infers a [`Schema`] from parquet's [`FileMetaData`]. This first looks for the metadata key
/// `"ARROW:schema"`; if it does not exist, it converts the parquet types declared in the
/// file's parquet schema to Arrow's equivalent.
/// # Error
/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
/// indicating that that the file's arrow metadata was incorrectly written.
pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
infer_schema_with_options(file_metadata, &None)
}

/// Like [`infer_schema`] but with configurable options which affects the behavior of inference
pub fn infer_schema_with_options(
file_metadata: &FileMetaData,
options: &Option<SchemaInferenceOptions>,
) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());

let schema = read_schema_from_metadata(&mut metadata)?;
Ok(schema.unwrap_or_else(|| {
let fields = parquet_to_arrow_schema(file_metadata.schema().fields());
let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options);
Schema { fields, metadata }
}))
}
Loading