-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: Cache Parquet metadata in built in parquet reader #16971
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,12 +21,15 @@ | |
| use crate::ParquetFileMetrics; | ||
| use bytes::Bytes; | ||
| use datafusion_datasource::file_meta::FileMeta; | ||
| use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; | ||
| use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; | ||
| use futures::future::BoxFuture; | ||
| use futures::FutureExt; | ||
| use object_store::ObjectStore; | ||
| use parquet::arrow::arrow_reader::ArrowReaderOptions; | ||
| use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; | ||
| use parquet::file::metadata::ParquetMetaData; | ||
| use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; | ||
| use std::any::Any; | ||
| use std::fmt::Debug; | ||
| use std::ops::Range; | ||
| use std::sync::Arc; | ||
|
|
@@ -150,3 +153,139 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { | |
| })) | ||
| } | ||
| } | ||
|
|
||
| /// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page | ||
| /// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data. | ||
| /// This reader always loads the entire metadata (including page index, unless the file is | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| /// encrypted), even if not required by the current query, to ensure it is always available for | ||
| /// those that need it. | ||
| #[derive(Debug)] | ||
| pub struct CachedParquetFileReaderFactory { | ||
| store: Arc<dyn ObjectStore>, | ||
| metadata_cache: Arc<dyn FileMetadataCache>, | ||
| } | ||
|
|
||
| impl CachedParquetFileReaderFactory { | ||
| pub fn new( | ||
| store: Arc<dyn ObjectStore>, | ||
| metadata_cache: Arc<dyn FileMetadataCache>, | ||
| ) -> Self { | ||
| Self { | ||
| store, | ||
| metadata_cache, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { | ||
| fn create_reader( | ||
| &self, | ||
| partition_index: usize, | ||
| file_meta: FileMeta, | ||
| metadata_size_hint: Option<usize>, | ||
| metrics: &ExecutionPlanMetricsSet, | ||
| ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> { | ||
| let file_metrics = ParquetFileMetrics::new( | ||
| partition_index, | ||
| file_meta.location().as_ref(), | ||
| metrics, | ||
| ); | ||
| let store = Arc::clone(&self.store); | ||
|
|
||
| let mut inner = | ||
| ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) | ||
| .with_file_size(file_meta.object_meta.size); | ||
|
|
||
| if let Some(hint) = metadata_size_hint { | ||
| inner = inner.with_footer_size_hint(hint) | ||
| }; | ||
|
|
||
| Ok(Box::new(CachedParquetFileReader { | ||
| inner, | ||
| file_metrics, | ||
| file_meta, | ||
| metadata_cache: Arc::clone(&self.metadata_cache), | ||
| })) | ||
| } | ||
| } | ||
|
|
||
| /// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata | ||
| /// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then | ||
| /// updates the cache. | ||
| pub(crate) struct CachedParquetFileReader { | ||
| pub file_metrics: ParquetFileMetrics, | ||
| pub inner: ParquetObjectReader, | ||
| file_meta: FileMeta, | ||
| metadata_cache: Arc<dyn FileMetadataCache>, | ||
| } | ||
|
|
||
| impl AsyncFileReader for CachedParquetFileReader { | ||
| fn get_bytes( | ||
| &mut self, | ||
| range: Range<u64>, | ||
| ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { | ||
| let bytes_scanned = range.end - range.start; | ||
| self.file_metrics.bytes_scanned.add(bytes_scanned as usize); | ||
| self.inner.get_bytes(range) | ||
| } | ||
|
|
||
| fn get_byte_ranges( | ||
| &mut self, | ||
| ranges: Vec<Range<u64>>, | ||
| ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> | ||
| where | ||
| Self: Send, | ||
| { | ||
| let total: u64 = ranges.iter().map(|r| r.end - r.start).sum(); | ||
| self.file_metrics.bytes_scanned.add(total as usize); | ||
| self.inner.get_byte_ranges(ranges) | ||
| } | ||
|
|
||
| fn get_metadata<'a>( | ||
| &'a mut self, | ||
| options: Option<&'a ArrowReaderOptions>, | ||
| ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> { | ||
| let file_meta = self.file_meta.clone(); | ||
| let metadata_cache = Arc::clone(&self.metadata_cache); | ||
|
|
||
| async move { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is impressive that you worked out this API dance -- it is something I really don't like about the current API of the parquet reader. BTW I am working on improving it (no changes needed or suggested here, I am just self-promoting): |
||
| let object_meta = &file_meta.object_meta; | ||
|
|
||
| // lookup if the metadata is already cached | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If two workers call this at the same time for the same file, they will both independently read the metadata and then update the cache. There should be no problem with this, but pointing it out just in case. |
||
| if let Some(metadata) = metadata_cache.get(object_meta) { | ||
| if let Some(parquet_metadata) = | ||
| metadata.as_any().downcast_ref::<CachedParquetMetaData>() | ||
| { | ||
| return Ok(Arc::clone(&parquet_metadata.0)); | ||
| } | ||
| } | ||
|
|
||
| let mut reader = ParquetMetaDataReader::new(); | ||
| // the page index can only be loaded with unencrypted files | ||
| if let Some(file_decryption_properties) = | ||
| options.and_then(|o| o.file_decryption_properties()) | ||
| { | ||
| reader = | ||
| reader.with_decryption_properties(Some(file_decryption_properties)); | ||
| } else { | ||
| reader = reader.with_page_indexes(true); | ||
| } | ||
| reader.try_load(&mut self.inner, object_meta.size).await?; | ||
| let metadata = Arc::new(reader.finish()?); | ||
| let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata))); | ||
|
|
||
| metadata_cache.put(object_meta, cached_metadata); | ||
| Ok(metadata) | ||
| } | ||
| .boxed() | ||
| } | ||
| } | ||
|
|
||
| /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. | ||
| struct CachedParquetMetaData(Arc<ParquetMetaData>); | ||
|
|
||
| impl FileMetadata for CachedParquetMetaData { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,10 +15,12 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use crate::cache::cache_unit::DefaultFilesMetadataCache; | ||
| use crate::cache::CacheAccessor; | ||
| use datafusion_common::{Result, Statistics}; | ||
| use object_store::path::Path; | ||
| use object_store::ObjectMeta; | ||
| use std::any::Any; | ||
| use std::fmt::{Debug, Formatter}; | ||
| use std::sync::Arc; | ||
|
|
||
|
|
@@ -32,6 +34,19 @@ pub type FileStatisticsCache = | |
| pub type ListFilesCache = | ||
| Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>; | ||
|
|
||
| /// Represents generic file-embedded metadata. | ||
| pub trait FileMetadata: Any + Send + Sync { | ||
| /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific | ||
| /// implementation. | ||
| fn as_any(&self) -> &dyn Any; | ||
| } | ||
|
|
||
| /// Cache to store file-embedded metadata. | ||
| pub trait FileMetadataCache: | ||
| CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta> | ||
| { | ||
| } | ||
|
|
||
| impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> { | ||
| fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
| write!(f, "Cache name: {} with length: {}", self.name(), self.len()) | ||
|
|
@@ -44,10 +59,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> | |
| } | ||
| } | ||
|
|
||
| impl Debug for dyn FileMetadataCache { | ||
| fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
| write!(f, "Cache name: {} with length: {}", self.name(), self.len()) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Default, Debug)] | ||
| pub struct CacheManager { | ||
| file_statistic_cache: Option<FileStatisticsCache>, | ||
| list_files_cache: Option<ListFilesCache>, | ||
| file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, | ||
| } | ||
|
|
||
| impl CacheManager { | ||
|
|
@@ -59,6 +81,13 @@ impl CacheManager { | |
| if let Some(lc) = &config.list_files_cache { | ||
| manager.list_files_cache = Some(Arc::clone(lc)) | ||
| } | ||
| if let Some(mc) = &config.file_metadata_cache { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here the |
||
| manager.file_metadata_cache = Some(Arc::clone(mc)); | ||
| } else { | ||
| manager.file_metadata_cache = | ||
| Some(Arc::new(DefaultFilesMetadataCache::default())); | ||
| } | ||
|
|
||
| Ok(Arc::new(manager)) | ||
| } | ||
|
|
||
|
|
@@ -71,6 +100,11 @@ impl CacheManager { | |
| pub fn get_list_files_cache(&self) -> Option<ListFilesCache> { | ||
| self.list_files_cache.clone() | ||
| } | ||
|
|
||
| /// Get the file embedded metadata cache. | ||
| pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> { | ||
| self.file_metadata_cache.clone() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone, Default)] | ||
|
|
@@ -86,6 +120,10 @@ pub struct CacheManagerConfig { | |
| /// location. | ||
| /// Default is disable. | ||
| pub list_files_cache: Option<ListFilesCache>, | ||
| /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a | ||
| /// data file (e.g., Parquet footer and page metadata). | ||
| /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. | ||
| pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, | ||
| } | ||
|
|
||
| impl CacheManagerConfig { | ||
|
|
@@ -101,4 +139,12 @@ impl CacheManagerConfig { | |
| self.list_files_cache = cache; | ||
| self | ||
| } | ||
|
|
||
| pub fn with_file_metadata_cache( | ||
| mut self, | ||
| cache: Option<Arc<dyn FileMetadataCache>>, | ||
| ) -> Self { | ||
| self.file_metadata_cache = cache; | ||
| self | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually, I think it would be better to have this be a size setting
`metadata_cache_size`, as then that can represent both disabled (0 size) and a memory cap. We can do this in a follow-on PR.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.