Skip to content

Commit 1452333

Browse files
committed
feat: Cache Parquet metadata
1 parent aab44fd commit 1452333

File tree

18 files changed

+480
-3
lines changed

18 files changed

+480
-3
lines changed

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,12 @@ config_namespace! {
549549
/// (reading) Use any available bloom filters when reading parquet files
550550
pub bloom_filter_on_read: bool, default = true
551551

552+
/// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
553+
/// (footer and page metadata). Enabling it can offer substantial performance improvements
554+
/// for repeated queries over large files. By default, the cache is automatically
555+
/// invalidated when the underlying file is modified.
556+
pub cache_metadata: bool, default = false
557+
552558
// The following options affect writing to parquet files
553559
// and map to parquet::file::properties::WriterProperties
554560

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ impl ParquetOptions {
245245
binary_as_string: _, // not used for writer props
246246
coerce_int96: _, // not used for writer props
247247
skip_arrow_metadata: _,
248+
cache_metadata: _,
248249
} = self;
249250

250251
let mut builder = WriterProperties::builder()
@@ -522,6 +523,7 @@ mod tests {
522523
binary_as_string: defaults.binary_as_string,
523524
skip_arrow_metadata: defaults.skip_arrow_metadata,
524525
coerce_int96: None,
526+
cache_metadata: defaults.cache_metadata,
525527
}
526528
}
527529

@@ -634,6 +636,7 @@ mod tests {
634636
binary_as_string: global_options_defaults.binary_as_string,
635637
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
636638
coerce_int96: None,
639+
cache_metadata: global_options_defaults.cache_metadata,
637640
},
638641
column_specific_options,
639642
key_value_metadata,

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> {
254254
pub file_sort_order: Vec<Vec<SortExpr>>,
255255
/// Properties for decryption of Parquet files that use modular encryption
256256
pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
257+
/// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
258+
/// page metadata). Enabling it can offer substantial performance improvements for repeated
259+
/// queries over large files. By default, the cache is automatically invalidated when the
260+
/// underlying file is modified.
261+
pub cache_metadata: Option<bool>,
257262
}
258263

259264
impl Default for ParquetReadOptions<'_> {
@@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> {
266271
schema: None,
267272
file_sort_order: vec![],
268273
file_decryption_properties: None,
274+
cache_metadata: None,
269275
}
270276
}
271277
}
@@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> {
325331
self.file_decryption_properties = Some(file_decryption_properties);
326332
self
327333
}
334+
335+
/// Specify whether to enable metadata caching
336+
pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
337+
self.cache_metadata = Some(cache_metadata);
338+
self
339+
}
328340
}
329341

330342
/// Options that control the reading of ARROW files.
@@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
590602
if let Some(file_decryption_properties) = &self.file_decryption_properties {
591603
options.crypto.file_decryption = Some(file_decryption_properties.clone());
592604
}
605+
if let Some(cache_metadata) = self.cache_metadata {
606+
options.global.cache_metadata = cache_metadata;
607+
}
593608
let mut file_format = ParquetFormat::new().with_options(options);
594609

