-
Notifications
You must be signed in to change notification settings - Fork 1k
Variant integration fixes #8438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e08bcaa
4f45954
811f4a5
127e3ae
fe4d628
c98edee
ab129d9
54056fe
8a2e4af
edc7ecd
6a6e067
5de3489
8dd3ab7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,12 +23,13 @@ use arrow::buffer::NullBuffer; | |
use arrow::compute::cast; | ||
use arrow::datatypes::{ | ||
Date32Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, | ||
UInt16Type, UInt32Type, UInt64Type, UInt8Type, | ||
}; | ||
use arrow_schema::extension::ExtensionType; | ||
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields}; | ||
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit}; | ||
use parquet_variant::Uuid; | ||
use parquet_variant::Variant; | ||
|
||
use std::borrow::Cow; | ||
use std::sync::Arc; | ||
|
||
/// Arrow Variant [`ExtensionType`]. | ||
|
@@ -353,37 +354,18 @@ impl VariantArray { | |
/// Note: Does not do deep validation of the [`Variant`], so it is up to the | ||
/// caller to ensure that the metadata and value were constructed correctly. | ||
pub fn value(&self, index: usize) -> Variant<'_, '_> { | ||
match &self.shredding_state { | ||
ShreddingState::Unshredded { value, .. } => { | ||
// Unshredded case | ||
Variant::new(self.metadata.value(index), value.value(index)) | ||
} | ||
ShreddingState::Typed { typed_value, .. } => { | ||
// Typed case (formerly PerfectlyShredded) | ||
if typed_value.is_null(index) { | ||
Variant::Null | ||
} else { | ||
typed_value_to_variant(typed_value, index) | ||
} | ||
} | ||
ShreddingState::PartiallyShredded { | ||
value, typed_value, .. | ||
} => { | ||
// PartiallyShredded case (formerly ImperfectlyShredded) | ||
if typed_value.is_null(index) { | ||
Variant::new(self.metadata.value(index), value.value(index)) | ||
} else { | ||
typed_value_to_variant(typed_value, index) | ||
} | ||
match (self.typed_value_field(), self.value_field()) { | ||
// Always prefer typed_value, if available | ||
(Some(typed_value), value) if typed_value.is_valid(index) => { | ||
typed_value_to_variant(typed_value, value, index) | ||
} | ||
ShreddingState::AllNull => { | ||
// AllNull case: neither value nor typed_value fields exist | ||
// NOTE: This handles the case where neither value nor typed_value fields exist. | ||
// For top-level variants, this returns Variant::Null (JSON null). | ||
// For shredded object fields, this technically should indicate SQL NULL, | ||
// but the current API cannot distinguish these contexts. | ||
Variant::Null | ||
// Otherwise fall back to value, if available | ||
(_, Some(value)) if value.is_valid(index) => { | ||
Variant::new(self.metadata.value(index), value.value(index)) | ||
} | ||
// It is technically invalid for neither value nor typed_value fields to be available, | ||
// but the spec specifically requires readers to return Variant::Null in this case. | ||
_ => Variant::Null, | ||
} | ||
} | ||
|
||
|
@@ -796,8 +778,17 @@ impl StructArrayBuilder { | |
} | ||
|
||
/// returns the non-null element at index as a Variant | ||
fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, '_> { | ||
match typed_value.data_type() { | ||
fn typed_value_to_variant<'a>( | ||
typed_value: &'a ArrayRef, | ||
value: Option<&BinaryViewArray>, | ||
index: usize, | ||
) -> Variant<'a, 'a> { | ||
let data_type = typed_value.data_type(); | ||
if value.is_some_and(|v| !matches!(data_type, DataType::Struct(_)) && v.is_valid(index)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'll panic here if ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not need to panic if we have a struct here -- that corresponds to a partially shredded variant object, where the value is a variant object and the typed_value is a struct. Eventually, the code that handles partial shredding will detect if the value is not a variant object or contains field names that conflict with those of the |
||
// Only a partially shredded struct is allowed to have values for both columns | ||
panic!("Invalid variant, conflicting value and typed_value"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole panic thing is becoming increasingly awkward as more and more valid error cases arise. Especially because:
Now that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think adding However, it seems to me that most of these checks can be done once per array (e.g. this check for Can we perhaps move this check into the constructor of VariantArray 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this one is a row-oriented check, unlike the columnar type checks I added in For a specific row, both |
||
} | ||
match data_type { | ||
DataType::Boolean => { | ||
let boolean_array = typed_value.as_boolean(); | ||
let value = boolean_array.value(index); | ||
|
@@ -809,17 +800,11 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' | |
let date = Date32Type::to_naive_date(value); | ||
Variant::from(date) | ||
} | ||
DataType::FixedSizeBinary(binary_len) => { | ||
// 16-byte FixedSizeBinary alway corresponds to a UUID; all other sizes are illegal. | ||
DataType::FixedSizeBinary(16) => { | ||
let array = typed_value.as_fixed_size_binary(); | ||
// Try to treat 16 byte FixedSizeBinary as UUID | ||
let value = array.value(index); | ||
if *binary_len == 16 { | ||
if let Ok(uuid) = Uuid::from_slice(value) { | ||
return Variant::from(uuid); | ||
} | ||
} | ||
let value = array.value(index); | ||
Variant::from(value) | ||
Uuid::from_slice(value).unwrap().into() // unwrap is safe: slice is always 16 bytes | ||
} | ||
DataType::BinaryView => { | ||
let array = typed_value.as_binary_view(); | ||
|
@@ -843,18 +828,6 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' | |
DataType::Int64 => { | ||
primitive_conversion_single_value!(Int64Type, typed_value, index) | ||
} | ||
DataType::UInt8 => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand this correctly, the point is that since the Variant spec has no unsigned types, it wouldn't be permissible to shred out such arrow types https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly. I don't think the shredding spec directly says that, but it's implied because shredding is always presumed to start from binary encoded variant values and is a more efficient representation of the same. So throwing in random other types doesn't really make sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wow, I'm blind... the spec definitely directly says which parquet logical types are allowed for shredded columns -- there's a section for it, including a table: |
||
primitive_conversion_single_value!(UInt8Type, typed_value, index) | ||
} | ||
DataType::UInt16 => { | ||
primitive_conversion_single_value!(UInt16Type, typed_value, index) | ||
} | ||
DataType::UInt32 => { | ||
primitive_conversion_single_value!(UInt32Type, typed_value, index) | ||
} | ||
DataType::UInt64 => { | ||
primitive_conversion_single_value!(UInt64Type, typed_value, index) | ||
} | ||
DataType::Float16 => { | ||
primitive_conversion_single_value!(Float16Type, typed_value, index) | ||
} | ||
|
@@ -891,28 +864,120 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' | |
/// | ||
/// So cast them to get the right type. | ||
fn cast_to_binary_view_arrays(array: &dyn Array) -> Result<ArrayRef, ArrowError> { | ||
let new_type = rewrite_to_view_types(array.data_type()); | ||
cast(array, &new_type) | ||
let new_type = canonicalize_and_verify_data_type(array.data_type())?; | ||
cast(array, new_type.as_ref()) | ||
} | ||
|
||
/// replaces all instances of Binary with BinaryView in a DataType | ||
fn rewrite_to_view_types(data_type: &DataType) -> DataType { | ||
match data_type { | ||
DataType::Binary => DataType::BinaryView, | ||
DataType::List(field) => DataType::List(rewrite_field_type(field)), | ||
DataType::Struct(fields) => { | ||
DataType::Struct(fields.iter().map(rewrite_field_type).collect()) | ||
} | ||
_ => data_type.clone(), | ||
/// Validates whether a given arrow decimal is a valid variant decimal | ||
/// | ||
/// NOTE: By a strict reading of the "decimal table" in the [shredding spec], each decimal type | ||
/// should have a width-dependent lower bound on precision as well as an upper bound (i.e. Decimal16 | ||
/// with precision 5 is invalid because Decimal4 "covers" it). But the variant shredding integration | ||
/// tests specifically expect such cases to succeed, so we only enforce the upper bound here. | ||
/// | ||
/// [shredding spec]: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types | ||
fn is_valid_variant_decimal(p: &u8, s: &i8, max_precision: u8) -> bool { | ||
(1..=max_precision).contains(p) && (0..=*p as i8).contains(s) | ||
} | ||
|
||
/// Recursively visits a data type, ensuring that it only contains data types that can legally | ||
/// appear in a (possibly shredded) variant array. It also replaces Binary fields with BinaryView, | ||
/// since that's what comes back from the parquet reader and what the variant code expects to find. | ||
fn canonicalize_and_verify_data_type( | ||
data_type: &DataType, | ||
) -> Result<Cow<'_, DataType>, ArrowError> { | ||
use DataType::*; | ||
|
||
// helper macros | ||
macro_rules! fail { | ||
() => { | ||
return Err(ArrowError::InvalidArgumentError(format!( | ||
"Illegal shredded value type: {data_type}" | ||
))) | ||
}; | ||
} | ||
macro_rules! borrow { | ||
() => { | ||
Cow::Borrowed(data_type) | ||
}; | ||
} | ||
|
||
let new_data_type = match data_type { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
// Primitive arrow types that have a direct variant counterpart are allowed | ||
Null | Boolean => borrow!(), | ||
Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => borrow!(), | ||
|
||
// Unsigned integers and half-float are not allowed | ||
UInt8 | UInt16 | UInt32 | UInt64 | Float16 => fail!(), | ||
|
||
// Most decimal types are allowed, with restrictions on precision and scale | ||
Decimal32(p, s) if is_valid_variant_decimal(p, s, 9) => borrow!(), | ||
Decimal64(p, s) if is_valid_variant_decimal(p, s, 18) => borrow!(), | ||
Decimal128(p, s) if is_valid_variant_decimal(p, s, 38) => borrow!(), | ||
Decimal32(..) | Decimal64(..) | Decimal128(..) | Decimal256(..) => fail!(), | ||
|
||
// Only micro and nano timestamps are allowed | ||
Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _) => borrow!(), | ||
Timestamp(TimeUnit::Millisecond | TimeUnit::Second, _) => fail!(), | ||
|
||
// Only 32-bit dates and 64-bit microsecond time are allowed. | ||
Date32 | Time64(TimeUnit::Microsecond) => borrow!(), | ||
Date64 | Time32(_) | Time64(_) | Duration(_) | Interval(_) => fail!(), | ||
|
||
// Binary and string are allowed. Force Binary to BinaryView because that's what the parquet | ||
// reader returns and what the rest of the variant code expects. | ||
Binary => Cow::Owned(DataType::BinaryView), | ||
BinaryView | Utf8 => borrow!(), | ||
|
||
// UUID maps to 16-byte fixed-size binary; no other width is allowed | ||
FixedSizeBinary(16) => borrow!(), | ||
FixedSizeBinary(_) | FixedSizeList(..) => fail!(), | ||
|
||
// We can _possibly_ allow (some of) these some day? | ||
LargeBinary | LargeUtf8 | Utf8View | ListView(_) | LargeList(_) | LargeListView(_) => { | ||
fail!() | ||
} | ||
|
||
// Lists and struct are allowed, maps and unions are not | ||
List(field) => match canonicalize_and_verify_field(field)? { | ||
Cow::Borrowed(_) => borrow!(), | ||
Cow::Owned(new_field) => Cow::Owned(DataType::List(new_field)), | ||
}, | ||
// Struct is used by the internal layout, and can also represent a shredded variant object. | ||
Struct(fields) => { | ||
// Avoid allocation unless at least one field changes, to avoid unnecessary deep cloning | ||
// of the data type. Even if some fields change, the others are shallow arc clones. | ||
let mut new_fields = std::collections::HashMap::new(); | ||
for (i, field) in fields.iter().enumerate() { | ||
if let Cow::Owned(new_field) = canonicalize_and_verify_field(field)? { | ||
new_fields.insert(i, new_field); | ||
} | ||
} | ||
|
||
if new_fields.is_empty() { | ||
borrow!() | ||
} else { | ||
let new_fields = fields | ||
.iter() | ||
.enumerate() | ||
.map(|(i, field)| new_fields.remove(&i).unwrap_or_else(|| field.clone())); | ||
Cow::Owned(DataType::Struct(new_fields.collect())) | ||
} | ||
} | ||
Map(..) | Union(..) => fail!(), | ||
|
||
// We can _possibly_ support (some of) these some day? | ||
Dictionary(..) | RunEndEncoded(..) => fail!(), | ||
}; | ||
Ok(new_data_type) | ||
} | ||
|
||
fn rewrite_field_type(field: impl AsRef<Field>) -> Arc<Field> { | ||
let field = field.as_ref(); | ||
let new_field = field | ||
.clone() | ||
.with_data_type(rewrite_to_view_types(field.data_type())); | ||
Arc::new(new_field) | ||
fn canonicalize_and_verify_field(field: &Arc<Field>) -> Result<Cow<'_, Arc<Field>>, ArrowError> { | ||
let Cow::Owned(new_data_type) = canonicalize_and_verify_data_type(field.data_type())? else { | ||
return Ok(Cow::Borrowed(field)); | ||
}; | ||
let new_field = field.as_ref().clone().with_data_type(new_data_type); | ||
Ok(Cow::Owned(Arc::new(new_field))) | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was already substantial logic duplication among the different match arms, and it only got worse once
typed_value_to_variant
started requiring the value column (needed for both error checking now, and later when handling partially shredded objects). It turned out that directly referencing the two fields was a lot simpler.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow-up that continues this line of thought: