
Parallel parquet writer can panic when max_row_group_size < execution.batch_size #9736

@devinjdangelo


Describe the bug

An underflow panic can sometimes be triggered by setting max_row_group_size to less than execution.batch_size.

thread 'tokio-runtime-worker' panicked at datafusion/core/src/datasource/file_format/parquet.rs:886:33:
attempt to subtract with overflow
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
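For context, this looks like the standard unsigned-subtraction underflow. Below is a minimal, self-contained sketch of the suspected failure mode; the function and variable names are hypothetical illustrations, not the actual code at parquet.rs:886:

// Hypothetical sketch of the failure mode, not the actual DataFusion code:
// computing remaining row group capacity with unsigned arithmetic.
fn remaining_capacity(max_row_group_size: usize, rows_in_group: usize) -> usize {
    // If a single incoming batch pushes rows_in_group past
    // max_row_group_size, this usize subtraction underflows and panics
    // in a debug build ("attempt to subtract with overflow").
    max_row_group_size - rows_in_group
}

fn main() {
    // max_row_group_size (283) < execution.batch_size (default 8192)
    let _ = remaining_capacity(283, 8192); // panics
}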

To Reproduce

I found this by trying a wide range of max_row_group_size values:

use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // create local execution context
    let ctx = SessionContext::new();

    let testdata = "benchmarks/data/tpch_sf10/lineitem";

    let filename = &format!("{testdata}/part-0.parquet");

    // define the query using the DataFrame trait
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .limit(0, Some(200_000))?;

    println!("{}", df.clone().count().await?);

    
    // Try a descending range of row group sizes, down to 1
    for row_group_size in (1..8193).step_by(283).rev() {
        println!("row group size: {}", row_group_size);
        println!("Writing without parallelism!");
        let row_group_path = format!("/tmp/{}.parquet", row_group_size);
        let mut options = TableParquetOptions::default();
        options.global.max_row_group_size = row_group_size;
        options.global.allow_single_file_parallelism = false;
        df.clone()
            .write_parquet(
                &row_group_path,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await
            .unwrap();

        println!("Writing with parallelism!");
        let row_group_path = format!("/tmp/para_{}.parquet", row_group_size);
        let mut options = TableParquetOptions::default();
        options.global.max_row_group_size = row_group_size;
        options.global.allow_single_file_parallelism = true;
        df.clone()
            .write_parquet(
                &row_group_path,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await
            .unwrap();

    }

    Ok(())
}

Expected behavior

No combination of max_row_group_size and execution.batch_size should lead to a panic.
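For illustration, one way to satisfy this (a sketch under the assumption that the panic comes from an unchecked subtraction, not necessarily the right fix) is to clamp the capacity computation so oversized batches report zero remaining capacity and get split:

// Sketch only: saturating_sub yields zero remaining capacity instead of
// underflowing, so the caller can close the row group and split the batch.
fn remaining_capacity(max_row_group_size: usize, rows_in_group: usize) -> usize {
    max_row_group_size.saturating_sub(rows_in_group)
}

fn main() {
    assert_eq!(remaining_capacity(283, 8192), 0); // no panic
}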

Additional context

Extremely small max_row_group_size values can cause a stack overflow even when the parallel parquet writer is disabled, e.g. a max_row_group_size of 1. We may want to raise a configuration validation error for absurdly small row group sizes; a sketch follows the output below.

This happens even in release mode:

row group size: 1
Writing without parallelism!

thread 'tokio-runtime-worker' has overflowed its stack
fatal runtime error: stack overflow
Aborted (core dumped)
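A check along these lines could reject such configurations up front; the helper name and the minimum threshold below are illustrative assumptions, not existing DataFusion API:

// Hypothetical validation sketch; the name and threshold are assumptions.
fn validate_max_row_group_size(size: usize) -> Result<(), String> {
    // Illustrative floor; the real minimum would need to be chosen carefully.
    const MIN_ROW_GROUP_SIZE: usize = 64;
    if size < MIN_ROW_GROUP_SIZE {
        return Err(format!(
            "max_row_group_size must be at least {MIN_ROW_GROUP_SIZE}, got {size}"
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_max_row_group_size(1).is_err()); // e.g. size 1 is rejected
}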
