fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737
Changes from 4 commits
@@ -166,4 +166,47 @@ mod tests {

```rust
        Ok(())
    }

    #[tokio::test]
    async fn write_parquet_with_small_rg_size() -> Result<()> {
        let mut test_df = test_util::test_table().await?;
        // make the test data larger so there are multiple batches
        for _ in 0..7 {
            test_df = test_df.clone().union(test_df)?;
        }
        let output_path = "file://local/test.parquet";

        for rg_size in (1..7).step_by(5) {
            let df = test_df.clone();
            let tmp_dir = TempDir::new()?;
            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
            let local_url = Url::parse("file://local").unwrap();
            let ctx = &test_df.session_state;
            ctx.runtime_env().register_object_store(&local_url, local);

            let mut options = TableParquetOptions::default();
            options.global.max_row_group_size = rg_size;
            options.global.allow_single_file_parallelism = true;

            df.write_parquet(
                output_path,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await?;

            // Check that the file actually used the requested row group size
            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
            let reader =
                parquet::file::serialized_reader::SerializedFileReader::new(file)
                    .unwrap();
            let parquet_metadata = reader.metadata();
            let written_rows = parquet_metadata.row_group(0).num_rows();

            assert_eq!(written_rows as usize, rg_size);
        }

        Ok(())
    }
}
```

Review comments on this diff:

On `for rg_size in (1..7).step_by(5)`:

> My reading of the docs and my playground experiments suggest this is the same as `[1, 6]`.

> Yes, `[1, 6]` is all I meant and would be clearer... I originally wanted to loop over more rg sizes, but the test was slow. If we streamline the test, we could actually range over more values here.

> This now loops over `0..10` with `datafusion.execution.batch_size` set to 10.

On `std::fs::File::open(tmp_dir.into_path().join("test.parquet"))`:

> Calling `into_path()` consumes the `TempDir`, so the temporary directory is no longer deleted when it goes out of scope. I think calling `path()` instead would keep the automatic cleanup.

> I fixed this in the new test and a preexisting one.
Discussion on the test:

> When I ran this test it takes more than 22 seconds on my laptop. I wonder if we really need to generate so much data -- maybe we can try slicing up the batch (or else maybe use larger rg_sizes).

> We should be able to trigger the issue with less data by lowering `execution.batch_size` to something small, like 10 rows.

> The test with a batch size of 10 still panics on main and passes in this PR, but runs in 0.41 seconds.