Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate MetadataLoader #6474

Merged
merged 6 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
Expand Down Expand Up @@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}

/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
Self {
fetch,
Expand All @@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// * `column_index`: if true will load column index
/// * `offset_index`: if true will load offset index
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
if !column_index && !offset_index {
return Ok(());
Expand Down Expand Up @@ -210,6 +213,16 @@ where
}
}

impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
async move { self.0(range).await }.boxed()
}
}

/// Fetches parquet metadata
///
/// Parameters:
Expand All @@ -226,6 +239,7 @@ where
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently latency
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
file_size: usize,
Expand All @@ -235,11 +249,15 @@ where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
let mut fetch = MetadataFetchFn(fetch);
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
.load_and_finish(&mut fetch, file_size)
.await
}

// these tests are all replicated in parquet::file::metadata::reader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

#[allow(deprecated)]
#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,18 @@ impl ArrowReaderMetadata {
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only is this akward, it is also a common source of confusion / bugs (namely that when someone supplies the ParquetMetaData to the arrow reader options to avoid a second object store request, if often turns out the second fetch happens anyways to read the page index (thus obviating the attempt at optimization)

To avoid this they need to ensure when they read the metadata in the first place, they also read the page index.

This is (in a roundabout way) what is happening to @progval in apache/datafusion#12593

I will try and file a ticket explaining the issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #6476

// took an argument to fetch the page indexes.
let mut metadata = input.get_metadata().await?;

if options.page_index
&& metadata.column_index().is_none()
&& metadata.offset_index().is_none()
{
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
let mut loader = MetadataLoader::new(input, m);
loader.load_page_index(true, true).await?;
metadata = Arc::new(loader.finish())
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
reader.load_page_index(input).await?;
metadata = Arc::new(reader.finish()?)
}
Self::try_new(metadata, options)
}
Expand Down
18 changes: 8 additions & 10 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
///
Expand Down Expand Up @@ -124,15 +124,13 @@ impl AsyncFileReader for ParquetObjectReader {

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let preload_column_index = self.preload_column_index;
let preload_offset_index = self.preload_offset_index;
let file_size = self.meta.size;
let prefetch = self.metadata_size_hint;
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
loader
.load_page_index(preload_column_index, preload_offset_index)
let metadata = ParquetMetaDataReader::new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so-beautiful

.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, self.meta.size)
.await?;
Ok(Arc::new(loader.finish()))
Ok(Arc::new(metadata))
})
}
}
Expand Down
87 changes: 47 additions & 40 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,14 @@ impl ParquetMetaDataReader {
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(feature = "async")]
pub async fn load_and_finish<F: MetadataFetch>(
pub async fn load_and_finish<F>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this API is new and has not yet been released, this is not a breaking API change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Specifically ParquetMetaDataReader has not been released yet)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is why I was rushing to get this into 53.1.0. 😄 But if we revert this change then this PR can merge after 53.1.0 is released.

mut self,
fetch: F,
fetch: &mut F,
file_size: usize,
) -> Result<ParquetMetaData> {
) -> Result<ParquetMetaData>
where
for<'a> &'a mut F: MetadataFetch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @alamb. I did not like that the load... functions in ParquetMetaDataReader took ownership of fetch.

What don't you like about it? Is the idea that MetadataFetch actually has mutable state so that calling fetch would change its state?

I think in general trying to get async functions to be happy with references is often much harder than with owned objects.

So I guess I think we should keep the API as owned to make working with these APIs simpler, unless there is some compelling reason for the change (better performance, etc)

Copy link
Contributor Author

@etseidl etseidl Sep 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What don't you like about it? Is the idea that MetadataFetch actually has mutable state so that calling fetch would change its state?

Some of it is my lack of Rust experience. I'm having a hard time tracking the ownership here, and am confusing traits with objects. Implementing the MetadataFetch trait for a mutable reference to something that implements the AsyncFileReader trait leaves my head spinning a little.

I think in general trying to get async functions to be happy with references is often much harder than with owned objects.

Indeed, it took me days to get this to compile 😅.

So I guess I think we should keep the API as owned to make working with these APIs simpler, unless there is some compelling reason for the change (better performance, etc)

I'll revert now that I understand the mechanics a little better. But one consequence is some things will get a little more verbose. For instance, when not taking ownership, get_metadata for ParquetObjectReader is

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let metadata = ParquetMetaDataReader::new()
                .with_column_indexes(self.preload_column_index)
                .with_offset_indexes(self.preload_offset_index)
                .with_prefetch_hint(self.metadata_size_hint)
                .load_and_finish(self, self.meta.size)
                .await?;
            Ok(Arc::new(metadata))
        })
    }

