Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,10 @@ where
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the Parquet schema to the column in the Arrow schema.
/// match the column in the Parquet schema to the column in the Arrow schema
/// when using [`Self::try_new`]. For nested leaf fields (for example fields
/// within a struct), use [`Self::from_column_index`] with a pre-resolved
/// Parquet leaf column index.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// the index of the matched column in the Parquet schema
Expand Down Expand Up @@ -1454,6 +1457,9 @@ impl<'a> StatisticsConverter<'a> {
/// arrays will be null. This can happen if the column is in the arrow
/// schema but not in the parquet schema due to schema evolution.
///
/// This constructor only supports top-level, non-nested columns. For nested
/// leaf fields, use [`Self::from_column_index`].
///
/// See example on [`Self::row_group_mins`] for usage
///
/// # Errors
Expand Down Expand Up @@ -1495,6 +1501,37 @@ impl<'a> StatisticsConverter<'a> {
})
}

/// Create a new `StatisticsConverter` from a Parquet leaf column index.
///
/// Unlike [`Self::try_new`], this constructor bypasses schema resolution
/// and accepts a pre-resolved Parquet leaf column index directly. This is
/// useful for nested leaf fields (for example, fields within a struct)
/// where the caller already knows how the Arrow field maps to the Parquet
/// schema.
///
/// # Errors
///
/// * If `parquet_column_index` is out of bounds for `parquet_schema`
pub fn from_column_index(
    parquet_column_index: usize,
    arrow_field: &'a Field,
    parquet_schema: &'a SchemaDescriptor,
) -> Result<Self> {
    let num_columns = parquet_schema.columns().len();
    if parquet_column_index >= num_columns {
        // Report the schema's leaf-column count rather than "max <len>":
        // the largest valid index is `len - 1`, so naming `len` as the max
        // was off by one (and `len - 1` would underflow on an empty schema).
        return Err(arrow_err!(format!(
            "Parquet column index {} out of bounds, schema has {} leaf columns",
            parquet_column_index, num_columns
        )));
    }

    Ok(Self {
        parquet_column_index: Some(parquet_column_index),
        arrow_field,
        // Match the default of `try_new`: treat missing null counts as zero.
        missing_null_counts_as_zero: true,
        physical_type: Some(parquet_schema.column(parquet_column_index).physical_type()),
    })
}

/// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
///
/// # Return Value
Expand Down
74 changes: 74 additions & 0 deletions parquet/tests/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,80 @@ async fn test_boolean() {
.run();
}

// struct array
#[tokio::test]
async fn test_struct_children_from_column_index() {
    // Reader over the StructArray scenario, with a row-group boundary
    // every 5 rows.
    let reader = TestReader {
        scenario: Scenario::StructArray,
        row_per_group: 5,
    }
    .build()
    .await;

    let schema = reader.schema();
    let parquet_schema = reader.parquet_schema();

    // Locate the top-level "struct" column and unpack its child fields.
    let (_idx, struct_field) = schema.column_with_name("struct").unwrap();
    let fields = match struct_field.data_type() {
        DataType::Struct(fields) => fields,
        other => panic!("expected struct field, got {:?}", other),
    };

    // (parquet leaf path, child field name, expected min, expected max,
    //  expected null counts) for each struct child under test.
    let cases = [
        (
            "struct.int32_col",
            "int32_col",
            Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef,
            UInt64Array::from(vec![1]),
        ),
        (
            "struct.float32_col",
            "float32_col",
            Arc::new(Float32Array::from(vec![Some(6.0)])) as ArrayRef,
            Arc::new(Float32Array::from(vec![Some(8.5)])) as ArrayRef,
            UInt64Array::from(vec![0]),
        ),
        (
            "struct.float64_col",
            "float64_col",
            Arc::new(Float64Array::from(vec![Some(12.0)])) as ArrayRef,
            Arc::new(Float64Array::from(vec![Some(14.0)])) as ArrayRef,
            UInt64Array::from(vec![1]),
        ),
    ];

    for (leaf_path, child_name, expected_min, expected_max, expected_null_counts) in cases {
        // Resolve the Parquet leaf index for this nested column by its
        // dotted path, then find the matching Arrow child field by name.
        let leaf_idx = parquet_schema
            .columns()
            .iter()
            .position(|col| col.path().string() == leaf_path)
            .unwrap();
        let (_field_idx, child_field) = fields.find(child_name).unwrap();

        let converter =
            StatisticsConverter::from_column_index(leaf_idx, child_field.as_ref(), parquet_schema)
                .unwrap();

        // The converter should report exactly the index and field it was
        // constructed from.
        assert_eq!(converter.parquet_column_index(), Some(leaf_idx));
        assert_eq!(converter.arrow_field(), child_field.as_ref());

        Test {
            reader: &reader,
            expected_min,
            expected_max,
            expected_null_counts,
            expected_row_counts: Some(UInt64Array::from(vec![3])),
            expected_max_value_exact: BooleanArray::from(vec![true]),
            expected_min_value_exact: BooleanArray::from(vec![true]),
            column_name: child_name,
            check: Check::RowGroup,
        }
        .run_checks(converter);
    }
}

// struct array
// BUG
// https://github.com/apache/datafusion/issues/10609
Expand Down