Commit a36bf7a

Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema (#5135)
* Parquet: clear metadata of ParquetRecordBatchStream::schema
* Revert "Parquet: clear metadata of ParquetRecordBatchStream::schema" (this reverts commit 84be336)
* Document expected behaviour
* Revert "Document expected behaviour" (this reverts commit ef9601e)
* Reapply "Parquet: clear metadata of ParquetRecordBatchStream::schema" (this reverts commit fd662ad)
* ParquetRecordBatchStream should strip schema metadata and respect projection
* Fix projection of nested fields
1 parent f16d2f5 commit a36bf7a
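In practice, the contract this commit establishes for the async reader looks roughly like the sketch below (not part of the commit; the file name `data.parquet` and the single-leaf projection are illustrative assumptions): the builder keeps the full file schema with metadata, while the built stream reports the projected, metadata-free schema that its batches carry.

```rust
// Hypothetical usage sketch; "data.parquet" is an assumed input file.
use futures::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // The builder still exposes the full file schema, metadata included.
    println!("file schema: {:?}", builder.schema());

    // Project the first leaf column only (an illustrative choice).
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut stream = builder.with_projection(mask).build()?;

    // After this commit the stream's schema is projected and metadata-free,
    // matching the schema of every RecordBatch it yields.
    let stream_schema = stream.schema().clone();
    assert!(stream_schema.metadata().is_empty());
    while let Some(batch) = stream.next().await {
        assert_eq!(batch?.schema(), stream_schema);
    }
    Ok(())
}
```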

File tree: 2 files changed (+135 -5 lines)

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
 }
 
 impl RecordBatchReader for ParquetRecordBatchReader {
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
     fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
```
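The doc comment added above contrasts two accessors on the synchronous side. A minimal sketch of that contrast (the input file name is an assumption; with no projection applied, only the metadata differs):

```rust
use std::fs::File;

use arrow_array::RecordBatchReader; // brings `schema` on the reader into scope
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // assumed input file
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Builder schema: the full file schema, custom metadata preserved.
    let full = builder.schema().clone();

    let mut reader = builder.build()?;
    // Reader schema: what the emitted batches actually carry. Without a
    // projection the fields match the builder's, but metadata is stripped.
    let stripped = reader.schema();
    assert_eq!(full.fields(), stripped.fields());
    assert!(stripped.metadata().is_empty());

    if let Some(batch) = reader.next().transpose()? {
        assert_eq!(batch.schema(), stripped);
    }
    Ok(())
}
```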

parquet/src/arrow/async_reader/mod.rs

Lines changed: 131 additions & 5 deletions
```diff
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
```
```diff
@@ -385,13 +385,24 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };
 
+        // Ensure schema of ParquetRecordBatchStream respects projection, and does
+        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
+        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
+            Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
+            }
+            None => Fields::empty(),
+            _ => unreachable!("Must be Struct for root type"),
+        };
+        let schema = Arc::new(Schema::new(projected_fields));
+
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
-            schema: self.schema,
+            schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
```
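The key call here is `Fields::filter_leaves`, which numbers leaf columns depth-first (matching the leaf order `ProjectionMask::leaves` uses), so nested children are pruned correctly. A standalone sketch of that behaviour, mirroring the a/b/c.d/c.e shape used in the test below (not from the diff):

```rust
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // Leaf numbering (depth-first): a = 0, b = 1, c.d = 2, c.e = 3
    let nested = Fields::from(vec![
        Field::new("d", DataType::Utf8, true),
        Field::new("e", DataType::Utf8, true),
    ]);
    let root = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::UInt64, true),
        Field::new("c", DataType::Struct(nested), true),
    ]);

    // Keep leaves 0 and 2: "a" survives, "b" is dropped, and "c" is
    // rewritten to contain only its child "d".
    let projected = root.filter_leaves(|idx, _field| idx == 0 || idx == 2);
    assert_eq!(projected.len(), 2); // "a" and the pruned "c"
    println!("{projected:#?}");
}
```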
```diff
@@ -572,7 +583,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
 }
 
 impl<T> ParquetRecordBatchStream<T> {
-    /// Returns the [`SchemaRef`] for this parquet file
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
```
```diff
@@ -855,11 +869,15 @@ mod tests {
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array};
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+        StructArray, UInt64Array,
+    };
     use arrow_schema::{DataType, Field, Schema};
     use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
-    use std::sync::Mutex;
+    use std::collections::HashMap;
+    use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
 
     #[derive(Clone)]
```
```diff
@@ -1584,6 +1602,114 @@
         test_get_row_group_column_bloom_filter(data, false).await;
     }
 
+    #[tokio::test]
+    async fn test_parquet_record_batch_stream_schema() {
+        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+            schema.all_fields().iter().map(|f| f.name()).collect()
+        }
+
+        // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
+        // schema contents (in terms of custom metadata attached to schema, and fields
+        // returned). Test to ensure this remains consistent behaviour.
+        //
+        // Ensure same for asynchronous versions of the above.
+
+        // Prep data, for a schema with nested fields, with custom metadata
+        let mut metadata = HashMap::with_capacity(1);
+        metadata.insert("key".to_string(), "value".to_string());
+
+        let nested_struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("d", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("e", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+            ),
+        ]);
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("a", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("b", DataType::UInt64, true)),
+                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new(
+                    "c",
+                    nested_struct_array.data_type().clone(),
+                    true,
+                )),
+                Arc::new(nested_struct_array) as ArrayRef,
+            ),
+        ]);
+
+        let schema =
+            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
+        let record_batch = RecordBatch::from(struct_array)
+            .with_schema(schema.clone())
+            .unwrap();
+
+        // Write parquet with custom metadata in schema
+        let mut file = tempfile().unwrap();
+        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let all_fields = ["a", "b", "c", "d", "e"];
+        // (leaf indices in mask, expected names in output schema all fields)
+        let projections = [
+            (vec![], vec![]),
+            (vec![0], vec!["a"]),
+            (vec![0, 1], vec!["a", "b"]),
+            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+        ];
+
+        // Ensure we're consistent for each of these projections
+        for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
+                // Builder schema should preserve all fields and metadata
+                assert_eq!(get_all_field_names(&builder), all_fields);
+                assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, and no metadata
+                assert_eq!(get_all_field_names(&reader), expected_projected_names);
+                assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), expected_projected_names);
+                assert_eq!(batch.metadata, HashMap::default());
+            };
+
+            let builder =
+                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+            let sync_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let sync_reader_schema = reader.schema();
+            let batch = reader.next().unwrap().unwrap();
+            let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
+
+            // asynchronous should be same
+            let file = tokio::fs::File::from(file.try_clone().unwrap());
+            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+            let async_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let async_reader_schema = reader.schema().clone();
+            let batch = reader.next().await.unwrap().unwrap();
+            let async_batch_schema = batch.schema();
+            assert_schemas(
+                async_builder_schema,
+                async_reader_schema,
+                async_batch_schema,
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
```