But if load_and_finish takes ownership of the mutable reference to self, we can't then get at self.meta.size in the same call, but need to save it earlier, IIUC.

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let file_size = self.meta.size;
            let metadata = ParquetMetaDataReader::new()
                .with_column_indexes(self.preload_column_index)
                .with_offset_indexes(self.preload_offset_index)
                .with_prefetch_hint(self.metadata_size_hint)
                .load_and_finish(self, file_size)
                .await?;
            Ok(Arc::new(metadata))
        })
    }

A small price to pay, but one I was hoping to avoid.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small price to pay, but one I was hoping to avoid.

I agree it would be nicer.

While I don't fully understand all the implications here, this is in my mind related to how how the rust compiler generates async continuations and how it interacts with the borrow checker. Sometimes it has a hard time expressing that a borrow is ok for some reason

{
self.try_load(fetch, file_size).await?;
self.finish()
}
Expand All @@ -314,13 +317,12 @@ impl ParquetMetaDataReader {
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(feature = "async")]
pub async fn try_load<F: MetadataFetch>(
&mut self,
mut fetch: F,
file_size: usize,
) -> Result<()> {
pub async fn try_load<F>(&mut self, fetch: &mut F, file_size: usize) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
let (metadata, remainder) =
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
Self::load_metadata(fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);

Expand All @@ -329,17 +331,28 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index(fetch, remainder).await
self.load_page_index_with_remainder(fetch, remainder).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
pub async fn load_page_index<F: MetadataFetch>(
pub async fn load_page_index<F>(&mut self, fetch: &mut F) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
self.load_page_index_with_remainder(fetch, None).await
}

#[cfg(feature = "async")]
async fn load_page_index_with_remainder<F>(
&mut self,
mut fetch: F,
mut fetch: &mut F,
remainder: Option<(usize, Bytes)>,
) -> Result<()> {
) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
}
Expand Down Expand Up @@ -494,11 +507,14 @@ impl ParquetMetaDataReader {
}

#[cfg(feature = "async")]
async fn load_metadata<F: MetadataFetch>(
fetch: &mut F,
async fn load_metadata<F>(
mut fetch: &mut F,
file_size: usize,
prefetch: usize,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)>
where
for<'a> &'a mut F: MetadataFetch,
{
if file_size < FOOTER_SIZE {
return Err(eof_err!("file size of {} is less than footer", file_size));
}
Expand Down Expand Up @@ -836,7 +852,7 @@ mod async_tests {

struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think now that the API has been changed back, these test changes are also not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change allows me to wrap the fetch function once, rather than for each invocation of load, but that could be done inline or with a From I suppose. Happy to revert if you'd like.

Copy link
Contributor

@alamb alamb Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in #6484

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought reverting them was nice to demonstrate the same API can still be used

I did notice that the metadata loader tests actually have a copy/paste MetadataFetchFn adapter. Maybe if the arrow crate needs it in two places, we should make it easier for actual users to write them 🤔 Not sure

where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
Expand Down Expand Up @@ -865,74 +881,68 @@ mod async_tests {
let expected = expected.file_metadata().schema();
let fetch_count = AtomicUsize::new(0);

let mut fetch = |range| {
let fetch = |range| {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};

let input = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(fetch);
let actual = ParquetMetaDataReader::new()
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(7))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(10))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(500))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(428))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 4)
.load_and_finish(&mut f, 4)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 20)
.load_and_finish(&mut f, 20)
.await
.unwrap_err()
.to_string();
Expand All @@ -949,42 +959,39 @@ mod async_tests {
futures::future::ready(read_range(&mut file, range))
};

let f = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650))
.load_and_finish(f, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
Expand Down
14 changes: 6 additions & 8 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ mod tests {
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
use crate::arrow::async_reader::MetadataFetch;
use crate::errors::Result as ParquetResult;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -552,7 +552,7 @@ mod tests {
}
}

impl MetadataFetch for &mut MaskedBytes {
impl<'a> MetadataFetch for &'a mut MaskedBytes {
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
let inner_range = self.inner_range.clone();
println!("inner_range: {:?}", inner_range);
Expand All @@ -569,13 +569,11 @@ mod tests {
Box::new(AsyncBytes::new(data)),
file_size - metadata_length..file_size,
);
let metadata = MetadataLoader::load(&mut reader, file_size, None)
ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut reader, file_size)
.await
.unwrap();
let loaded_metadata = metadata.finish();
let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
metadata.load_page_index(true, true).await.unwrap();
metadata.finish()
.unwrap()
}

fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
Expand Down
Loading
Loading