595610
if let Some(parquet_pruning) = self.parquet_pruning {

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator;
6363
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6464
use datafusion_session::Session;
6565

66+
use crate::reader::CachedParquetFileReaderFactory;
6667
use crate::source::{parse_coerce_int96_string, ParquetSource};
6768
use async_trait::async_trait;
6869
use bytes::Bytes;
@@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat {
435436

436437
async fn create_physical_plan(
437438
&self,
438-
_state: &dyn Session,
439+
state: &dyn Session,
439440
conf: FileScanConfig,
440441
) -> Result<Arc<dyn ExecutionPlan>> {
441442
let mut metadata_size_hint = None;
@@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat {
446447

447448
let mut source = ParquetSource::new(self.options.clone());
448449

450+
// Use the CachedParquetFileReaderFactory when metadata caching is enabled
451+
if self.options.global.cache_metadata {
452+
if let Some(metadata_cache) =
453+
state.runtime_env().cache_manager.get_file_metadata_cache()
454+
{
455+
let store = state
456+
.runtime_env()
457+
.object_store(conf.object_store_url.clone())?;
458+
let cached_parquet_read_factory =
459+
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
460+
source =
461+
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
462+
}
463+
}
464+
449465
if let Some(metadata_size_hint) = metadata_size_hint {
450466
source = source.with_metadata_size_hint(metadata_size_hint)
451467
}

datafusion/datasource-parquet/src/reader.rs

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
use crate::ParquetFileMetrics;
2222
use bytes::Bytes;
2323
use datafusion_datasource::file_meta::FileMeta;
24+
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
2425
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
2526
use futures::future::BoxFuture;
27+
use futures::FutureExt;
2628
use object_store::ObjectStore;
2729
use parquet::arrow::arrow_reader::ArrowReaderOptions;
2830
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
29-
use parquet::file::metadata::ParquetMetaData;
31+
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3032
use std::fmt::Debug;
3133
use std::ops::Range;
3234
use std::sync::Arc;
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
150152
}))
151153
}
152154
}
155+
156+
/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
157+
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
158+
/// This reader always loads the entire metadata (including page index, unless the file is
159+
/// encrypted), even if not required by the current query, to ensure it is always available for
160+
/// those that need it.
161+
#[derive(Debug)]
162+
pub struct CachedParquetFileReaderFactory {
163+
store: Arc<dyn ObjectStore>,
164+
metadata_cache: FileMetadataCache,
165+
}
166+
167+
impl CachedParquetFileReaderFactory {
168+
pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
169+
Self {
170+
store,
171+
metadata_cache,
172+
}
173+
}
174+
}
175+
176+
impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
177+
fn create_reader(
178+
&self,
179+
partition_index: usize,
180+
file_meta: FileMeta,
181+
metadata_size_hint: Option<usize>,
182+
metrics: &ExecutionPlanMetricsSet,
183+
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
184+
let file_metrics = ParquetFileMetrics::new(
185+
partition_index,
186+
file_meta.location().as_ref(),
187+
metrics,
188+
);
189+
let store = Arc::clone(&self.store);
190+
191+
let mut inner =
192+
ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
193+
.with_file_size(file_meta.object_meta.size);
194+
195+
if let Some(hint) = metadata_size_hint {
196+
inner = inner.with_footer_size_hint(hint)
197+
};
198+
199+
Ok(Box::new(CachedParquetFileReader {
200+
inner,
201+
file_metrics,
202+
file_meta,
203+
metadata_cache: Arc::clone(&self.metadata_cache),
204+
}))
205+
}
206+
}
207+
208+
/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
209+
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
210+
/// updates the cache.
211+
pub(crate) struct CachedParquetFileReader {
212+
pub file_metrics: ParquetFileMetrics,
213+
pub inner: ParquetObjectReader,
214+
file_meta: FileMeta,
215+
metadata_cache: FileMetadataCache,
216+
}
217+
218+
impl AsyncFileReader for CachedParquetFileReader {
219+
fn get_bytes(
220+
&mut self,
221+
range: Range<u64>,
222+
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
223+
let bytes_scanned = range.end - range.start;
224+
self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
225+
self.inner.get_bytes(range)
226+
}
227+
228+
fn get_byte_ranges(
229+
&mut self,
230+
ranges: Vec<Range<u64>>,
231+
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
232+
where
233+
Self: Send,
234+
{
235+
let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
236+
self.file_metrics.bytes_scanned.add(total as usize);
237+
self.inner.get_byte_ranges(ranges)
238+
}
239+
240+
fn get_metadata<'a>(
241+
&'a mut self,
242+
options: Option<&'a ArrowReaderOptions>,
243+
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
244+
let file_meta = self.file_meta.clone();
245+
let metadata_cache = Arc::clone(&self.metadata_cache);
246+
247+
async move {
248+
let object_meta = &file_meta.object_meta;
249+
250+
// lookup if the metadata is already cached
251+
if let Some(metadata) =
252+
metadata_cache.get_with_extra(&object_meta.location, object_meta)
253+
{
254+
if let Ok(parquet_metadata) = Arc::downcast::<ParquetMetaData>(metadata) {
255+
return Ok(Arc::clone(&parquet_metadata));
256+
}
257+
}
258+
259+
let mut reader = ParquetMetaDataReader::new();
260+
// the page index can only be loaded with unencrypted files
261+
if let Some(file_decryption_properties) =
262+
options.and_then(|o| o.file_decryption_properties())
263+
{
264+
reader =
265+
reader.with_decryption_properties(Some(file_decryption_properties));
266+
} else {
267+
reader = reader.with_page_indexes(true);
268+
}
269+
reader.try_load(&mut self.inner, object_meta.size).await?;
270+
let metadata = Arc::new(reader.finish()?);
271+
272+
metadata_cache.put_with_extra(
273+
&object_meta.location,
274+
Arc::clone(&metadata) as Arc<FileMetadata>,
275+
object_meta,
276+
);
277+
Ok(metadata)
278+
}
279+
.boxed()
280+
}
281+
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::cache::cache_unit::DefaultFilesMetadataCache;
1819
use crate::cache::CacheAccessor;
1920
use datafusion_common::{Result, Statistics};
2021
use object_store::path::Path;
2122
use object_store::ObjectMeta;
23+
use std::any::Any;
2224
use std::fmt::{Debug, Formatter};
2325
use std::sync::Arc;
2426

@@ -32,6 +34,13 @@ pub type FileStatisticsCache =
3234
pub type ListFilesCache =
3335
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
3436

37+
/// Represents generic file-embedded metadata.
38+
pub type FileMetadata = dyn Any + Send + Sync;
39+
40+
/// Cache to store file-embedded metadata.
41+
pub type FileMetadataCache =
42+
Arc<dyn CacheAccessor<Path, Arc<FileMetadata>, Extra = ObjectMeta>>;
43+
3544
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
3645
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3746
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
@@ -44,10 +53,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
4453
}
4554
}
4655

