feat: Cache Parquet metadata in built in parquet reader #16971
Conversation
nuno-faria
left a comment
cc: @alamb
if let Some(lc) = &config.list_files_cache {
    manager.list_files_cache = Some(Arc::clone(lc))
}
if let Some(mc) = &config.file_metadata_cache {
Here the file_metadata_cache is assigned to DefaultFilesMetadataCache if not provided. This makes it easier to enable metadata caching using just ParquetReadOptions or set datafusion.execution.parquet.cache_metadata = true.
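A minimal sketch of the fallback described here, using simplified stand-in types (the real CacheManager/CacheManagerConfig are more involved; only the Some/None fallback mirrors the behavior being discussed):

```rust
use std::sync::Arc;

// Simplified stand-ins for the real types; names are assumptions for illustration.
#[derive(Default)]
struct DefaultFilesMetadataCache;

struct CacheManagerConfig {
    file_metadata_cache: Option<Arc<DefaultFilesMetadataCache>>,
}

struct CacheManager {
    file_metadata_cache: Arc<DefaultFilesMetadataCache>,
}

fn build_manager(config: &CacheManagerConfig) -> CacheManager {
    // Use the provided cache if any, otherwise fall back to the default implementation.
    let file_metadata_cache = match &config.file_metadata_cache {
        Some(mc) => Arc::clone(mc),
        None => Arc::new(DefaultFilesMetadataCache::default()),
    };
    CacheManager { file_metadata_cache }
}

fn main() {
    let manager = build_manager(&CacheManagerConfig {
        file_metadata_cache: None,
    });
    let _default_cache = &manager.file_metadata_cache; // the default cache is in place
}
```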
async move {
    let object_meta = &file_meta.object_meta;

    // lookup if the metadata is already cached
If two workers call this at the same time for the same file, they will both independently read the metadata and then update the cache. There should be no problem with this, but pointing it out just in case.
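A generic sketch of why that race is benign (not the PR's code): both tasks can miss, load, and insert, and the later insert simply overwrites an equivalent value.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// If two tasks miss the cache at the same time, both will load the value and both
// will insert it; the second insert overwrites the first with an equivalent value,
// so correctness is preserved and the only cost is the duplicated load.
fn get_or_load(
    cache: &Mutex<HashMap<String, Arc<String>>>,
    key: &str,
    load: impl Fn() -> String,
) -> Arc<String> {
    if let Some(hit) = cache.lock().unwrap().get(key) {
        return Arc::clone(hit);
    }
    // The lock is released here, so another task may load the same key concurrently.
    let value = Arc::new(load());
    cache
        .lock()
        .unwrap()
        .insert(key.to_string(), Arc::clone(&value));
    value
}

fn main() {
    let cache = Mutex::new(HashMap::new());
    let metadata = get_or_load(&cache, "file.parquet", || "parsed footer".to_string());
    assert_eq!(metadata.as_str(), "parsed footer");
}
```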
| datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
| datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. |
The metadata cache is automatically invalidated when the file changes, but is never cleared. Is there a problem with this?
I do think it is a problem if we wanted to turn this feature on by default, which I do. However, I don't think we need to make any changes in this particular PR.
Filed an issue to add a limit to the cache here:
Thank you @nuno-faria - I plan to review this tomorrow
🤖: Benchmark completed
I think this doesn't show anything, as it's not enabled by default? Should we enable it?
I made a PR to test here: I don't think we can enable this cache by default unless it is memory limited. I will write more in my review (in progress)
alamb
left a comment
Thank you so much @nuno-faria
I found this PR to be well designed, well implemented and well tested. 🏆
I tried this out and I couldn't seem to see it make a difference with datafusion-cli locally for me.
I expected the queries after the first run to complete almost instantaneously; however, I see them fetching a non-trivial amount of data over the network.
I think this is related to the fact that there is a separate path to retrieve statistics for ListingTable, specifically https://github.com/apache/datafusion/blob/1452333cf0933d4d8da032af68bc5a3a05c62483/datafusion/datasource-parquet/src/file_format.rs#L975-L974
> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> select count(*) from 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 4.632 seconds.
> select count(*) from 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 2.717 seconds.
> select count(*) from 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 3.409 seconds.

Since this cache is disabled by default and doesn't affect performance, I think it would be fine to merge into main and then iterate from there.
However, I think we should aim to enable this cache by default as part of the "great performance out of the box" philosophy.
To do so I think we would need:
- Some sort of upper memory limit on the parquet cache. We can get the in-memory size of Parquet metadata from https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaData.html#method.memory_size (see the sketch after this list)
- Use the cache for statistics as well
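A rough sketch of the memory-limit idea from the first bullet, assuming a simple path-keyed map: `ParquetMetaData::memory_size()` is the parquet-crate method linked above, but the cache type, its names, and the reject-instead-of-evict policy are all hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parquet::file::metadata::ParquetMetaData;

/// Hypothetical size-bounded metadata cache: entries are only admitted while the
/// total reported memory stays under `limit_bytes`.
struct BoundedMetadataCache {
    entries: HashMap<String, Arc<ParquetMetaData>>,
    used_bytes: usize,
    limit_bytes: usize,
}

impl BoundedMetadataCache {
    fn new(limit_bytes: usize) -> Self {
        Self {
            entries: HashMap::new(),
            used_bytes: 0,
            limit_bytes,
        }
    }

    /// Returns true if the entry was cached, false if it would exceed the limit.
    fn try_put(&mut self, path: String, metadata: Arc<ParquetMetaData>) -> bool {
        let entry_size = metadata.memory_size();
        if self.used_bytes + entry_size > self.limit_bytes {
            return false; // a real implementation would likely evict older entries instead
        }
        self.used_bytes += entry_size;
        self.entries.insert(path, metadata);
        true
    }

    fn get(&self, path: &str) -> Option<Arc<ParquetMetaData>> {
        self.entries.get(path).cloned()
    }
}
```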
I will file a follow on ticket to explain these items too
cc @Ted-Jiang who I think contributed the first version of this cache and the cache infrastructure quite a while ago.
}

fn put(&self, _key: &Path, _value: Arc<FileMetadata>) -> Option<Arc<FileMetadata>> {
    panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra")
🤔 a panic like that is unfortunate -- maybe we should change the API so this function can return an error (in a follow on PR)
Preface: I'm a user looking forward to this work being implemented, so don't let anything said here impact the timeline for this PR.
As a user who has recently been looking into (seemingly) IO-related performance using remote parquet backing listing tables, I came across the cache_unit.rs source the other day and similarly felt a bit concerned that methods in the default cache implementation could panic. Obviously an error that can be handled is better than a panic; is this a situation that could be reasonably handled at compile time? This implementation makes use of only _with_extra implementations, but the DefaultListFilesCache makes use of only the normal get and put. Returning a Result would already be API-breaking; perhaps the CacheAccessor trait could be broken into several parts indicating whether the implementer intends to support just get and put or the _with_extra variants.
I studied this code more carefully, and here is an alternate proposal:
What if we made the key, from the cache's perspective, a tuple of (&Path, &ObjectMetadata) and didn't use extra
I think that is more in the spirit of this cache where the ObjectMetadata is logically part of the key.
I tried it locally and it seemed possible
What do you think @nuno-faria ?
Yeah I think it makes sense, since the Extra is mandatory to ensure correct results in case the file changes.
@alamb It appears to work, but I had to introduce lifetimes to FileMetadataCache in order to pass &(&Path, &ObjectMeta). What about using just ObjectMeta as the key, since it already has the path embedded?
pub trait FileMetadataCache:
    CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
}
...
fn get(&self, k: &ObjectMeta)

And what about the [get|put]_with_extra methods in this new version, should they also panic! or call get/put and ignore the Extra?
What about using just ObjectMeta as the key, since it already has the path embedded?
That is a brilliant idea!
And what about the [get|put]_with_extra methods in this new version, should they also panic! or call get/put and ignore the Extra?
I think they should ignore the Extra and call get/put instead
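A small sketch of that approach, using a simplified stand-in for the CacheAccessor trait (the real trait has a larger surface; names and types here are illustrative only): the _with_extra variants get default implementations that ignore the extra and delegate.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the real CacheAccessor trait discussed above.
trait SimpleCacheAccessor<K, V> {
    type Extra;
    fn get(&self, key: &K) -> Option<V>;
    fn put(&self, key: K, value: V) -> Option<V>;

    // The *_with_extra variants ignore the extra and delegate, instead of panicking.
    fn get_with_extra(&self, key: &K, _extra: &Self::Extra) -> Option<V> {
        self.get(key)
    }
    fn put_with_extra(&self, key: K, value: V, _extra: &Self::Extra) -> Option<V> {
        self.put(key, value)
    }
}

// Toy cache keyed by path, storing opaque bytes in place of real metadata.
struct MetadataCache {
    inner: Mutex<HashMap<String, Arc<Vec<u8>>>>,
}

impl SimpleCacheAccessor<String, Arc<Vec<u8>>> for MetadataCache {
    type Extra = String;
    fn get(&self, key: &String) -> Option<Arc<Vec<u8>>> {
        self.inner.lock().unwrap().get(key).cloned()
    }
    fn put(&self, key: String, value: Arc<Vec<u8>>) -> Option<Arc<Vec<u8>>> {
        self.inner.lock().unwrap().insert(key, value)
    }
}

fn main() {
    let cache = MetadataCache {
        inner: Mutex::new(HashMap::new()),
    };
    let extra = "ignored".to_string();
    cache.put_with_extra("file.parquet".to_string(), Arc::new(vec![1, 2, 3]), &extra);
    assert!(cache.get_with_extra(&"file.parquet".to_string(), &extra).is_some());
}
```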
The DefaultFileStatistics cache offers a caller the option of either checking for object modification or a presumably more performant option where there's no check (ostensibly this is a situation where the caller can reasonably assume the underlying files will not change). Is this something that could reasonably be implemented here, either now or in the future? If yes, it seems like it's important to ensure the key for the lookup can be reasonably built with little IO/latency overhead. Presumably this is why the other CacheAccessor implementations rely on Path since it's readily available.
I don't necessarily think this needs to be in the initial implementation of this metadata cache (again, I don't want to slow this PR down if possible), but it seems like having the option to always rely on the cached metadata when a caller can reasonably assume underlying objects will not change could be beneficial. Metadata requests for local files on a SSD/NVMe will be very fast, however when using remote storage of any sort even the head request to check modification incurs a relatively significant latency penalty compared to a blind cache lookup.
Is this something that could reasonably be implemented here, either now or in the future? If yes, it seems like it's important to ensure the key for the lookup can be reasonably built with little IO/latency overhead.
I think, with the way the API is structured now via traits, it is possible to replace the default metadata file cache with one that ignores the last-updated time.
Also, I should point out that all this discussion only applies to people using the built-in ListingTable catalog in DataFusion -- if you provide your own TableProvider you can implement many more sophisticated caching techniques (I am working on a blog to explain this more: apache/datafusion-site#98)
/// (footer and page metadata). Enabling it can offer substantial performance improvements
/// for repeated queries over large files. By default, the cache is automatically
/// invalidated when the underlying file is modified.
pub cache_metadata: bool, default = false
Eventually, I think it would be better to have this be a size setting metadata_cache_size as then that can represent both disabled (0 size) and a memory cap.
We can do this in a follow on PR
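For example, something along these lines in the same config style as the option quoted above (purely hypothetical, not part of this PR; the name metadata_cache_size and the 0-disables semantics are assumptions):

```rust
/// (reading) Maximum amount of memory used to cache Parquet metadata, in bytes.
/// 0 disables the cache; any positive value enables it with that memory cap.
pub metadata_cache_size: usize, default = 0
```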
/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
/// This reader always loads the entire metadata (including page index, unless the file is
👍
let file_meta = self.file_meta.clone();
let metadata_cache = Arc::clone(&self.metadata_cache);

async move {
it is impressive that you worked out this API dance -- it is something I really don't like about the current API of the parquet reader.
BTW I am working on improving it (no changes needed or suggested here, I am just self-promoting):
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
/// data file (e.g., Parquet footer and page metadata).
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
pub file_metadata_cache: Option<FileMetadataCache>,
if it is already an option, why do we need a DefaultFilesMetadataCache? 🤔
Couldn't we just leave it as None?
My initial idea here was to make it easy to enable the metadata cache without having to provide a custom FileMetadataCache when setting up the runtime (default). This way, the user can simply call set datafusion.execution.parquet.cache_metadata = true; or enable for a file with the ParquetReadOptions. But I don't know if there is a better approach (maybe removing the Option for the file_metadata_cache altogether?).
Ah, I understand this now and it makes sense
);
}

#[test]
😍
# Updating the file should invalidate the cache. Otherwise, the following queries would fail
# (e.g., with "Arrow: Parquet argument error: External: incomplete frame").
query I
amazing
Update -- clickbench benchmarks look quite promising for the already shorter queries:

This is super exciting! 🚀

I also filed a ticket to track making

I have gathered follow on tasks in an epic:
alamb
left a comment
Ok, I filed the follow on tasks I could see and thus I think this PR is ready to merge from my perspective
Thank you again @nuno-faria
I think we should wait another day or two in case anyone else has comments they would like to provide
I do think it would be good to sort out the panic
I thought some more about this PR last night and I wanted to suggest another idea: once we have added the memory limit to the cache, ALWAYS have the built-in parquet reader try to save items to the cache (and remove the current parquet config, only use the file metadata cache). The more I think about this series of PRs, the better I am feeling.
EDIT: I realized my testing methodology is flawed for these results, as using CREATE TABLE enabled other caching that isn't present when trying to access the remote URL directly. I'm leaving what I had here to avoid confusion, but implementing a
DataFusion CLI v49.0.0
> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> CREATE EXTERNAL TABLE athena_partitioned
STORED AS PARQUET LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 3.319 seconds.
> select count(*) from athena_partitioned;
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 2.141 seconds.
> select count(*) from athena_partitioned;
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 0.159 seconds.
> select count(*) from athena_partitioned;
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 0.161 seconds.
Thanks for checking @BlakeOrth
Update: I see the new code. I will review and merge.
alamb
left a comment
Thanks again @nuno-faria and @BlakeOrth -- I think this looks great. Let's merge this one in and make additional changes as a follow on
* feat: Cache Parquet metadata
* Convert FileMetadata and FileMetadataCache to traits
* Use as_any to respect MSRV
* Use ObjectMeta as the key of FileMetadataCache
Thanks @BlakeOrth
Which issue does this PR close?
Closes Cache Parquet Metadata #15582.
Related issues:
- ParquetOpener #15179
- datafusion-cli when reading from remote storage #16365

Rationale for this change
With large Parquet files, a non-negligible amount of time is spent reading the footer and page metadata. This overhead becomes more noticeable when the queries are relatively simple. With repeated scans over the same file, we can improve this by caching the metadata, so it is only read once.
With a benchmark using a large file (100M rows, 2 cols) and simple reads (select where k = ...), caching the Parquet metadata makes it more than 10x faster.

Simple benchmark
The metadata cache is disabled by default. It can be turned on for a specific Parquet file using ParquetReadOptions, or enabled for all Parquet files using the SQL API.
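As a rough sketch of both approaches (the ParquetReadOptions cache_metadata setter shown here is an assumption based on the change list below, not verified against the code; the SQL setting matches the one used elsewhere in this conversation):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Per-file: enable metadata caching when registering one Parquet file.
    // `cache_metadata` is the option added by this PR; the exact builder/field
    // syntax here is an assumption for illustration.
    let options = ParquetReadOptions::default().cache_metadata(true);
    ctx.register_parquet("hits", "data/hits.parquet", options)
        .await?;

    // Global: enable it for all Parquet files via the SQL API.
    ctx.sql("SET datafusion.execution.parquet.cache_metadata = true")
        .await?
        .collect()
        .await?;

    Ok(())
}
```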
The cache is automatically invalidated when the file changes.
When the cache is enabled, the entire metadata will be read, including the page index, unless using encryption:
datafusion/datafusion/datasource-parquet/src/opener.rs
Lines 146 to 147 in 94e8548
This means that it is not worth enabling it for single file scans whose query does not need the page index.
What changes are included in this PR?
- Added cache_metadata to ParquetOptions (default = false).
- Added cache_metadata to ParquetReadOptions (default = None).
- Added CachedParquetFileReaderFactory and CachedParquetFileReader.
- Added file_metadata_cache to CacheManager (default = DefaultFilesMetadataCache).
- Added DefaultFilesMetadataCache.
- Updated ParquetFormat to call the CachedParquetFileReaderFactory when caching is enabled.
- Updated proto::ParquetOptions.
- Updated cache/cache_unit.rs.

Are these changes tested?
Yes.
Are there any user-facing changes?
Added a new external configuration, but it is disabled by default.