Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet-variant-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod variant_array_builder;
pub mod variant_get;
mod variant_to_arrow;

pub use variant_array::{ShreddingState, VariantArray, VariantType};
pub use variant_array::{ShreddingState, VariantArray, VariantArrayValue, VariantType};
pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder};

pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options};
Expand Down
4 changes: 3 additions & 1 deletion parquet-variant-compute/src/shred_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<Variant
if array.is_null(i) {
builder.append_null()?;
} else {
builder.append_value(array.value(i))?;
array
.value(i)
.consume(|value| builder.append_value(value))?;
}
}
let (value, typed_value, nulls) = builder.finish()?;
Expand Down
141 changes: 133 additions & 8 deletions parquet-variant-compute/src/variant_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use arrow::datatypes::{
};
use arrow_schema::extension::ExtensionType;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit};
use parquet_variant::Uuid;
use parquet_variant::Variant;
use parquet_variant::{Uuid, Variant, VariantList, VariantMetadata, VariantObject};

use std::borrow::Cow;
use std::sync::Arc;
Expand Down Expand Up @@ -73,6 +72,131 @@ impl ExtensionType for VariantType {
}
}

/// A [`Cow`]-like representation of a [`Variant`] value returned by [`VariantArray::value`], which
/// may use owned or borrowed value bytes depending on how the underlying variant was shredded. We
/// cannot "just" use [`Cow`] because of the special lifetime management that [`Variant`] requires.
pub enum VariantArrayValue<'m, 'v> {
Borrowed(Variant<'m, 'v>),
Owned {
metadata: VariantMetadata<'m>,
value_bytes: Vec<u8>,
},
}

impl<'m, 'v> VariantArrayValue<'m, 'v> {
/// Creates a new instance that borrows from a normal [`Variant`] value.
pub fn borrowed(value: Variant<'m, 'v>) -> Self {
Self::Borrowed(value)
}

/// Creates a new instance that wraps owned bytes that can be converted to a [`Variant`] value.
pub fn owned(metadata_bytes: &'m [u8], value_bytes: Vec<u8>) -> Self {
Self::Owned {
metadata: VariantMetadata::new(metadata_bytes),
value_bytes,
}
}

/// Consumes this variant value, passing the result to a `visitor` function.
///
/// The visitor idiom is helpful because a variant value based on owned bytes cannot outlive
/// self.
pub fn consume<R>(self, visitor: impl FnOnce(Variant<'_, '_>) -> R) -> R {
match self {
VariantArrayValue::Borrowed(v) => visitor(v),
VariantArrayValue::Owned {
metadata,
value_bytes,
} => visitor(Variant::new_with_metadata(metadata, &value_bytes)),
}
}

// internal helper for when we don't want to pay the extra clone
fn as_variant_cow(&self) -> Cow<'_, Variant<'m, '_>> {
match self {
VariantArrayValue::Borrowed(v) => Cow::Borrowed(v),
VariantArrayValue::Owned {
metadata,
value_bytes,
} => Cow::Owned(Variant::new_with_metadata(metadata.clone(), value_bytes)),
}
}

/// Returns a [`Variant`] instance for this value.
pub fn as_variant(&self) -> Variant<'m, '_> {
self.as_variant_cow().into_owned()
}

/// Returns the variant metadata that backs this value.
pub fn metadata(&self) -> &VariantMetadata<'m> {
match self {
VariantArrayValue::Borrowed(v) => v.metadata(),
VariantArrayValue::Owned { metadata, .. } => metadata,
Comment on lines +133 to +134
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be implemented directly, without resorting to the cost of calling self.as_variant_cow()

}
}

/// Extracts the underlying [`VariantObject`], if this is a variant object.
///
/// See also [`Variant::as_object`].
pub fn as_object(&self) -> Option<VariantObject<'m, '_>> {
self.as_variant_cow().as_object().cloned()
}

