185 changes: 168 additions & 17 deletions parquet/src/file/serialized_reader.rs
@@ -127,6 +127,57 @@ pub struct SerializedFileReader<R: ChunkReader> {
metadata: ParquetMetaData,
}

/// A builder for [`ReadOptions`].
/// Predicates added to the builder are chained with a logical `AND`:
/// a row group is kept only if it satisfies every predicate.
pub struct ReadOptionsBuilder {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
}

impl ReadOptionsBuilder {
/// New builder
pub fn new() -> Self {
ReadOptionsBuilder { predicates: vec![] }
}

/// Adds a predicate on row group metadata to the read options;
/// only row groups that match the predicate will be read.
pub fn with_predicate(
mut self,
predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>,
) -> Self {
self.predicates.push(predicate);
self
}

/// Adds a range predicate that keeps only row groups whose midpoint offset lies within
/// the half-open range `[start, end)`, i.e. `{x | start <= x < end}`
pub fn with_range(mut self, start: i64, end: i64) -> Self {
assert!(start < end);
let predicate = move |rg: &RowGroupMetaData, _: usize| {
let mid = get_midpoint_offset(rg);
mid >= start && mid < end
};
self.predicates.push(Box::new(predicate));
self
}

/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
ReadOptions {
predicates: self.predicates,
}
}
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates are chained with a logical `AND` when filtering row groups.
pub struct ReadOptions {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
}
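
A minimal sketch of how these options might be composed (the `num_rows` accessor on `RowGroupMetaData` already exists in this crate; the byte range is purely illustrative):

// Keep only row groups that are non-empty AND whose midpoint offset falls in
// the first 4 KiB of the file; all predicates are AND-ed together.
let options = ReadOptionsBuilder::new()
    .with_predicate(Box::new(|rg: &RowGroupMetaData, _i: usize| rg.num_rows() > 0))
    .with_range(0, 4096)
    .build();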

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file.
/// Returns error if Parquet file does not exist or is corrupt.
@@ -138,25 +189,48 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
})
}

/// Filters row group metadata to only those row groups,
/// for which the predicate function returns true
pub fn filter_row_groups(
&mut self,
predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
) {
/// Creates file reader from a Parquet file with read options.
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let metadata = footer::parse_metadata(&chunk_reader)?;
let mut predicates = options.predicates;
let row_groups = metadata.row_groups().to_vec();
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
if predicate(row_group_metadata, i) {
filtered_row_groups.push(row_group_metadata.clone());
for (i, rg_meta) in row_groups.into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
keep = false;
break;
}
}
if keep {
filtered_row_groups.push(rg_meta);
}
}
self.metadata = ParquetMetaData::new(
self.metadata.file_metadata().clone(),
filtered_row_groups,
);

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: ParquetMetaData::new(
metadata.file_metadata().clone(),
filtered_row_groups,
),
})
}
}
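
An end-to-end sketch of the new constructor, assuming a hypothetical file path and an enclosing function that returns `Result` (`std::fs::File` implements `ChunkReader`):

use std::fs::File;

// Keep only row groups whose midpoint lies in the first megabyte of the file.
let file = File::open("data.parquet")?;
let options = ReadOptionsBuilder::new().with_range(0, 1024 * 1024).build();
let reader = SerializedFileReader::new_with_options(file, options)?;
println!("row groups kept: {}", reader.metadata().num_row_groups());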

/// Returns the midpoint file offset of a row group: its start offset plus half its total compressed size.
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
let col = meta.column(0);
let mut offset = col.data_page_offset();
Review comment (Member): For encrypted Parquet files we'll need to use file_offset but it's fine for now since it's not supported anyways.
if let Some(dic_offset) = col.dictionary_page_offset() {
if offset > dic_offset {
offset = dic_offset
}
};
offset + meta.compressed_size() / 2
}
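
For illustration with made-up numbers: if column 0 of a row group has its dictionary page at offset 400 and its first data page at offset 1_000, and the row group's total compressed size is 10_000 bytes, then the midpoint used by `with_range` is min(400, 1_000) + 10_000 / 2 = 5_400.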

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
fn metadata(&self) -> &ParquetMetaData {
&self.metadata
@@ -792,19 +866,96 @@ mod tests {
}

#[test]
fn test_file_reader_filter_row_groups() -> Result<()> {
fn test_file_reader_with_no_filter() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let mut reader = SerializedFileReader::new(test_file)?;
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let mid = get_midpoint_offset(metadata.row_group(0));

let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);

let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
let metadata = origin_reader.metadata();
let mid = get_midpoint_offset(metadata.row_group(0));

// true, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| true))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);

// test filtering out all row groups
reader.filter_row_groups(&|_, _| false);
// true, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| true))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);

// false, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);

// false, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
}