Commit 389f404

Fix errors when reading nested Lists with pushdown predicates. (#8866)
Note this PR contains a 1-line fix; the rest is tests, comments, and reorganization to support the tests.

# Which issue does this PR close?

- Closes #8657

# Rationale for this change

#8657 is a regression. The check for "is this column nested" did not work correctly for Lists in Parquet, due to the [somewhat wacky way Lists](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists) are encoded.

# What changes are included in this PR?

1. Fix the bug
2. Move the code into `ProjectionMask::without_nested_types`, mostly so I could write better tests for it
3. Write a lot of tests

# Are these changes tested?

Yes, both the reproducer from #8657 and a bunch of tests are added.

# Are there any user-facing changes?

There is a new API.
1 parent 0dcc401 commit 389f404
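For context on why the nestedness check tripped over Lists: with the three-level list encoding linked above, a single Arrow field such as `vector: List<Float32>` becomes a Parquet leaf whose column path is `vector.list.item`, i.e. the leaf sits two levels below the root even though the Arrow schema has only one top-level field. The sketch below is not the code from this commit; it is a minimal illustration, using only the existing `SchemaDescriptor`/`ColumnDescriptor` accessors, of how a leaf's path depth exposes that nesting.

```rust
use parquet::schema::types::SchemaDescriptor;

/// Minimal illustration (not this commit's implementation): a Parquet leaf is
/// nested when its path from the schema root has more than one component.
/// A `List<Float32>` field named "vector" produces the leaf `vector.list.item`,
/// so it is nested even though the Arrow schema shows a single field.
fn leaf_is_nested(schema: &SchemaDescriptor, leaf_idx: usize) -> bool {
    // `ColumnDescriptor::path()` returns the path to the leaf column;
    // intermediate groups (structs, the `list` wrapper, map key_value) add parts.
    schema.column(leaf_idx).path().parts().len() > 1
}
```

The commit's actual fix lives in `ProjectionMask::without_nested_types`, which is in one of the other changed files and is not shown in this excerpt.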

3 files changed (+351, -51 lines)

parquet/src/arrow/async_reader/mod.rs

Lines changed: 63 additions & 4 deletions
```diff
@@ -774,18 +774,18 @@ mod tests {
     };
     use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
     use crate::arrow::schema::virtual_type::RowNumber;
-    use crate::arrow::{ArrowWriter, ProjectionMask};
+    use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
     use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
     use arrow::compute::or;
     use arrow::error::Result as ArrowResult;
-    use arrow_array::builder::{ListBuilder, StringBuilder};
+    use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::{
-        Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
-        StructArray, UInt64Array,
+        Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
+        Scalar, StringArray, StructArray, UInt64Array,
     };
     use arrow_schema::{DataType, Field, Schema};
     use arrow_select::concat::concat_batches;
@@ -2249,4 +2249,63 @@ mod tests {
             },
         );
     }
+
+    #[tokio::test]
+    async fn test_nested_lists() -> Result<()> {
+        // Test case for https://github.com/apache/arrow-rs/issues/8657
+        let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("vector", DataType::List(list_inner_field.clone()), true),
+        ]));
+
+        let mut list_builder =
+            ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
+        list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
+        list_builder.append(true);
+        list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
+        list_builder.append(true);
+        list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
+        list_builder.append(true);
+        list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
+        list_builder.append(true);
+        let list_array = list_builder.finish();
+
+        let data = vec![RecordBatch::try_new(
+            table_schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+                Arc::new(list_array),
+            ],
+        )?];
+
+        let mut buffer = Vec::new();
+        let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
+
+        for batch in data {
+            writer.write(&batch).await?;
+        }
+
+        writer.close().await?;
+
+        let reader = TestReader::new(Bytes::from(buffer));
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+
+        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
+            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+        });
+
+        let projection_mask = ProjectionMask::all();
+
+        let mut stream = builder
+            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+            .with_projection(projection_mask)
+            .build()?;
+
+        while let Some(batch) = stream.next().await {
+            let _ = batch.unwrap(); // ensure there is no panic
+        }
+
+        Ok(())
+    }
 }
```
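The new test above depends on crate-internal helpers (`TestReader` and the test module's `Result` alias), so it only runs inside the parquet test suite. Purely as a convenience, here is a rough, self-contained sketch of the same write-then-filtered-read shape against the public synchronous APIs; the function name and structure are illustrative and not part of this commit, and it mirrors rather than reproduces the exact code path exercised by the async test.

```rust
use std::sync::Arc;

use arrow_array::builder::{Float32Builder, ListBuilder};
use arrow_array::{BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::{ArrowWriter, ProjectionMask};

fn read_list_column_with_pushdown_predicate() -> Result<(), Box<dyn std::error::Error>> {
    // Same shape as the new test: one flat column plus one List<Float32> column.
    let item = Arc::new(Field::new("item", DataType::Float32, true));
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("vector", DataType::List(item.clone()), true),
    ]));

    let mut lists = ListBuilder::new(Float32Builder::new()).with_field(item);
    for v in [10.0f32, 20.0, 30.0, 40.0] {
        lists.values().append_slice(&[v, v, v]);
        lists.append(true);
    }
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(lists.finish()),
        ],
    )?;

    // Write the batch to an in-memory Parquet file with the synchronous writer.
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back with a pushdown predicate that keeps every row, the same
    // row-filter-plus-List combination reported in #8657.
    let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        let _ = batch?; // ensure each batch decodes without error
    }
    Ok(())
}
```

Whether this synchronous path hits the original bug depends on where the nested-column check is consulted; the authoritative reproducer is the async test in the diff above.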