/// Extracts the underlying [`VariantList`], if this is a variant array.
///
/// See also [`Variant::as_list`].
pub fn as_list(&self) -> Option<VariantList<'m, '_>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two as_xxx methods are not as efficient as their Variant counterparts, because they must return an owned object/list rather than a borrowed reference. But they're also used only in test code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a note in the docs clarifying that this is test-only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, if we think it's important we could mark them #[cfg(test)] to force them test-only? But I'm not sure it really matters? They produce correct results, just not super-efficiently.

self.as_variant_cow().as_list().cloned()
}

/// Extracts the value of the named variant object field, if this is a variant object.
///
/// See also [`Variant::get_object_field`].
pub fn get_object_field<'s>(&'s self, field_name: &str) -> Option<Variant<'m, 's>> {
self.as_variant_cow().get_object_field(field_name)
}

/// Extracts the value of the variant array element at `index`, if this is a variant object.
///
/// See also [`Variant::get_list_element`].
pub fn get_list_element(&self, index: usize) -> Option<Variant<'m, '_>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Variant these two methods return is limited to the lifetime of self, because it could be owned.
This restriction didn't seem to impact any any code. Only unit tests call these methods.

self.as_variant_cow().get_list_element(index)
}
}

impl<'m, 'v> From<Variant<'m, 'v>> for VariantArrayValue<'m, 'v> {
fn from(value: Variant<'m, 'v>) -> Self {
Self::borrowed(value)
}
}

// By providing PartialEq for all three combinations, we avoid changing a lot of unit test code that
// relies on comparisons.
impl PartialEq for VariantArrayValue<'_, '_> {
fn eq(&self, other: &VariantArrayValue<'_, '_>) -> bool {
self.as_variant_cow().as_ref() == other.as_variant_cow().as_ref()
}
}

impl PartialEq<Variant<'_, '_>> for VariantArrayValue<'_, '_> {
fn eq(&self, other: &Variant<'_, '_>) -> bool {
self.as_variant_cow().as_ref() == other
}
}

impl PartialEq<VariantArrayValue<'_, '_>> for Variant<'_, '_> {
fn eq(&self, other: &VariantArrayValue<'_, '_>) -> bool {
self == other.as_variant_cow().as_ref()
}
}

// Make it transparent -- looks just like the underlying value it proxies
impl std::fmt::Debug for VariantArrayValue<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.as_variant_cow().as_ref().fmt(f)
}
}

/// An array of Parquet [`Variant`] values
///
/// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying
Expand Down Expand Up @@ -353,19 +477,19 @@ impl VariantArray {
///
/// Note: Does not do deep validation of the [`Variant`], so it is up to the
/// caller to ensure that the metadata and value were constructed correctly.
pub fn value(&self, index: usize) -> Variant<'_, '_> {
pub fn value(&self, index: usize) -> VariantArrayValue<'_, '_> {
match (self.typed_value_field(), self.value_field()) {
// Always prefer typed_value, if available
(Some(typed_value), value) if typed_value.is_valid(index) => {
typed_value_to_variant(typed_value, value, index)
}
// Otherwise fall back to value, if available
(_, Some(value)) if value.is_valid(index) => {
Variant::new(self.metadata.value(index), value.value(index))
Variant::new(self.metadata.value(index), value.value(index)).into()
}
// It is technically invalid for neither value nor typed_value fields to be available,
// but the spec specifically requires readers to return Variant::Null in this case.
_ => Variant::Null,
_ => Variant::Null.into(),
}
}

Expand Down Expand Up @@ -782,13 +906,13 @@ fn typed_value_to_variant<'a>(
typed_value: &'a ArrayRef,
value: Option<&BinaryViewArray>,
index: usize,
) -> Variant<'a, 'a> {
) -> VariantArrayValue<'a, 'a> {
let data_type = typed_value.data_type();
if value.is_some_and(|v| !matches!(data_type, DataType::Struct(_)) && v.is_valid(index)) {
// Only a partially shredded struct is allowed to have values for both columns
panic!("Invalid variant, conflicting value and typed_value");
}
match data_type {
let value = match data_type {
DataType::Boolean => {
let boolean_array = typed_value.as_boolean();
let value = boolean_array.value(index);
Expand Down Expand Up @@ -850,7 +974,8 @@ fn typed_value_to_variant<'a>(
);
Variant::Null
}
}
};
value.into()
}