56+
impl Debug for dyn CacheAccessor<Path, Arc<FileMetadata>, Extra = ObjectMeta> {
57+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58+
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
59+
}
60+
}
61+
4762
#[derive(Default, Debug)]
4863
pub struct CacheManager {
4964
file_statistic_cache: Option<FileStatisticsCache>,
5065
list_files_cache: Option<ListFilesCache>,
66+
file_metadata_cache: Option<FileMetadataCache>,
5167
}
5268

5369
impl CacheManager {
@@ -59,6 +75,13 @@ impl CacheManager {
5975
if let Some(lc) = &config.list_files_cache {
6076
manager.list_files_cache = Some(Arc::clone(lc))
6177
}
78+
if let Some(mc) = &config.file_metadata_cache {
79+
manager.file_metadata_cache = Some(Arc::clone(mc));
80+
} else {
81+
manager.file_metadata_cache =
82+
Some(Arc::new(DefaultFilesMetadataCache::default()));
83+
}
84+
6285
Ok(Arc::new(manager))
6386
}
6487

@@ -71,6 +94,11 @@ impl CacheManager {
7194
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
7295
self.list_files_cache.clone()
7396
}
97+
98+
/// Get the file-embedded metadata cache.
99+
pub fn get_file_metadata_cache(&self) -> Option<FileMetadataCache> {
100+
self.file_metadata_cache.clone()
101+
}
74102
}
75103

76104
#[derive(Clone, Default)]
@@ -86,6 +114,10 @@ pub struct CacheManagerConfig {
86114
/// location.
87115
/// Default is disabled.
88116
pub list_files_cache: Option<ListFilesCache>,
117+
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
118+
/// data file (e.g., Parquet footer and page metadata).
119+
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
120+
pub file_metadata_cache: Option<FileMetadataCache>,
89121
}
90122

91123
impl CacheManagerConfig {
@@ -101,4 +133,9 @@ impl CacheManagerConfig {
101133
self.list_files_cache = cache;
102134
self
103135
}
136+
137+
pub fn with_file_metadata_cache(mut self, cache: Option<FileMetadataCache>) -> Self {
138+
self.file_metadata_cache = cache;
139+
self
140+
}
104141
}

0 commit comments

Comments
 (0)