Add ParquetRecordBatchReaderBuilder (#2427) #2435
Conversation
@Ted-Jiang could you perhaps take a look and make sure this makes sense
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    selection: Option<RowSelection>,
    page_index: bool,
There is a detail worth highlighting here: this forces decoding of the page index for all row groups, as the row group selection isn't known at the point the metadata is read. I experimented with APIs to allow for this, but they were very clunky, and ultimately the index information should be relatively small and cheap to decode, so I didn't think it was worth it.
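For reference, a minimal sketch of how this option is exercised through the builder API added in this PR (the file source and error handling are illustrative, and exact signatures may vary slightly across parquet versions):

use std::fs::File;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn read_with_page_index(file: File) -> parquet::errors::Result<()> {
    // Requesting the page index decodes it for *all* row groups, since the
    // row group selection isn't known yet when the metadata is read
    let options = ArrowReaderOptions::new().with_page_index(true);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}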
Sounds reasonable.
I think reading the page index should belong to opening the file. Have you found out how long reading the page index costs? 🤔
Suggested change:
-    page_index: bool,
+    /// if true, forces decoding of the page index for all row groups
+    /// as the group selection isn't known at the point the metadata is read
+    page_index: bool,
    ///
    /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
    #[allow(unused)]
    pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self {
This API is removed, as it was impossible to use
Isn't it moved to `ArrowReaderBuilder::with_row_selection`?
// Verify that the schema was correctly parsed
let original_schema = arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(original_schema, *record_batch_reader.schema().fields());
assert_eq!(original_schema.fields(), reader.schema().fields());
I'm not really sure this test makes sense anymore, but I kept the spirit of it
}

/// Returns a reference to the [`ParquetMetaData`] for this parquet file
pub fn metadata(&self) -> &Arc<ParquetMetaData> {
This logic is moved into `ArrowReaderBuilder`
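A short sketch of what this enables (names from this PR; the choice of row group is illustrative): the parsed metadata is available on the builder before anything is decoded, so it can drive the row group selection:

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_first_row_group(file: File) -> parquet::errors::Result<()> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Inspect the metadata before building the reader
    println!("row groups: {}", builder.metadata().num_row_groups());
    let reader = builder.with_row_groups(vec![0]).build()?;
    for batch in reader {
        let _ = batch?;
    }
    Ok(())
}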
/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
///
/// [`ParquetRecordBatchStreamBuilder`]: [crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
pub struct ArrowReaderBuilder<T> {
This is largely moved from `ParquetRecordBatchStreamBuilder`
@@ -194,112 +194,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    }
}

#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
pub struct AsyncReader<T>(T);
This is the type trickery that allows sharing the same builder for both the sync and async versions, whilst also not breaking the existing `ParquetRecordBatchStreamBuilder` API
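For readers unfamiliar with the pattern, a minimal self-contained sketch of the idea (illustrative names, not the crate's exact internals): the marker newtype decides which `build` method exists, while shared configuration lives once on the generic builder:

/// Marker newtype wrapping a synchronous input
pub struct SyncReader<T>(pub T);
/// Marker newtype wrapping an asynchronous input
pub struct AsyncReader<T>(pub T);

pub struct ArrowReaderBuilder<T> {
    input: T,
    batch_size: usize,
}

impl<T> ArrowReaderBuilder<T> {
    /// Configuration shared by both the sync and async flavours
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }
}

impl<R> ArrowReaderBuilder<SyncReader<R>> {
    /// Only the sync flavour exposes a blocking build
    pub fn build(self) -> (R, usize) {
        (self.input.0, self.batch_size)
    }
}

impl<R> ArrowReaderBuilder<AsyncReader<R>> {
    /// Only the async flavour exposes an async build
    pub async fn build(self) -> (R, usize) {
        (self.input.0, self.batch_size)
    }
}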
This change sounds reasonable to me! 👍
But I found something: I think only `ParquetRecordBatchReader` now has the ability to read the page index info near the footer. Should we also support it in the async reader, or, because its cost is small, could we use the `SyncReader` before using the async one?

I filed #2430 to track adding page index support to the async reader. There is a slight additional complication as it needs to perform IO to read the corresponding bytes, but nothing intractable. Thinking a bit more, I wonder if this should be handled by `AsyncFileReader`? 🤔
I think this looks great -- thanks @tustvold
The only question I had was if it made sense to put this API somewhere other than `arrow`, as I think everything (except `RowFilter`) is applicable to other uses as well.
Also, another check that might be worth doing is to make a draft PR to DataFusion to ensure this API can be used without issue
@@ -124,6 +124,49 @@ impl RowGroupCollection for Arc<dyn FileReader> {
    }
}

pub(crate) struct FileReaderRowGroupCollection {
    reader: Arc<dyn FileReader>,
    row_groups: Option<Vec<usize>>,
I think it would help to document what `usize` means here -- I assume it is the index of the row group within the parquet file? And that if this is `None`, all row groups will be read?
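Something along these lines might address both questions (the wording is a guess at the intent, not the final doc text):

use std::sync::Arc;
use parquet::file::reader::FileReader;

pub(crate) struct FileReaderRowGroupCollection {
    reader: Arc<dyn FileReader>,
    /// Indexes of the row groups to read, in order, within the parquet file.
    /// `None` means read all row groups
    row_groups: Option<Vec<usize>>,
}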
let read_schema = arrow_reader.get_schema()?;
assert_eq!(schema, read_schema);

// read all fields by columns
Isn't the use case (and test) of reading a partial schema still valid?
There is no separate `get_schema_by_columns` API anymore, and there is no difference between specifying `ProjectionMask::all` and not specifying a mask, so this additional bit of the test no longer makes sense. It dates from when `get_schema_by_columns` used completely different logic from the array reader.
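A hedged sketch of the equivalence being described (the assertion is illustrative; `schema` and `with_projection` are builder accessors from this PR):

use std::fs::File;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn schemas_match(path: &str) {
    let no_mask = ParquetRecordBatchReaderBuilder::try_new(File::open(path).unwrap()).unwrap();
    let all_mask = ParquetRecordBatchReaderBuilder::try_new(File::open(path).unwrap())
        .unwrap()
        .with_projection(ProjectionMask::all());
    // A single schema computation backs both paths, so they agree by construction
    assert_eq!(no_mask.schema(), all_mask.schema());
}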
#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
Suggested change:
 /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
+/// Allows sharing the same builder for both the sync and async versions, whilst also not
+/// breaking the existing ParquetRecordBatchStreamBuilder API
/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
///
/// [`ParquetRecordBatchStreamBuilder`]: [crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
Eventually it would be great to update the examples to use this (much nicer) API as well: https://docs.rs/parquet/20.0.0/parquet/arrow/index.html
    &self.schema
}

/// Set the size of [`RecordBatch`] to produce
Suggested change:
-/// Set the size of [`RecordBatch`] to produce
+/// Set the size of [`RecordBatch`] to produce. Defaults to 1024
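For illustration, a small sketch using the default noted in the suggestion (the file source and chosen size are illustrative):

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_large_batches(file: File) -> parquet::errors::Result<()> {
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192) // override the 1024-row default
        .build()?;
    for batch in reader {
        assert!(batch?.num_rows() <= 8192);
    }
    Ok(())
}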
    }
}

/// Provide a [`RowFilter`] to skip decoding rows
Suggested change:
-/// Provide a [`RowFilter`] to skip decoding rows
+/// Provide a [`RowFilter`] to skip decoding rows. Row filters are applied
+/// after row group selection and row selection
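A hedged sketch of how a `RowFilter` is attached (the predicate here trivially keeps every row; a real one would compute a meaningful mask from the projected column):

use std::fs::File;
use arrow::array::BooleanArray;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};

fn read_filtered(file: File) -> parquet::errors::Result<()> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Evaluate the predicate against only the first leaf column
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        // Keep all rows; a real predicate would inspect the batch contents
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        let _ = batch?;
    }
    Ok(())
}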
    pub(crate) selection: Option<RowSelection>,
}

impl<T> ArrowReaderBuilder<T> {
This is looking like a very nice API 👌 👨‍🍳
Hat tip 🎩 to @tustvold, @Ted-Jiang, and @thinkharderdev for this. Very cool
let file = File::open(&path).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
👌 very nice

I would rather leave this for when I eventually get to cleaning up the lower level APIs; this PR can be viewed as decoupling the arrow implementation from the other APIs, so that a subsequent PR can revisit them
#[allow(unused)]
pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
    Self {
        selection: Some(selection),
        ..self
    }
}
Hmm, would it make sense to collapse `with_row_selection` and `with_row_filter`? The API is a bit confusing with both. And you could always just define a `RowSelection` as an `ArrowPredicate`.
Edit: To clarify a bit, I'm not sure I understand the use case in which you would have the `RowSelection` when constructing the reader. Obviously defining the selection in terms of an `ArrowPredicate` is not ideal, since it is only applied after decoding, which pretty much defeats the purpose.
They are related but different; in particular, `with_row_selection` exists to allow you to specify a row selection before reading any data, e.g. based on information in the PageIndex
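A sketch of the distinction (method names from this PR; the selection values are made up, and constructor shapes may differ slightly between parquet versions): a `RowSelection` is fixed before any data is decoded, unlike a `RowFilter`, which runs against decoded batches:

use std::fs::File;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};

fn read_selected(file: File) -> parquet::errors::Result<()> {
    // A selection known *before* decoding, e.g. derived from the page
    // index: skip the first 100 rows, then read the next 1000
    let selection = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(1000),
    ]);
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_selection(selection)
        .build()?;
    for batch in reader {
        let _ = batch?;
    }
    Ok(())
}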
Ah, yeah that makes sense.
might be good to clarify this in the comments (as others will likely have the same question)
Benchmark runs are scheduled for baseline = 3f0e12d and contender = 76cfe83. 76cfe83 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #2427
Rationale for this change
This standardises the configuration of the async and sync arrow parquet readers, helping to avoid inconsistency and reducing duplication.
What changes are included in this PR?
Adds a new `ParquetRecordBatchReaderBuilder` and deprecates the old APIs
Are there any user-facing changes?
This deprecates old APIs; however, it doesn't remove any of them