fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737

Merged Mar 23, 2024 (7 commits)

Changes from 4 commits
43 changes: 43 additions & 0 deletions datafusion/core/src/dataframe/parquet.rs
@@ -166,4 +166,47 @@ mod tests {

        Ok(())
    }

    #[tokio::test]
    async fn write_parquet_with_small_rg_size() -> Result<()> {
        let mut test_df = test_util::test_table().await?;
        // make the test data larger so there are multiple batches
        for _ in 0..7 {
            test_df = test_df.clone().union(test_df)?;
Contributor: When I ran this test, it took more than 22 seconds on my laptop. I wonder if we really need to generate so much data -- maybe we can try slicing up the batch (or else use larger rg_sizes).

$ cargo test --lib -p datafusion -- write_parquet_with_small_rg_size
...
    Finished test [unoptimized + debuginfo] target(s) in 0.16s
     Running unittests src/lib.rs (target/debug/deps/datafusion-4cbfc61ad6017be4)

running 1 test
test dataframe::parquet::tests::write_parquet_with_small_rg_size ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 651 filtered out; finished in 22.31s

Contributor Author: We should be able to trigger the issue with less data by lowering execution.batch_size to something small, like 10 rows.

Contributor Author: The test with a batch size of 10 still panics on main and passes in this PR, but runs in 0.41 seconds.

        }
        let output_path = "file://local/test.parquet";

        for rg_size in (1..7).step_by(5) {
Contributor: My reading of the docs and my playground experiments suggest this is the same as [1, 6] -- is that the intent? Or did you mean 1, 5, 10, 15, 20, 25, 30, 35?

Contributor Author: Yes, [1, 6] is all I meant, and writing it that way would be clearer... I originally wanted to loop over more rg sizes, but the test was slow. If we streamline the test, we could range over more values here.
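For reference, a quick illustration of the range semantics under discussion (plain std Rust, not part of the PR):

fn main() {
    // (1..7).step_by(5) starts at 1 and advances by 5 within the half-open
    // range 1..7, so it yields only 1 and 6 -- exactly the values [1, 6].
    let sizes: Vec<_> = (1..7).step_by(5).collect();
    assert_eq!(sizes, vec![1, 6]);

    // Covering more values, e.g. 1, 6, 11, 16, would need a wider range.
    let wider: Vec<_> = (1..20).step_by(5).collect();
    assert_eq!(wider, vec![1, 6, 11, 16]);
}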

Contributor Author: This now loops over 0..10 with datafusion.execution.batch_size set to 10.

            let df = test_df.clone();
            let tmp_dir = TempDir::new()?;
            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
            let local_url = Url::parse("file://local").unwrap();
            let ctx = &test_df.session_state;
            ctx.runtime_env().register_object_store(&local_url, local);
            let mut options = TableParquetOptions::default();
            options.global.max_row_group_size = rg_size;
            options.global.allow_single_file_parallelism = true;
            df.write_parquet(
                output_path,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await?;

            // Check that file actually used the correct rg size
            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
Contributor: Calling into_path here I think means the file won't be cleaned up. I think calling path() would ensure the file is cleaned up.

Contributor Author: I fixed this in the new test and a preexisting one.
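A minimal sketch of the difference being discussed, assuming the tempfile crate's TempDir (illustrative only, not code from this diff):

use tempfile::TempDir;

fn main() -> std::io::Result<()> {
    // path() borrows the directory; it is still deleted when `tmp` is dropped,
    // which is what keeps test artifacts from accumulating.
    let tmp = TempDir::new()?;
    std::fs::write(tmp.path().join("test.parquet"), b"data")?;
    drop(tmp); // directory and its contents are removed here

    // into_path() consumes the TempDir and disables the automatic cleanup,
    // so the directory (and anything written into it) persists after the test.
    let tmp = TempDir::new()?;
    let kept = tmp.into_path().join("leftover.parquet");
    std::fs::write(&kept, b"data")?;
    // nothing removes kept's parent directory now
    Ok(())
}

In the test above, tmp_dir.into_path() therefore leaves the written parquet file behind; using tmp_dir.path() instead keeps the TempDir alive until the end of the iteration and lets it clean up, which is the change the author describes making.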


            let reader =
                parquet::file::serialized_reader::SerializedFileReader::new(file)
                    .unwrap();

            let parquet_metadata = reader.metadata();

            let written_rows = parquet_metadata.row_group(0).num_rows();

            assert_eq!(written_rows as usize, rg_size);
        }

        Ok(())
    }
}
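Per the review discussion above, the slow part of this test is generating enough rows to span multiple RecordBatches; shrinking datafusion.execution.batch_size makes the same condition (max_row_group_size smaller than the batch size) reproducible with far less data. A rough sketch of how a test might configure that, assuming current DataFusion session APIs (this is not the code in the PR):

use datafusion::prelude::*;

// Build a context whose execution batch size is only 10 rows, so even a small
// table yields several RecordBatches and any row-group size below 10 exercises
// the batch-splitting path in the parallel parquet writer.
fn small_batch_ctx() -> SessionContext {
    let config = SessionConfig::new().with_batch_size(10);
    SessionContext::new_with_config(config)
}

A setup along these lines is consistent with the ~0.4 s runtime reported above for the batch-size-10 variant of the test.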
73 changes: 39 additions & 34 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
)?;
let mut current_rg_rows = 0;

-    while let Some(rb) = data.recv().await {
-        if current_rg_rows + rb.num_rows() < max_row_group_rows {
-            send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
-                .await?;
-            current_rg_rows += rb.num_rows();
-        } else {
-            let rows_left = max_row_group_rows - current_rg_rows;
-            let a = rb.slice(0, rows_left);
-            send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
-                .await?;
+    while let Some(mut rb) = data.recv().await {
+        // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+        // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+        // function.
+        loop {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+                break;
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;

-            // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
-            // on a separate task, so that we can immediately start on the next RG before waiting
-            // for the current one to finish.
-            drop(col_array_channels);
-            let finalize_rg_task = spawn_rg_join_and_finalize_task(
-                column_writer_handles,
-                max_row_group_rows,
-            );
-
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
-
-            let b = rb.slice(rows_left, rb.num_rows() - rows_left);
-            (column_writer_handles, col_array_channels) =
-                spawn_column_parallel_row_group_writer(
-                    schema.clone(),
-                    writer_props.clone(),
-                    max_buffer_rb,
-                )?;
-            send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
-                .await?;
-            current_rg_rows = b.num_rows();
+                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
+
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;
+
+                current_rg_rows = 0;
+                rb = rb.slice(rows_left, rb.num_rows() - rows_left);
+
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+            }
        }
    }
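The comment in the new code explains the intent; to see why the pre-fix version could underflow, note that it split the batch only once and then set current_rg_rows = b.num_rows(), so a leftover slice larger than max_row_group_rows made the next max_row_group_rows - current_rg_rows subtraction wrap on usize. A simplified, self-contained model of the splitting loop (assumed names; this is not the writer's actual code):

// Each pushed value represents one slice handed to the column writers; a row
// group is closed whenever it reaches capacity. Returns the slice sizes plus
// the number of rows left in the still-open row group.
fn split_into_row_groups(mut batch_rows: usize, max_row_group_rows: usize) -> (Vec<usize>, usize) {
    let mut current_rg_rows = 0;
    let mut sends = Vec::new();
    loop {
        if current_rg_rows + batch_rows < max_row_group_rows {
            // The whole remainder fits in the open row group.
            sends.push(batch_rows);
            current_rg_rows += batch_rows;
            break;
        } else {
            // Fill the open row group exactly, close it, then keep splitting the
            // remainder instead of assuming it fits into a single further group.
            let rows_left = max_row_group_rows - current_rg_rows;
            sends.push(rows_left);
            current_rg_rows = 0;
            batch_rows -= rows_left;
        }
    }
    (sends, current_rg_rows)
}

fn main() {
    // A 100-row batch with 10-row row groups: the pre-fix code sliced only once,
    // recorded 90 rows as belonging to the "current" group, and the next
    // max_row_group_rows - current_rg_rows subtraction wrapped below zero.
    // The loop instead emits slices of at most 10 rows and never over-counts.
    let (sends, open_rows) = split_into_row_groups(100, 10);
    assert_eq!(sends.iter().sum::<usize>(), 100);
    assert!(sends.iter().all(|&n| n <= 10));
    assert!(open_rows < 10);
}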
