Skip to content

Commit

Permalink
Parquet AsyncReader: Don't panic when empty offset_index is Some([]) (#…
Browse files Browse the repository at this point in the history
…6582)

* Don't panic when empty offset_index is Some([])

* Remove duplicate test

* Add a test with offset_index where page_locations doesn't end up as None

* remove unused imports

* Update parquet/src/arrow/async_reader/mod.rs

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* Rename test, add explanatory comments

* cargo fmt

---------

Co-authored-by: jroddev <jroddev@jroddev.com>
Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
4 people authored Oct 30, 2024
1 parent 933d348 commit 56d4713
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ where
let offset_index = self
.metadata
.offset_index()
// filter out empty offset indexes (old versions specified Some(vec![]) when not present)
.filter(|index| !index.is_empty())
.map(|x| x[row_group_idx].as_slice());

let mut row_group = InMemoryRowGroup {
Expand Down Expand Up @@ -828,6 +830,8 @@ impl RowGroups for InMemoryRowGroup<'_> {
Some(data) => {
let page_locations = self
.offset_index
// filter out empty offset indexes (old versions specified Some(vec![]) when not present)
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
Expand Down Expand Up @@ -2037,4 +2041,105 @@ mod tests {
// Should only have made 3 requests
assert_eq!(requests.lock().unwrap().len(), 3);
}

#[tokio::test]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
    use tokio::fs::File;

    // Open a known-good test file and learn its length so the footer can be located.
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_plain.parquet");
    let mut input = File::open(&file_path).await.unwrap();
    let len = input.metadata().await.unwrap().len() as usize;

    // Load the full metadata, including the page indexes.
    let mut parquet_meta = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut input, len)
        .await
        .unwrap();

    // Old metadata versions represented "no offset index" as Some(vec![]);
    // the reader must tolerate that shape instead of panicking.
    parquet_meta.set_offset_index(Some(vec![]));

    let reader_opts = ArrowReaderOptions::new().with_page_index(true);
    let reader_meta = ArrowReaderMetadata::try_new(parquet_meta.into(), reader_opts).unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, reader_meta)
        .build()
        .unwrap();

    // Collecting must succeed and yield the single batch in the file.
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(batches.len(), 1);
}

#[tokio::test]
async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
    use tokio::fs::File;

    // alltypes_tiny_pages carries a real (non-empty) offset index, so the
    // page locations are not filtered out and the index-using path is exercised.
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages.parquet");
    let mut input = File::open(&file_path).await.unwrap();
    let len = input.metadata().await.unwrap().len() as usize;

    let parquet_meta = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut input, len)
        .await
        .unwrap();

    let reader_opts = ArrowReaderOptions::new().with_page_index(true);
    let reader_meta = ArrowReaderMetadata::try_new(parquet_meta.into(), reader_opts).unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, reader_meta)
        .build()
        .unwrap();

    // Collecting must succeed and yield all eight batches in the file.
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(batches.len(), 8);
}

#[tokio::test]
async fn empty_offset_index_doesnt_panic_in_column_chunks() {
    use tempfile::TempDir;
    use tokio::fs::File;

    // Writes `metadata` out as a standalone thrift-encoded metadata file.
    fn write_metadata_to_local_file(
        metadata: ParquetMetaData,
        file: impl AsRef<std::path::Path>,
    ) {
        use crate::file::metadata::ParquetMetaDataWriter;
        use std::fs::File;
        let file = File::create(file).unwrap();
        ParquetMetaDataWriter::new(file, &metadata)
            .finish()
            .unwrap()
    }

    // Reads metadata (including page indexes) back from a standalone metadata file.
    fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
        use std::fs::File;
        let file = File::open(file).unwrap();
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&file)
            .unwrap()
    }

    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let mut file = File::open(&path).await.unwrap();
    let file_size = file.metadata().await.unwrap().len();
    let metadata = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut file, file_size as usize)
        .await
        .unwrap();

    // Round-trip the metadata through the thrift writer/reader on disk;
    // presumably this round-trip is what reproduces the empty-offset-index
    // shape that previously triggered the panic (see test name) — confirm
    // against ParquetMetaDataWriter behavior if this test is modified.
    let tempdir = TempDir::new().unwrap();
    let metadata_path = tempdir.path().join("thrift_metadata.dat");
    write_metadata_to_local_file(metadata, &metadata_path);
    let metadata = read_metadata_from_local_file(&metadata_path);

    let options = ArrowReaderOptions::new().with_page_index(true);
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
            .build()
            .unwrap();

    // Before the empty-offset-index filter was added, this collect panicked
    // inside column_chunks; it must now succeed and yield the single batch.
    let result = reader.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(result.len(), 1);
}
}

0 comments on commit 56d4713

Please sign in to comment.