fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737

Merged: 7 commits, Mar 23, 2024

@devinjdangelo devinjdangelo commented Mar 22, 2024

Which issue does this PR close?

Closes #9736

Rationale for this change

See issue

What changes are included in this PR?

Parallel parquet writer can now handle the case when max_record_batch_rows < execution.batch_size by iteratively splitting the record batch rather than assuming it only needs to be split once.
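To make the "split iteratively" idea concrete, here is a minimal sketch. It is not the DataFusion implementation: rows are modeled as a plain `Vec<i32>` instead of an Arrow record batch, and `split_into_row_groups`, `max_rows`, `current`, and `rest` are hypothetical names chosen for illustration. The point is the `while` loop: one incoming batch may need to be cut many times when the row group limit is smaller than the batch.

```rust
/// Regroups incoming batches into row groups of at most `max_rows` rows.
/// A partially filled row group is carried over from one batch to the next.
/// (Hypothetical helper; plain vectors stand in for record batches.)
fn split_into_row_groups(batches: &[Vec<i32>], max_rows: usize) -> Vec<Vec<i32>> {
    assert!(max_rows > 0, "row group size must be positive");
    let mut finished: Vec<Vec<i32>> = Vec::new();
    let mut current: Vec<i32> = Vec::new();

    for batch in batches {
        let mut rest: &[i32] = batch;
        // Keep splitting while the pending rows can fill a row group.
        // A single `if` here would assume one split is always enough.
        while current.len() + rest.len() >= max_rows {
            // Safe subtraction: `current.len() < max_rows` holds on every
            // iteration, and `take <= rest.len()` by the loop condition.
            let take = max_rows - current.len();
            let (head, tail) = rest.split_at(take);
            current.extend_from_slice(head);
            finished.push(std::mem::take(&mut current));
            rest = tail;
        }
        // Whatever is left is smaller than a row group; carry it forward.
        current.extend_from_slice(rest);
    }
    if !current.is_empty() {
        finished.push(current);
    }
    finished
}

fn main() {
    // One 10-row batch, as if execution.batch_size were 10.
    let batch: Vec<i32> = (0..10).collect();
    for max_rows in 1..=10 {
        let groups = split_into_row_groups(&[batch.clone()], max_rows);
        let sizes: Vec<usize> = groups.iter().map(|g| g.len()).collect();
        println!("max_rows = {max_rows}: row group sizes = {sizes:?}");
    }
}
```

Every row group except possibly the last comes out full, and no unsigned subtraction can go negative regardless of how the row group limit compares to the batch size.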

Are these changes tested?

Yes. Added a new test that panics without this PR.

Are there any user-facing changes?

No, just a bug fix.

@github-actions github-actions bot added the core Core DataFusion crate label Mar 22, 2024
@devinjdangelo devinjdangelo marked this pull request as ready for review March 22, 2024 14:10
@alamb alamb left a comment


Thank you @devinjdangelo -- this code looks good to me. I think there are some small issues with the test to fix, but otherwise I think this is good to go.

🙏

I ran the test without the changes in this PR and it fails like this:

b.rs (target/debug/deps/datafusion-4cbfc61ad6017be4)

attempt to subtract with overflow
thread 'dataframe::parquet::tests::write_parquet_with_small_rg_size' panicked at datafusion/core/src/datasource/file_format/parquet.rs:885:33:
attempt to subtract with overflow
stack backtrace:

}
let output_path = "file://local/test.parquet";

for rg_size in (1..7).step_by(5) {
Contributor

My reading of the docs and my playground experiments suggest this is the same as [1, 6] -- is that the intent? Or did you mean 1, 5, 10, 15, 20, 25, 30, 35?

Contributor Author

Yes, [1, 6] is all I meant and would be clearer... I originally wanted to loop over more rg sizes but the test was slow. If we streamline the test, we could actually range over more values here.
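For readers unfamiliar with `step_by`, a quick check of what the range under review produces. The helper name `rg_sizes` is hypothetical and exists only so the result can be inspected.

```rust
/// Collects the row group sizes produced by the range from the test.
/// (`rg_sizes` is a hypothetical helper, not part of the PR.)
fn rg_sizes() -> Vec<usize> {
    // Starts at 1, then advances by 5; the next value after 6 would be 11,
    // which is outside the half-open range 1..7.
    (1..7).step_by(5).collect()
}

fn main() {
    assert_eq!(rg_sizes(), vec![1, 6]);

    // Spelling the values out states the intent directly.
    for rg_size in [1, 6] {
        println!("testing rg_size = {rg_size}");
    }
}
```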

Contributor Author

This now loops over 0..10 with datafusion.execution.batch_size set to 10.

.await?;

// Check that file actually used the correct rg size
let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
Contributor

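I think calling into_path here means the file won't be cleaned up, because it consumes the temporary directory handle and persists the directory.

I think calling path() instead would ensure the file is cleaned up, since it only borrows the handle and the directory is still removed when the handle is dropped.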
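The ownership difference can be sketched with the standard library alone. `ScratchDir` below is a hypothetical stand-in written for this illustration, not the temporary directory type the test actually uses; it only mimics the two relevant behaviours: `path()` borrows, so cleanup still runs on drop, while `into_path()` consumes the handle and skips cleanup.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for a self-cleaning temporary directory.
struct ScratchDir {
    path: PathBuf,
}

impl ScratchDir {
    fn new(name: &str) -> io::Result<Self> {
        // Assumption: name plus process id is unique enough for a sketch.
        let path = std::env::temp_dir().join(format!("{}-{}", name, std::process::id()));
        fs::create_dir_all(&path)?;
        Ok(Self { path })
    }

    /// Borrows the path; the directory is still removed on drop.
    fn path(&self) -> &Path {
        &self.path
    }

    /// Consumes the handle and returns the path; cleanup never runs.
    fn into_path(mut self) -> PathBuf {
        let path = std::mem::take(&mut self.path);
        std::mem::forget(self); // skip Drop, so the directory persists
        path
    }
}

impl Drop for ScratchDir {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.path);
    }
}

fn main() -> io::Result<()> {
    // Using `path()`: the directory disappears once `dir` goes out of scope.
    let cleaned = {
        let dir = ScratchDir::new("sketch-path")?;
        fs::write(dir.path().join("test.parquet"), b"")?;
        dir.path().to_path_buf()
    };
    assert!(!cleaned.exists());

    // Using `into_path()`: the directory is left behind until removed by hand.
    let kept = ScratchDir::new("sketch-into-path")?.into_path();
    assert!(kept.exists());
    fs::remove_dir_all(&kept)?;
    Ok(())
}
```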

Contributor Author

I fixed this in the new test and a preexisting one.

let mut test_df = test_util::test_table().await?;
// make the test data larger so there are multiple batches
for _ in 0..7 {
test_df = test_df.clone().union(test_df)?;
Contributor

When I ran this test it took more than 22 seconds on my laptop. I wonder if we really need to generate so much data. Maybe we could try slicing up the batch, or else use larger rg_sizes.

$ cargo test --lib -p datafusion -- write_parquet_with_small_rg_size
...
    Finished test [unoptimized + debuginfo] target(s) in 0.16s
     Running unittests src/lib.rs (target/debug/deps/datafusion-4cbfc61ad6017be4)

running 1 test
test dataframe::parquet::tests::write_parquet_with_small_rg_size ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 651 filtered out; finished in 22.31s

Contributor Author

We should be able to trigger the issue with less data by lowering execution.batch_size to something small like 10 rows.

Contributor Author

The test with a batch size of 10 still panics on main and passes in this PR, but runs in 0.41 seconds.

@alamb alamb left a comment


Thanks @devinjdangelo -- I also verified that the test now runs quickly and still panics without the fix.

Thank you so much

@@ -150,7 +152,7 @@ mod tests {
         .await?;

     // Check that file actually used the specified compression
-    let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+    let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
Contributor

👍

Contributor

Thank you for the drive-by cleanup.

@alamb alamb merged commit 02fd450 into apache:main Mar 23, 2024
23 checks passed

Successfully merging this pull request may close these issues.

Parallel parquet writer can panic when max_row_group_size < execution.batch_size