Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert some panics that happen on invalid parquet files to error results #6738

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InMemoryRowGroup {
let mut vs = std::mem::take(&mut self.column_chunks);
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range();
let (start, len) = meta.byte_range()?;
let data = reader
.get_bytes(start as usize..(start + len) as usize)
.await?;
Expand Down
25 changes: 14 additions & 11 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,11 +745,11 @@ impl<'a> InMemoryRowGroup<'a> {
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| {
.flat_map(|(idx, (_chunk, chunk_meta))| -> Result<Vec<Range<usize>>> {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
let (start, _len) = chunk_meta.byte_range()?;
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
Expand All @@ -760,8 +760,11 @@ impl<'a> InMemoryRowGroup<'a> {
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
Ok(ranges)
})
.collect::<Vec<_>>()
.into_iter()
.flatten()
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
Expand All @@ -779,25 +782,25 @@ impl<'a> InMemoryRowGroup<'a> {
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
length: self.metadata.column(idx).byte_range()?.1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
}
} else {
let fetch_ranges = self
let fetch_ranges: Result<Vec<Range<usize>>> = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
let (start, length) = column.byte_range()?;
Ok(start as usize..(start + length) as usize)
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut chunk_data = input.get_byte_ranges(fetch_ranges?).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
Expand All @@ -806,7 +809,7 @@ impl<'a> InMemoryRowGroup<'a> {

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range().0 as usize,
offset: self.metadata.column(idx).byte_range()?.0 as usize,
data,
}));
}
Expand Down Expand Up @@ -1008,8 +1011,8 @@ mod tests {
assert_eq!(async_batches, sync_batches);

let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range().unwrap();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range().unwrap();

assert_eq!(
&requests[..],
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Common Parquet errors and macros.

use core::num::TryFromIntError;
use std::error::Error;
use std::{cell, io, result, str};

Expand Down Expand Up @@ -76,6 +77,12 @@ impl Error for ParquetError {
}
}

impl From<TryFromIntError> for ParquetError {
fn from(e: TryFromIntError) -> ParquetError {
ParquetError::General(format!("Integer overflow: {e}"))
}
}

impl From<io::Error> for ParquetError {
fn from(e: io::Error) -> ParquetError {
ParquetError::External(Box::new(e))
Expand Down
13 changes: 7 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,17 +959,18 @@ impl ColumnChunkMetaData {
}

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
pub fn byte_range(&self) -> Result<(u64, u64)> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah..is it fine given that it just wraps the return value within Result? The behavior change is just "panic --> error".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is a breaking change

let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
if col_start < 0 || col_len < 0 {
return Err(general_err!(
"column start and length should not be negative"
));
}
Ok((col_start as u64, col_len as u64))
}

/// Returns statistics that are set for this column chunk,
Expand Down
26 changes: 13 additions & 13 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,8 @@ impl ParquetMetaDataReader {
for rg in t_file_metadata.row_groups {
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
}
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
let column_orders =
Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

let file_metadata = FileMetaData::new(
t_file_metadata.version,
Expand All @@ -635,15 +636,13 @@ impl ParquetMetaDataReader {
fn parse_column_orders(
t_column_orders: Option<Vec<TColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
) -> Result<Option<Vec<ColumnOrder>>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
if orders.len() != schema_descr.num_columns() {
return Err(general_err!("Column order length mismatch"));
};
let mut res = Vec::new();
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
Expand All @@ -657,9 +656,9 @@ impl ParquetMetaDataReader {
}
}
}
Some(res)
Ok(Some(res))
}
None => None,
None => Ok(None),
}
}
}
Expand Down Expand Up @@ -731,7 +730,7 @@ mod tests {
]);

