Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

feat: Add nested_column_iter_to_arrays to deserialize inner columns #1583

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,22 @@ where
.map(|x| x.map(|x| x.1)),
))
}

/// Basically the same as `column_iter_to_arrays`, with the addition of the `init` parameter
/// to read the inner columns of the nested type directly, instead of reading the entire nested type.
pub fn nested_column_iter_to_arrays<'a, I: 'a>(
columns: Vec<I>,
types: Vec<&PrimitiveType>,
field: Field,
init: Vec<InitNested>,
chunk_size: Option<usize>,
num_rows: usize,
) -> Result<ArrayIter<'a>>
where
I: Pages,
{
Ok(Box::new(
nested::columns_to_iter_recursive(columns, types, field, init, num_rows, chunk_size)?
.map(|x| x.map(|x| x.1)),
))
}
2 changes: 1 addition & 1 deletion src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{array::Array, error::Result};
use crate::types::{i256, NativeType};
pub use deserialize::{
column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
InitNested, NestedArrayIter, NestedState, StructIterator,
nested_column_iter_to_arrays, InitNested, NestedArrayIter, NestedState, StructIterator,
};
pub use file::{FileReader, RowGroupReader};
pub use row_group::*;
Expand Down
87 changes: 87 additions & 0 deletions tests/it/io/parquet/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::fs::File;

use arrow2::{
array::StructArray,
datatypes::DataType,
error::{Error, Result},
io::parquet::read::{
infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata,
to_deserializer, BasicDecompressor, InitNested, PageReader,
},
util::test_util::parquet_test_data,
};

#[test]
fn test_deserialize_nested_column() -> Result<()> {
let testdata = parquet_test_data();
let path = format!("{testdata}/nested_structs.rust.parquet");
let mut reader = File::open(&path).unwrap();

let metadata = read_metadata(&mut reader)?;
let schema = infer_schema(&metadata)?;

let num_rows = metadata.num_rows;
let row_group = metadata.row_groups[0].clone();

let field_columns = schema
.fields
.iter()
.map(|field| read_columns(&mut reader, row_group.columns(), &field.name))
.collect::<Result<Vec<_>, Error>>()?;

let fields = schema.fields.clone();
for (mut columns, field) in field_columns.into_iter().zip(fields.iter()) {
if let DataType::Struct(inner_fields) = &field.data_type {
let mut array_iter =
to_deserializer(columns.clone(), field.clone(), num_rows, None, None)?;
let array = array_iter.next().transpose()?.unwrap();
let expected_array = array
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone();

// deserialize inner values of struct fields.
let init = vec![InitNested::Struct(field.is_nullable)];
let mut values = Vec::with_capacity(inner_fields.len());
for inner_field in inner_fields {
let n = n_columns(&inner_field.data_type);
let inner_columns: Vec<_> = columns.drain(0..n).collect();

let (nestd_columns, types): (Vec<_>, Vec<_>) = inner_columns
.into_iter()
.map(|(column_meta, chunk)| {
let len = chunk.len();
let pages = PageReader::new(
std::io::Cursor::new(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
len * 2 + 1024,
);
(
BasicDecompressor::new(pages, vec![]),
&column_meta.descriptor().descriptor.primitive_type,
)
})
.unzip();

let mut inner_array_iter = nested_column_iter_to_arrays(
nestd_columns,
types,
inner_field.clone(),
init.clone(),
None,
num_rows,
)?;
let inner_array = inner_array_iter.next().transpose()?;
values.push(inner_array.unwrap());
}
let struct_array = StructArray::try_new(field.data_type.clone(), values, None)?;

assert_eq!(expected_array, struct_array);
}
}

Ok(())
}
1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use arrow2::{
types::{days_ms, NativeType},
};

mod deserialize;
#[cfg(feature = "io_json_integration")]
mod integration;
mod read;
Expand Down
Loading