Add a 'prefetch' option to ParquetRecordBatchStream to load the next row group while decoding #6676

Open
wants to merge 3 commits into base: master

Conversation


@masonh22 commented Nov 3, 2024

Which issue does this PR close?

Closes #6559.

Rationale for this change

This improves performance when reading from filesystems with high latency and/or low bandwidth.

What changes are included in this PR?

This adds an option to ParquetRecordBatchStream to load the next row group while decoding the current one. It introduces a new Prefetch stream state; in this state, the future for the next row group is polled before data is returned from the current ParquetRecordBatchReader.
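
As a rough illustration of the intended overlap (not this PR's actual implementation, which drives the prefetch inside the stream's poll loop rather than on a spawned task), the toy program below fetches the next "row group" in the background while the current one is being decoded. The `fetch` function and loop bodies are placeholders:

```rust
use std::time::Duration;

/// Stand-in for the async row-group fetch (e.g. an object-store read).
async fn fetch(group: usize) -> Vec<usize> {
    tokio::time::sleep(Duration::from_millis(50)).await; // simulated I/O latency
    (0..3).map(|row| group * 10 + row).collect()
}

#[tokio::main]
async fn main() {
    let num_groups = 4;
    // Start fetching the first group before the loop begins.
    let mut pending = Some(tokio::spawn(fetch(0)));

    for group in 0..num_groups {
        // Wait for the group we are about to decode...
        let rows = pending.take().unwrap().await.unwrap();
        // ...and immediately kick off the fetch of the next group so that
        // its I/O overlaps with decoding the current one.
        if group + 1 < num_groups {
            pending = Some(tokio::spawn(fetch(group + 1)));
        }
        for row in rows {
            // Stand-in for decoding record batches from the current group.
            println!("group {group}: decoded row {row}");
        }
    }
}
```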

Are there any user-facing changes?

ParquetRecordBatchStreamBuilder gains a new prefetch option, set via a method called with_prefetch.
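
As a usage sketch: the with_prefetch name comes from this PR, but its exact signature (e.g. whether it takes a bool) is an assumption here.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn read_all(file: tokio::fs::File) -> parquet::errors::Result<()> {
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_prefetch(true) // proposed option; assumed to take a bool
        .build()?;
    // While batches from the current row group are being decoded, the stream
    // fetches the next row group in the background.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```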

commit 28b2bf1
Author: Mason Hall <mason@jmh.fyi>
Date:   Fri Oct 18 15:49:31 2024 -0400

    Cleaned up prefetch and added a test

commit 3d7e018
Author: Mason Hall <mason@jmh.fyi>
Date:   Fri Oct 18 13:32:22 2024 -0400

    prefetch working
@github-actions bot added the parquet (Changes to the parquet crate) label on Nov 3, 2024
@tustvold (Contributor) left a comment


I'll try to find some time over the next week to look into this, but I'm afraid it may be a while before I have time to sit down with it; the logic here is rather subtle and breakages can be very hard to spot / detect.

StreamState::Prefetch(batch_reader, f) => {
let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
Contributor


What is the rationale for doing this?

Author


I wanted to avoid any potential overhead from using the real context when polling the future here. Since we're always returning Poll::Ready out of this state (or transitioning to another state), we don't need to rely on the real context to wake the main stream future.

I'm not an expert at async Rust code, though, so if it would make more sense to do something else here, I'm happy to make that change.
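
For reference, a minimal sketch of the pattern under discussion: polling a boxed future with a no-op waker. This is not the PR's exact code; the function and its signature are made up for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Poll a boxed future once without registering a real waker. This only works
/// when the caller, as described above, returns Poll::Ready (or transitions to
/// another state) on every call, so it never relies on a wakeup from the inner
/// future to make progress.
fn poll_with_noop_waker<T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send>>) -> Poll<T> {
    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
    fut.as_mut().poll(&mut noop_cx)
}
```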

@alamb (Contributor) commented Nov 15, 2024

This is still on my radar to review; I just haven't had a chance.

Labels
parquet (Changes to the parquet crate)

Successfully merging this pull request may close these issues:
ParquetRecordBatchStream API to fetch the next row group while decoding

3 participants