6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -549,6 +549,12 @@ config_namespace! {
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
/// (footer and page metadata). Enabling it can offer substantial performance improvements
/// for repeated queries over large files. By default, the cache is automatically
/// invalidated when the underlying file is modified.
pub cache_metadata: bool, default = false

Comment from alamb (Contributor), Jul 31, 2025:

    Eventually, I think it would be better to have this be a size setting metadata_cache_size, as that can then represent both disabled (0 size) and a memory cap.

    We can do this in a follow-on PR.

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

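
For reference, a minimal sketch of turning the new option on session-wide from Rust (assumes the datafusion crate prelude; the key mirrors the config_namespace! entry above, and the same effect is available in SQL via SET datafusion.execution.parquet.cache_metadata = true):

use datafusion::prelude::*;

fn main() -> datafusion::error::Result<()> {
    // Enable Parquet metadata caching for every query in this session.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.cache_metadata", true);
    let _ctx = SessionContext::new_with_config(config);
    Ok(())
}
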
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -245,6 +245,7 @@ impl ParquetOptions {
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
cache_metadata: _,
} = self;

let mut builder = WriterProperties::builder()
@@ -522,6 +523,7 @@ mod tests {
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
cache_metadata: defaults.cache_metadata,
}
}

@@ -634,6 +636,7 @@
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
cache_metadata: global_options_defaults.cache_metadata,
},
column_specific_options,
key_value_metadata,
15 changes: 15 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Properties for decryption of Parquet files that use modular encryption
pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
/// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
/// page metadata). Enabling it can offer substantial performance improvements for repeated
/// queries over large files. By default, the cache is automatically invalidated when the
/// underlying file is modified.
pub cache_metadata: Option<bool>,
}

impl Default for ParquetReadOptions<'_> {
@@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> {
schema: None,
file_sort_order: vec![],
file_decryption_properties: None,
cache_metadata: None,
}
}
}
@@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> {
self.file_decryption_properties = Some(file_decryption_properties);
self
}

/// Specify whether or not to enable metadata caching
pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
self.cache_metadata = Some(cache_metadata);
self
}
}

/// Options that control the reading of ARROW files.
@@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
if let Some(file_decryption_properties) = &self.file_decryption_properties {
options.crypto.file_decryption = Some(file_decryption_properties.clone());
}
if let Some(cache_metadata) = self.cache_metadata {
options.global.cache_metadata = cache_metadata;
}
let mut file_format = ParquetFormat::new().with_options(options);

if let Some(parquet_pruning) = self.parquet_pruning {
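
A short usage sketch of the builder added above (the file path is hypothetical; assumes a Tokio runtime and the datafusion prelude):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Opt this read into metadata caching; leaving the field as None instead
    // defers to the session-level cache_metadata setting, as the ReadOptions
    // impl above shows.
    let options = ParquetReadOptions::default().cache_metadata(true);
    let df = ctx.read_parquet("large_file.parquet", options).await?;
    df.show().await?;
    Ok(())
}
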
18 changes: 17 additions & 1 deletion datafusion/datasource-parquet/src/file_format.rs
@@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
@@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat {

async fn create_physical_plan(
&self,
_state: &dyn Session,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut metadata_size_hint = None;
@@ -446,6 +447,21 @@

let mut source = ParquetSource::new(self.options.clone());

// Use the CachedParquetFileReaderFactory when metadata caching is enabled
if self.options.global.cache_metadata {
if let Some(metadata_cache) =
state.runtime_env().cache_manager.get_file_metadata_cache()
{
let store = state
.runtime_env()
.object_store(conf.object_store_url.clone())?;
let cached_parquet_read_factory =
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
source =
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
}
}

if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
141 changes: 140 additions & 1 deletion datafusion/datasource-parquet/src/reader.rs
@@ -21,12 +21,15 @@
use crate::ParquetFileMetrics;
use bytes::Bytes;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
@@ -150,3 +153,139 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
}))
}
}

/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
/// This reader always loads the entire metadata (including page index, unless the file is
/// encrypted), even if not required by the current query, to ensure it is always available for
/// those that need it.
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
pub fn new(
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
) -> Self {
Self {
store,
metadata_cache,
}
}
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
fn create_reader(
&self,
partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
);
let store = Arc::clone(&self.store);

let mut inner =
ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
.with_file_size(file_meta.object_meta.size);

if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
};

Ok(Box::new(CachedParquetFileReader {
inner,
file_metrics,
file_meta,
metadata_cache: Arc::clone(&self.metadata_cache),
}))
}
}

/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
/// updates the cache.
pub(crate) struct CachedParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
file_meta: FileMeta,
metadata_cache: Arc<dyn FileMetadataCache>,
}

impl AsyncFileReader for CachedParquetFileReader {
fn get_bytes(
&mut self,
range: Range<u64>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
let bytes_scanned = range.end - range.start;
self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
self.inner.get_bytes(range)
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
where
Self: Send,
{
let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
self.file_metrics.bytes_scanned.add(total as usize);
self.inner.get_byte_ranges(ranges)
}

fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
let file_meta = self.file_meta.clone();
let metadata_cache = Arc::clone(&self.metadata_cache);

async move {
Comment from a reviewer (Contributor):

    It is impressive that you worked out this API dance -- it is something I really don't like about the current API of the parquet reader.

    BTW, I am working on improving it (no changes needed or suggested here, I am just self-promoting).

let object_meta = &file_meta.object_meta;

// look up whether the metadata is already cached
Comment from the PR author (Contributor):

    If two workers call this at the same time for the same file, they will both independently read the metadata and then update the cache. There should be no problem with this, but pointing it out just in case.

if let Some(metadata) = metadata_cache.get(object_meta) {
if let Some(parquet_metadata) =
metadata.as_any().downcast_ref::<CachedParquetMetaData>()
{
return Ok(Arc::clone(&parquet_metadata.0));
}
}

let mut reader = ParquetMetaDataReader::new();
// the page index can only be loaded with unencrypted files
if let Some(file_decryption_properties) =
options.and_then(|o| o.file_decryption_properties())
{
reader =
reader.with_decryption_properties(Some(file_decryption_properties));
} else {
reader = reader.with_page_indexes(true);
}
reader.try_load(&mut self.inner, object_meta.size).await?;
let metadata = Arc::new(reader.finish()?);
let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata)));

metadata_cache.put(object_meta, cached_metadata);
Ok(metadata)
}
.boxed()
}
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl FileMetadata for CachedParquetMetaData {
fn as_any(&self) -> &dyn Any {
self
}
}
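
The as_any/downcast pattern above is the standard way to store heterogeneous metadata behind a trait object. A self-contained sketch of the same technique, with hypothetical stand-ins for the PR's types (CachedFooter here plays the role of CachedParquetMetaData):

use std::any::Any;
use std::sync::Arc;

// Stand-in for the FileMetadata trait introduced by this PR.
trait FileMetadata: Any + Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Newtype wrapper over the concrete payload, like CachedParquetMetaData.
struct CachedFooter(Arc<String>);

impl FileMetadata for CachedFooter {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let entry: Arc<dyn FileMetadata> =
        Arc::new(CachedFooter(Arc::new("footer bytes".to_string())));
    // A cache hit recovers the concrete type exactly as get_metadata does above.
    if let Some(footer) = entry.as_any().downcast_ref::<CachedFooter>() {
        println!("cache hit: {}", footer.0);
    }
}
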
46 changes: 46 additions & 0 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

@@ -32,6 +34,19 @@ pub type FileStatisticsCache =
pub type ListFilesCache =
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;

/// Represents generic file-embedded metadata.
pub trait FileMetadata: Any + Send + Sync {
/// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
/// implementation.
fn as_any(&self) -> &dyn Any;
}

/// Cache to store file-embedded metadata.
pub trait FileMetadataCache:
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
}

impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
@@ -44,10 +59,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
}
}

impl Debug for dyn FileMetadataCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
}
}

#[derive(Default, Debug)]
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
list_files_cache: Option<ListFilesCache>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
}

impl CacheManager {
@@ -59,6 +81,13 @@ impl CacheManager {
if let Some(lc) = &config.list_files_cache {
manager.list_files_cache = Some(Arc::clone(lc))
}
if let Some(mc) = &config.file_metadata_cache {
Comment from the PR author (Contributor):

    Here the file_metadata_cache is assigned to DefaultFilesMetadataCache if not provided. This makes it easier to enable metadata caching using just ParquetReadOptions or by setting datafusion.execution.parquet.cache_metadata = true.

manager.file_metadata_cache = Some(Arc::clone(mc));
} else {
manager.file_metadata_cache =
Some(Arc::new(DefaultFilesMetadataCache::default()));
}

Ok(Arc::new(manager))
}

@@ -71,6 +100,11 @@ impl CacheManager {
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
self.list_files_cache.clone()
}

/// Get the file embedded metadata cache.
pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
self.file_metadata_cache.clone()
}
}

#[derive(Clone, Default)]
@@ -86,6 +120,10 @@ pub struct CacheManagerConfig {
/// location.
/// Default is disable.
pub list_files_cache: Option<ListFilesCache>,
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
/// data file (e.g., Parquet footer and page metadata).
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
}

impl CacheManagerConfig {
Expand All @@ -101,4 +139,12 @@ impl CacheManagerConfig {
self.list_files_cache = cache;
self
}

pub fn with_file_metadata_cache(
mut self,
cache: Option<Arc<dyn FileMetadataCache>>,
) -> Self {
self.file_metadata_cache = cache;
self
}
}
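
To wire a metadata cache into the runtime explicitly, a hedged sketch (module paths follow this diff plus the usual datafusion re-exports; given the fallback in CacheManager::try_new above, this is only needed to supply a custom cache):

use std::sync::Arc;

use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

fn main() -> datafusion::error::Result<()> {
    // Provide a FileMetadataCache up front; per CacheManager::try_new above,
    // the same default would be installed if this were left as None.
    let cache_config = CacheManagerConfig::default()
        .with_file_metadata_cache(Some(Arc::new(DefaultFilesMetadataCache::default())));
    let runtime = RuntimeEnvBuilder::new()
        .with_cache_manager(cache_config)
        .build_arc()?;
    let _ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
    Ok(())
}
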