Skip to content

Commit cc2e1ec

Browse files
committed
add plumbing for column skipping
1 parent b367562 commit cc2e1ec

File tree

1 file changed

+44
-6
lines changed
  • parquet/src/file/metadata/thrift

1 file changed

+44
-6
lines changed

parquet/src/file/metadata/thrift/mod.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ fn read_column_chunk<'a>(
585585
fn read_row_group(
586586
prot: &mut ThriftSliceInputProtocol,
587587
schema_descr: &Arc<SchemaDescriptor>,
588+
index: Option<MetadataIndexSlice>,
588589
) -> Result<RowGroupMetaData> {
589590
// create default initialized RowGroupMetaData
590591
let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
@@ -622,9 +623,22 @@ fn read_row_group(
622623
list_ident.size
623624
));
624625
}
625-
for i in 0..list_ident.size as usize {
626-
let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
627-
row_group.columns.push(col);
626+
if let Some(meta_idx) = index.as_ref() {
627+
for i in 0..list_ident.size as usize {
628+
let col_len = (meta_idx.col_chunk_offsets[i + 1]
629+
- meta_idx.col_chunk_offsets[i])
630+
as usize;
631+
let col_bytes = &prot.as_slice()[..col_len];
632+
let mut col_prot = ThriftSliceInputProtocol::new(col_bytes);
633+
let col = read_column_chunk(&mut col_prot, &schema_descr.columns()[i])?;
634+
row_group.columns.push(col);
635+
prot.skip_bytes(col_len)?;
636+
}
637+
} else {
638+
for i in 0..list_ident.size as usize {
639+
let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
640+
row_group.columns.push(col);
641+
}
628642
}
629643
mask |= RG_COLUMNS;
630644
}
@@ -705,6 +719,29 @@ pub(crate) fn get_metadata_index(buf: &[u8]) -> Result<Option<MetaIndex>> {
705719
Ok(Some(idx))
706720
}
707721

722+
#[allow(dead_code)]
723+
struct MetadataIndexSlice<'a> {
724+
col_chunk_offsets: &'a [i64],
725+
col_meta_lengths: &'a [i64],
726+
}
727+
728+
impl<'a> MetadataIndexSlice<'a> {
729+
fn new(index: &'a MetaIndex, rg_idx: usize, schema_descr: &Arc<SchemaDescriptor>) -> Self {
730+
let num_cols = schema_descr.num_columns();
731+
let start = rg_idx * (num_cols + 1);
732+
let end = start + num_cols + 1;
733+
let col_chunk_offsets = &index.column_offsets[start..end];
734+
let start = rg_idx * num_cols;
735+
let end = start + num_cols;
736+
let col_meta_lengths = &index.column_meta_lengths[start..end];
737+
738+
Self {
739+
col_chunk_offsets,
740+
col_meta_lengths,
741+
}
742+
}
743+
}
744+
708745
/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
709746
/// the Parquet footer. Page indexes will need to be added later.
710747
pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
@@ -792,17 +829,18 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData>
792829

793830
if let Some(meta_idx) = index.as_ref() {
794831
for i in 0..list_ident.size as usize {
832+
let slice = MetadataIndexSlice::new(&meta_idx, i, schema_descr);
795833
let rg_len = (meta_idx.row_group_offsets[i + 1]
796834
- meta_idx.row_group_offsets[i])
797835
as usize;
798836
let rg_bytes = &prot.as_slice()[..rg_len];
799837
let mut rg_prot = ThriftSliceInputProtocol::new(rg_bytes);
800-
rg_vec.push(read_row_group(&mut rg_prot, schema_descr)?);
838+
rg_vec.push(read_row_group(&mut rg_prot, schema_descr, Some(slice))?);
801839
prot.skip_bytes(rg_len)?;
802840
}
803841
} else {
804842
for _ in 0..list_ident.size {
805-
rg_vec.push(read_row_group(&mut prot, schema_descr)?);
843+
rg_vec.push(read_row_group(&mut prot, schema_descr, None)?);
806844
}
807845
}
808846
row_groups = Some(rg_vec);
@@ -1767,7 +1805,7 @@ pub(crate) mod tests {
17671805
schema_descr: Arc<SchemaDescriptor>,
17681806
) -> Result<RowGroupMetaData> {
17691807
let mut reader = ThriftSliceInputProtocol::new(buf);
1770-
crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
1808+
crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
17711809
}
17721810

17731811
pub(crate) fn read_column_chunk(

0 commit comments

Comments
 (0)