assert_eq!(
ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr),
ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
Some(vec![
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
Expand All @@ -740,20 +739,21 @@ mod tests {

// Test when no column orders are defined.
assert_eq!(
ParquetMetaDataReader::parse_column_orders(None, &schema_descr),
ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
None
);
}

#[test]
#[should_panic(expected = "Column order length mismatch")]
fn test_metadata_column_orders_len_mismatch() {
let schema = SchemaType::group_type_builder("schema").build().unwrap();
let schema_descr = SchemaDescriptor::new(Arc::new(schema));

let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
assert!(res.is_err());
assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
}

#[test]
Expand Down
55 changes: 47 additions & 8 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ pub(crate) fn decode_page(
let is_sorted = dict_header.is_sorted.unwrap_or(false);
Page::DictionaryPage {
buf: buffer,
num_values: dict_header.num_values as u32,
num_values: dict_header.num_values.try_into()?,
encoding: Encoding::try_from(dict_header.encoding)?,
is_sorted,
}
Expand All @@ -446,7 +446,7 @@ pub(crate) fn decode_page(
.ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
Page::DataPage {
buf: buffer,
num_values: header.num_values as u32,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
Expand All @@ -460,12 +460,12 @@ pub(crate) fn decode_page(
let is_compressed = header.is_compressed.unwrap_or(true);
Page::DataPageV2 {
buf: buffer,
num_values: header.num_values as u32,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
num_nulls: header.num_nulls as u32,
num_rows: header.num_rows as u32,
def_levels_byte_len: header.definition_levels_byte_length as u32,
rep_levels_byte_len: header.repetition_levels_byte_length as u32,
num_nulls: header.num_nulls.try_into()?,
num_rows: header.num_rows.try_into()?,
def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
is_compressed,
statistics: statistics::from_thrift(physical_type, header.statistics)?,
}
Expand Down Expand Up @@ -535,7 +535,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
props: ReaderPropertiesPtr,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range();
let (start, len) = meta.byte_range()?;

let state = match page_locations {
Some(locations) => {
Expand Down Expand Up @@ -578,6 +578,27 @@ impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
}
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
if header_len > remaining_bytes {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}

fn verify_page_size(
compressed_size: i32,
uncompressed_size: i32,
remaining_bytes: usize,
) -> Result<()> {
// The page's compressed size should not exceed the remaining bytes that are
// available to read. The page's uncompressed size is the expected size
// after decompression, which can never be negative.
if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
loop {
Expand All @@ -596,10 +617,16 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining)?;
*offset += header_len;
*remaining -= header_len;
header
};
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining,
)?;
let data_len = header.compressed_page_size as usize;
*offset += data_len;
*remaining -= data_len;
Expand Down Expand Up @@ -683,6 +710,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Expand Down Expand Up @@ -733,12 +761,23 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
next_page_header,
} => {
if let Some(buffered_header) = next_page_header.take() {
verify_page_size(
buffered_header.compressed_page_size,
buffered_header.uncompressed_page_size,
*remaining_bytes,
)?;
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
*remaining_bytes -= buffered_header.compressed_page_size as usize;
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining_bytes,
)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
Expand Down
26 changes: 26 additions & 0 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,32 @@ pub fn from_thrift(
stats.max_value
};

fn check_len(min: &Option<Vec<u8>>, max: &Option<Vec<u8>>, len: usize) -> Result<()> {
if let Some(min) = min {
if min.len() < len {
return Err(ParquetError::General(
"Insufficient bytes to parse max statistic".to_string(),
));
}
}
if let Some(max) = max {
if max.len() < len {
return Err(ParquetError::General(
"Insufficient bytes to parse max statistic".to_string(),
));
}
}
Ok(())
}

match physical_type {
Type::BOOLEAN => check_len(&min, &max, 1),
Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
Type::INT96 => check_len(&min, &max, 12),
_ => Ok(()),
}?;

// Values are encoded using PLAIN encoding definition, except that
// variable-length byte arrays do not include a length prefix.
//
Expand Down
6 changes: 6 additions & 0 deletions parquet/src/format.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,10 @@ pub fn from_thrift(elements: &[SchemaElement]) -> Result<TypePtr> {
));
}

if !schema_nodes[0].is_group() {
return Err(general_err!("Expected root node to be a group type"));
}

Ok(schema_nodes.remove(0))
}

Expand Down Expand Up @@ -1227,6 +1231,10 @@ fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize
if !is_root_node {
builder = builder.with_repetition(rep);
}
} else if !is_root_node {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is based on the comment at line 1230 which says All other types must have one, and the assert at line 1066: assert!(tp.get_basic_info.()has_repetitio())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern with this check is unless it is necessary for correctness, there is potential adding it breaks something for someone. Parquet is a very broad ecosystem, lots of writers have interesting interpretations of the specification

return Err(general_err!(
"Repetition level must be defined for non-root types"
));
}
Ok((next_index, Arc::new(builder.build().unwrap())))
}
Expand Down
Loading
Loading