/// Workaround for lack of direct support for BinaryArray
Expand Down
3 changes: 2 additions & 1 deletion parquet-variant-compute/src/variant_array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ mod test {
let mut builder = VariantValueArrayBuilder::new(3);

// straight copy
builder.append_value(array.value(0));
array.value(0).consume(|value| builder.append_value(value));

// filtering fields takes more work because we need to manually create an object builder
let value = array.value(1);
Expand All @@ -498,6 +498,7 @@ mod test {

// same bytes, but now nested and duplicated inside a list
let value = array.value(2);
let value = value.as_variant();
let mut metadata_builder = ReadOnlyMetadataBuilder::new(value.metadata().clone());
let state = builder.parent_state(&mut metadata_builder);
ListBuilder::new(state, false)
Expand Down
36 changes: 27 additions & 9 deletions parquet-variant-compute/src/variant_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ fn shredded_get_path(
if target.is_null(i) {
builder.append_null()?;
} else {
builder.append_value(target.value(i))?;
target
.value(i)
.consume(|value| builder.append_value(value))?;
}
}
builder.finish()
Expand Down Expand Up @@ -1392,7 +1394,9 @@ mod test {
let json_str = r#"{"x": 42}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1403,7 +1407,9 @@ mod test {
let json_str = r#"{"x": "foo"}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1414,7 +1420,9 @@ mod test {
let json_str = r#"{"y": 10}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1433,7 +1441,9 @@ mod test {
let json_str = r#"{"a": {"x": 55}, "b": 42}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1444,7 +1454,9 @@ mod test {
let json_str = r#"{"a": {"x": "foo"}, "b": 42}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1463,7 +1475,9 @@ mod test {
let json_str = r#"{"a": {"b": {"x": 100}}}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1474,7 +1488,9 @@ mod test {
let json_str = r#"{"a": {"b": {"x": "bar"}}}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand All @@ -1485,7 +1501,9 @@ mod test {
let json_str = r#"{"a": {"b": {"y": 200}}}"#;
let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str]));
if let Ok(variant_array) = json_to_variant(&string_array) {
builder.append_variant(variant_array.value(0));
variant_array
.value(0)
.consume(|value| builder.append_variant(value));
} else {
builder.append_null();
}
Expand Down
6 changes: 3 additions & 3 deletions parquet-variant/src/variant.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were some weird uses of 'm lifetime that should never have been there and caused lifetime problems in this PR.

Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ impl<'m, 'v> Variant<'m, 'v> {
/// let obj = variant.as_object().expect("variant should be an object");
/// assert_eq!(obj.get("name"), Some(Variant::from("John")));
/// ```
pub fn as_object(&'m self) -> Option<&'m VariantObject<'m, 'v>> {
pub fn as_object(&self) -> Option<&VariantObject<'m, 'v>> {
if let Variant::Object(obj) = self {
Some(obj)
} else {
Expand Down Expand Up @@ -1280,7 +1280,7 @@ impl<'m, 'v> Variant<'m, 'v> {
/// assert_eq!(list.get(0).unwrap(), Variant::from("John"));
/// assert_eq!(list.get(1).unwrap(), Variant::from("Doe"));
/// ```
pub fn as_list(&'m self) -> Option<&'m VariantList<'m, 'v>> {
pub fn as_list(&self) -> Option<&VariantList<'m, 'v>> {
if let Variant::List(list) = self {
Some(list)
} else {
Expand Down Expand Up @@ -1308,7 +1308,7 @@ impl<'m, 'v> Variant<'m, 'v> {
/// let v2 = Variant::from("Hello");
/// assert_eq!(None, v2.as_time_utc());
/// ```
pub fn as_time_utc(&'m self) -> Option<NaiveTime> {
pub fn as_time_utc(&self) -> Option<NaiveTime> {
if let Variant::Time(time) = self {
Some(*time)
} else {
Expand Down
Loading