Skip to content

Parallel CSV reading #6325

@alamb

Description

@alamb

Is your feature request related to a problem or challenge?

As part of having a great "out of the box" experience it is important to use as many cores as possible in DataFusion. Given modern consumer laptops have 8-16 cores using multiple cores can literally translate to an order of magnitude faster performance.

While DataFusion offers the ability to read partitioned datasets (aka when the input is in multiple files), often, especially for initially testing out the tool, people will simply running queries on their existing CSV or JSON datasets and it will be relatively slow.

We already have the great datafusion.optimizer.repartition_file_scans option (see docs) -- added by @korowa in #5057 (👋 !) which uses multiple cores to decode the parquet files in parallel. I would like a similar feature for CSV files

Describe the solution you'd like

One basic approach (following what @korowa did for Parquet) would be:

  1. If the datafusion.optimizer.repartition_file_scans option is set, divide the file into even (byte) sized contiguous blocks, probably with some lower limit (like 1MB)
  2. Update CsvExec to process partitions using those subsets of the viles

Notes:
Given the vagaries of CSV (e.g. unescaped quoted newlines) it is likely impossible to parallelize CSV reading for all possible files. I think this is fine, and as long as we can turn off the reading in parallel it is better to have faster out of the box query performance for 99.99% of the queries than handle bizzare CSV files always

Care will be required to make sure all records are read exactly once, given the partition splits will likely be in the middle of rows.

One idea for parsing a partition (offset, len):

  1. Start CSV parsing the data starting after finding the next newline after the offset bytes
  2. Continue CSV parsing until the newline after the offset + len byte
        0        A,1,2,3,4,5,6,7,8,9\n                            
        20       A,1,2,3,4,5,6,7,8,9\n                            
        40       A,1,2,3,4,5,6,7,8,9\n ◀─ ─ ─ ─ ─ ─ ─ ─           
        60       A,1,2,3,4,5,6,7,8,9\n                 │          
        80       A,1,2,3,4,5,6,7,8,9\n                            
        100      A,1,2,3,4,5,6,7,8,9\n                 │          
                                                                  
Byte Offset       Lines of CSV Data                    │          
                  (in this case 20                                
                  bytes per line)           Split at byte 50 is in
                                              the middle of this  
                                                     line         
                                                                  

This is similar to what is described in #5205 (comment)

Describe alternatives you've considered

No response

Additional context

@kmitchener noticed the same thing in: #5205

The duckdb implementation of a similar feature may offer some inspiration: duckdb/duckdb#5194

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or requestperformanceMake DataFusion faster

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions