Add ParquetRecordBatchReaderBuilder (#2427) #2435

Merged

Conversation

tustvold (Contributor):

Which issue does this PR close?

Closes #2427

Rationale for this change

This standardises the configuration of the async and sync arrow parquet readers, helping to avoid inconsistency and reducing duplication.

What changes are included in this PR?

Adds a new ParquetRecordBatchReaderBuilder and deprecates the old APIs

Are there any user-facing changes?

This deprecates old APIs; however, it doesn't remove any of them

@tustvold (Contributor Author):

@Ted-Jiang could you perhaps take a look and make sure this makes sense?

The github-actions bot added the parquet (Changes to the parquet crate) label on Aug 12, 2022
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
selection: Option<RowSelection>,
page_index: bool,
tustvold (Contributor Author):

There is a detail worth highlighting here: this forces decoding of the page index for all row groups, as the row group selection isn't known at the point the metadata is read. I experimented with APIs to allow for this, but they were very clunky, and ultimately the index information should be relatively small and cheap to decode, so I didn't think it was worth it.
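For concreteness, a minimal sketch of opting into page index decoding through this option, assuming the ArrowReaderOptions::new, with_page_index, and try_new_with_options methods as exposed by the crate (the file path is illustrative):

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // As noted above, this decodes the page index for every row group,
    // since the row group selection isn't known when the metadata is read
    let options = ArrowReaderOptions::new().with_page_index(true);
    let file = File::open("data.parquet")?; // illustrative path
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    for batch in reader {
        println!("read a batch of {} rows", batch?.num_rows());
    }
    Ok(())
}
```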

@Ted-Jiang (Member), Aug 13, 2022:

Sounds reasonable.
I think reading the page_index should be part of opening the file. Have you measured how long reading the page_index takes? 🤔

Contributor:

Suggested change
page_index: bool,
/// if true, forces decoding of the page index for all row groups
/// as the group selection isn't known at the point the metadata is read
page_index: bool,

///
/// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
#[allow(unused)]
pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self {
tustvold (Contributor Author):

This API is removed, as it was impossible to use

Contributor:

Isn't it moved to ArrowReaderBuilder::with_row_selection?


// Verify that the schema was correctly parsed
let original_schema = arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(original_schema, *record_batch_reader.schema().fields());
assert_eq!(original_schema.fields(), reader.schema().fields());
tustvold (Contributor Author):

I'm not really sure this test makes sense anymore, but I kept the spirit of it

}

/// Returns a reference to the [`ParquetMetaData`] for this parquet file
pub fn metadata(&self) -> &Arc<ParquetMetaData> {
tustvold (Contributor Author):

This logic is moved into ArrowReaderBuilder

/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
///
/// [`ParquetRecordBatchStreamBuilder`]: [crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
pub struct ArrowReaderBuilder<T> {
tustvold (Contributor Author):

This is largely moved from ParquetRecordBatchStreamBuilder

@@ -194,112 +194,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}
}

#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
pub struct AsyncReader<T>(T);
tustvold (Contributor Author):

This is the type trickery that allows sharing the same builder for both the sync and async versions, whilst also not breaking the existing ParquetRecordBatchStreamBuilder API
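To make the trick concrete, here is a minimal self-contained sketch of the pattern rather than the crate's actual code; the field names and method bodies are illustrative:

```rust
// Zero-cost newtypes wrapping the underlying input
pub struct SyncReader<T>(T);
pub struct AsyncReader<T>(T);

// A single builder holds all the shared configuration
pub struct ArrowReaderBuilder<T> {
    input: T,
    batch_size: usize,
}

impl<T> ArrowReaderBuilder<T> {
    // Configuration methods are written once and shared by both readers
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }
}

// Terminal methods are implemented only for the matching instantiation,
// so a synchronous build cannot be invoked on an async source
impl<T: std::io::Read> ArrowReaderBuilder<SyncReader<T>> {
    pub fn build(self) -> T {
        self.input.0
    }
}

// The existing public names can then remain as aliases of the shared builder
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
```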

@Ted-Jiang (Member) left a comment:

This change sounds reasonable to me! 👍
But I found something: I think only ParquetRecordBatchReader now has the ability to read the page index info near the footer.

Should we also support it in the async reader, or, because its cost is small, could we use the SyncReader before using the async one?

@tustvold (Contributor Author):

I filed #2430 to track adding page index support to the async reader. There is a slight additional complication as it needs to perform IO to read the corresponding bytes, but nothing intractable. Thinking a bit more, I wonder if this should be handled by AsyncFileReader? 🤔

@alamb (Contributor) left a comment:

I think this looks great -- thanks @tustvold

The only question I had was whether it made sense to put this API somewhere other than arrow, as I think everything (except RowFilter) is applicable to other uses as well.

Also, another check that might be worth doing is to make a draft PR to DataFusion to ensure this API can be used without issue

@@ -124,6 +124,49 @@ impl RowGroupCollection for Arc<dyn FileReader> {
}
}

pub(crate) struct FileReaderRowGroupCollection {
reader: Arc<dyn FileReader>,
row_groups: Option<Vec<usize>>,
Contributor:

I think it would help to document what usize means here -- I assume it is the index of the row group within the parquet file? And that if this is None, all row groups will be read?
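A sketch of documentation along those lines, assuming the reading above (indices within the file; None means read everything) is correct:

```rust
use parquet::file::reader::FileReader;
use std::sync::Arc;

pub(crate) struct FileReaderRowGroupCollection {
    /// The underlying file reader
    reader: Arc<dyn FileReader>,
    /// Indices of the row groups within the parquet file to read;
    /// if `None`, all row groups are read
    row_groups: Option<Vec<usize>>,
}
```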

let read_schema = arrow_reader.get_schema()?;
assert_eq!(schema, read_schema);

// read all fields by columns
Contributor:

Isn't the use case (and test) of reading a partial schema still valid?

tustvold (Contributor Author):

There is no separate get_schema_by_columns API anymore, and there is no difference between specifying ProjectionMask::all and not specifying a mask, so this additional bit of the test no longer makes sense. It was added when get_schema_by_columns used completely different logic from the array reader.

@@ -194,112 +194,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}
}

#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
Contributor:

Suggested change
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
/// Allows sharing the same builder for both the sync and async versions, whilst also not
/// breaking the existing ParquetRecordBatchStreamBuilder API

/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
///
/// [`ParquetRecordBatchStreamBuilder`]: [crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
Contributor:

Eventually it would be great to update the examples to use this (much nicer) API as well: https://docs.rs/parquet/20.0.0/parquet/arrow/index.html
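For instance, an updated example might look something like this hedged sketch using the new builder (the file path and leaf column indices are illustrative):

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Project a subset of leaf columns and pick a batch size
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
    let reader = builder.with_projection(mask).with_batch_size(1024).build()?;

    for batch in reader {
        println!("read a batch of {} rows", batch?.num_rows());
    }
    Ok(())
}
```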

&self.schema
}

/// Set the size of [`RecordBatch`] to produce
Contributor:

Suggested change
/// Set the size of [`RecordBatch`] to produce
/// Set the size of [`RecordBatch`] to produce. Defaults to 1024

}
}

/// Provide a [`RowFilter`] to skip decoding rows
Contributor:

Suggested change
/// Provide a [`RowFilter`] to skip decoding rows
/// Provide a [`RowFilter`] to skip decoding rows. Row filters are applied
/// after row group selection and row selection
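To illustrate the ordering the suggestion describes, here is a hedged sketch of constructing a RowFilter from an ArrowPredicateFn; the Int32 column type and the file path are assumptions:

```rust
use arrow::array::{BooleanArray, Int32Array};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // illustrative path
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Predicate batches contain only the masked columns; here we keep rows
    // where the first leaf column (assumed to be Int32) is positive
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("assumed an Int32 leaf column");
        Ok(col.iter().map(|v| v.map(|v| v > 0)).collect::<BooleanArray>())
    });

    // The filter runs after row group selection and row selection
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        println!("read a batch of {} rows", batch?.num_rows());
    }
    Ok(())
}
```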

pub(crate) selection: Option<RowSelection>,
}

impl<T> ArrowReaderBuilder<T> {
Contributor:

This is looking like a very nice api 👌 👨‍🍳

🎩 tip to you @tustvold @Ted-Jiang and @thinkharderdev for this. Very cool

let file = File::open(&path).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
Contributor:

👌 very nice

@tustvold (Contributor Author):

> as I think everything (except RowFilter) is applicable to other uses as well.

I would rather leave this for when I eventually get to cleaning up the lower level APIs; this PR can be viewed as decoupling the arrow implementation from the other APIs, so that a subsequent PR can revisit them

Comment on lines +150 to +156
#[allow(unused)]
pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
Self {
selection: Some(selection),
..self
}
}
@thinkharderdev (Contributor), Aug 15, 2022:

Hmm, would it make sense to collapse with_row_selection and with_row_filter? The API is a bit confusing with both, and you could always just define a RowSelection as an ArrowPredicate.

Edit: To clarify a bit, I'm not sure I understand the use case in which you would have the RowSelection when constructing the reader. Obviously defining the selection in terms of an ArrowPredicate is not ideal, since it is only applied after decoding, which pretty much defeats the purpose.

tustvold (Contributor Author):

They are related but different; in particular, with_row_selection exists to allow you to specify a row selection before reading any data, e.g. based on information in the PageIndex.
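For example, here is a hedged sketch of supplying a selection up front, assuming the RowSelection and RowSelector constructors as publicly exposed by the crate (the path and row counts are illustrative):

```rust
use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Skip the first 100 rows, then decode the next 50; unlike a RowFilter,
    // the skipped rows are never decoded at all
    let selection = RowSelection::from(vec![RowSelector::skip(100), RowSelector::select(50)]);

    let file = File::open("data.parquet")?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_selection(selection)
        .build()?;

    for batch in reader {
        println!("read a batch of {} rows", batch?.num_rows());
    }
    Ok(())
}
```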

Contributor:

Ah, yeah that makes sense.

Contributor:

Might be good to clarify this in the comments (as others will likely have the same question).

@tustvold tustvold merged commit 76cfe83 into apache:master Aug 15, 2022
ursabot commented Aug 15, 2022:

Benchmark runs are scheduled for baseline = 3f0e12d and contender = 76cfe83. 76cfe83 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Labels: parquet (Changes to the parquet crate)

Successfully merging this pull request may close: Standardize creation and configuration of parquet --> Arrow readers (ParquetRecordBatchReaderBuilder) (#2427)

5 participants