Skip to content

Commit c1dbf29

Browse files
committed
Encapsulate encryption code more in readers
1 parent a01886d commit c1dbf29

File tree

3 files changed

+68
-62
lines changed

3 files changed

+68
-62
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
3333
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
3434
use crate::column::page::{PageIterator, PageReader};
3535
#[cfg(feature = "encryption")]
36-
use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
36+
use crate::encryption::decrypt::FileDecryptionProperties;
3737
use crate::errors::{ParquetError, Result};
3838
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3939
use crate::file::reader::{ChunkReader, SerializedPageReader};
@@ -682,13 +682,11 @@ struct ReaderPageIterator<T: ChunkReader> {
682682
metadata: Arc<ParquetMetaData>,
683683
}
684684

685-
impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
686-
type Item = Result<Box<dyn PageReader>>;
687-
688-
fn next(&mut self) -> Option<Self::Item> {
689-
let rg_idx = self.row_groups.next()?;
685+
impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
686+
/// Return a `SerializedPageReader` for this iterator's column in row group `rg_idx`
687+
fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
690688
let rg = self.metadata.row_group(rg_idx);
691-
let meta = rg.column(self.column_idx);
689+
let column_chunk_metadata = rg.column(self.column_idx);
692690
let offset_index = self.metadata.offset_index();
693691
// `offset_index` may not exist and `i[rg_idx]` will be empty.
694692
// To avoid a panic in `i[rg_idx][self.column_idx]`, we need to filter out an empty `i[rg_idx]`.
@@ -698,32 +696,25 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
698696
let total_rows = rg.num_rows() as usize;
699697
let reader = self.reader.clone();
700698

701-
#[cfg(feature = "encryption")]
702-
let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() {
703-
match meta.crypto_metadata() {
704-
Some(crypto_metadata) => {
705-
match CryptoContext::for_column(
706-
file_decryptor,
707-
crypto_metadata,
708-
rg_idx,
709-
self.column_idx,
710-
) {
711-
Ok(context) => Some(Arc::new(context)),
712-
Err(err) => return Some(Err(err)),
713-
}
714-
}
715-
None => None,
716-
}
717-
} else {
718-
None
719-
};
720-
721-
let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
699+
SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
700+
.add_crypto_context(
701+
rg_idx,
702+
self.column_idx,
703+
self.metadata.as_ref(),
704+
column_chunk_metadata,
705+
)
706+
}
707+
}
722708

723-
#[cfg(feature = "encryption")]
724-
let ret = ret.map(|reader| reader.with_crypto_context(crypto_context));
709+
impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
710+
type Item = Result<Box<dyn PageReader>>;
725711

726-
Some(ret.map(|x| Box::new(x) as _))
712+
fn next(&mut self) -> Option<Self::Item> {
713+
let rg_idx = self.row_groups.next()?;
714+
let page_reader = self
715+
.next_page_reader(rg_idx)
716+
.map(|page_reader| Box::new(page_reader) as _);
717+
Some(page_reader)
727718
}
728719
}
729720

parquet/src/arrow/async_reader/mod.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHas
5959
mod metadata;
6060
pub use metadata::*;
6161

62-
#[cfg(feature = "encryption")]
63-
use crate::encryption::decrypt::CryptoContext;
64-
6562
#[cfg(feature = "object_store")]
6663
mod store;
6764

@@ -1027,6 +1024,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
10271024
self.row_count
10281025
}
10291026

1027+
/// Return chunks for column i
10301028
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
10311029
match &self.column_chunks[i] {
10321030
None => Err(ParquetError::General(format!(
@@ -1038,31 +1036,19 @@ impl RowGroups for InMemoryRowGroup<'_> {
10381036
// filter out empty offset indexes (old versions specified Some(vec![]) when not present)
10391037
.filter(|index| !index.is_empty())
10401038
.map(|index| index[i].page_locations.clone());
1041-
let column_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1039+
let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
10421040
let page_reader = SerializedPageReader::new(
10431041
data.clone(),
1044-
column_metadata,
1042+
column_chunk_metadata,
10451043
self.row_count,
10461044
page_locations,
10471045
)?;
1048-
1049-
#[cfg(feature = "encryption")]
1050-
let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() {
1051-
match column_metadata.crypto_metadata() {
1052-
Some(crypto_metadata) => Some(Arc::new(CryptoContext::for_column(
1053-
file_decryptor,
1054-
crypto_metadata,
1055-
self.row_group_idx,
1056-
i,
1057-
)?)),
1058-
None => None,
1059-
}
1060-
} else {
1061-
None
1062-
};
1063-
1064-
#[cfg(feature = "encryption")]
1065-
let page_reader = page_reader.with_crypto_context(crypto_context);
1046+
let page_reader = page_reader.add_crypto_context(
1047+
self.row_group_idx,
1048+
i,
1049+
self.metadata,
1050+
column_chunk_metadata,
1051+
)?;
10661052

10671053
let page_reader: Box<dyn PageReader> = Box::new(page_reader);
10681054

parquet/src/file/serialized_reader.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -571,22 +571,51 @@ impl<R: ChunkReader> SerializedPageReader<R> {
571571
/// Creates a new serialized page reader from a chunk reader and metadata
572572
pub fn new(
573573
reader: Arc<R>,
574-
meta: &ColumnChunkMetaData,
574+
column_chunk_metadata: &ColumnChunkMetaData,
575575
total_rows: usize,
576576
page_locations: Option<Vec<PageLocation>>,
577577
) -> Result<Self> {
578578
let props = Arc::new(ReaderProperties::builder().build());
579-
SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
579+
SerializedPageReader::new_with_properties(
580+
reader,
581+
column_chunk_metadata,
582+
total_rows,
583+
page_locations,
584+
props,
585+
)
580586
}
581587

582-
/// Adds cryptographical information to the reader.
588+
/// Stub no-op implementation used when encryption is disabled.
589+
#[cfg(all(feature = "arrow", not(feature = "encryption")))]
590+
pub(crate) fn add_crypto_context(
591+
self,
592+
_rg_idx: usize,
593+
_column_idx: usize,
594+
_parquet_meta_data: &ParquetMetaData,
595+
_column_chunk_metadata: &ColumnChunkMetaData,
596+
) -> Result<SerializedPageReader<R>> {
597+
Ok(self)
598+
}
599+
600+
/// Adds any necessary crypto context to this page reader, if encryption is enabled.
583601
#[cfg(feature = "encryption")]
584-
pub(crate) fn with_crypto_context(
602+
pub(crate) fn add_crypto_context(
585603
mut self,
586-
crypto_context: Option<Arc<CryptoContext>>,
587-
) -> Self {
588-
self.crypto_context = crypto_context;
589-
self
604+
rg_idx: usize,
605+
column_idx: usize,
606+
parquet_meta_data: &ParquetMetaData,
607+
column_chunk_metadata: &ColumnChunkMetaData,
608+
) -> Result<SerializedPageReader<R>> {
609+
let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
610+
return Ok(self);
611+
};
612+
let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
613+
return Ok(self);
614+
};
615+
let crypto_context =
616+
CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
617+
self.crypto_context = Some(Arc::new(crypto_context));
618+
Ok(self)
590619
}
591620

592621
/// Creates a new serialized page with custom options.

0 commit comments

Comments
 (0)