
Improve RepartitionExec for better query performance #7001

@alamb

Description

Summary

The key to using multiple cores efficiently is how the query plan is parallelized. In DataFusion this often involves repartitioning data between partitions. This ticket describes ideas for improving the RepartitionExec operator so that DataFusion's performance scales more linearly with the number of cores.

Experiments by @YjyJeff in #6928 show that parts of RepartitionExec can be the bottleneck in certain queries. While we likely can't use the unbounded buffering approach of #6929 for the reasons @crepererum explains in #6929 (comment), there is clearly room to improve. We also have earlier evidence from @andygrove in #5999 that scaling with core counts is a challenge.

Background

DataFusion (and most other commercial systems) uses the "Exchange Operator"1 based approach to parallelism. I believe the exchange approach is so commonly implemented because it is simple to reason about and works quite well in practice with sufficient engineering care.

DataFusion's planner picks the target number of partitions and then RepartitionExec redistributes RecordBatches to that number of output partitions.

For example, if DataFusion has target_partitions=3 (trying to use 3 cores) but is scanning an input with 2 partitions, it makes a plan like this:

        ▲                  ▲                  ▲
        │                  │                  │
        │                  │                  │
        │                  │                  │
┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
└───────────────┘  └───────────────┘  └───────────────┘
        ▲                  ▲                  ▲
        └──────────────────┼──────────────────┘
                           │
              ┌─────────────────────────┐
              │     RepartitionExec     │
              │   (hash/round robin)    │
              └─────────────────────────┘
                         ▲   ▲
             ┌───────────┘   └───────────┐
             │                           │
             │                           │
        .─────────.                 .─────────.
     ,─'           '─.           ,─'           '─.
    ;      Input      :         ;      Input      :
    :   Partition 0   ;         :   Partition 1   ;
     ╲               ╱           ╲               ╱
      '─.         ,─'             '─.         ,─'
         `───────'                   `───────'
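
The two redistribution modes shown in the diagram can be sketched roughly as follows. This is a plain-Rust illustration, not DataFusion's actual code: the `(u64, &str)` rows stand in for RecordBatch rows, and the modulo of an integer key stands in for a real hash of the partitioning columns.

```rust
/// Hypothetical sketch: split a "batch" of rows into one bucket per
/// output partition by hashing the key column, which is what
/// RepartitionExec's hash mode does to RecordBatches.
fn hash_repartition(rows: &[(u64, &str)], num_partitions: usize) -> Vec<Vec<(u64, String)>> {
    let mut buckets = vec![Vec::new(); num_partitions];
    for &(key, value) in rows {
        // A real implementation hashes the partition columns; here the
        // integer key modulo the partition count stands in for the hash.
        let part = (key as usize) % num_partitions;
        buckets[part].push((key, value.to_string()));
    }
    buckets
}

/// Round-robin mode instead assigns whole batches to outputs in turn.
fn round_robin_partition(batch_index: usize, num_partitions: usize) -> usize {
    batch_index % num_partitions
}
```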

Note there are alternative approaches, such as the one described in the Morsel-Driven Parallelism paper, but other than the notable exception of DuckDB I don't know of any widely used engine that takes this approach. The major benefit of the morsel-driven approach, as I understand it, is that it can, in theory, respond better to dynamic resource changes (e.g. throttle down to use 2 cores when a high-priority job comes in and then go back up to 8 cores when it is done).2

The engineering challenge of the RepartitionExec is twofold:

Synchronization and Stalls

Because RepartitionExec is a synchronization point between multiple threads, lock contention can become the bottleneck without careful engineering.

Also, without careful scheduling the consumers may "stall" waiting for data from a producer.

NUMA

Non-Uniform Memory Access (NUMA) basically means that data that is "closer" to a core is faster to access. In practice, this means optimal performance is achieved when the same thread/core that produces data (e.g. decodes a RecordBatch from parquet) also processes it until the next pipeline-breaking operation (e.g. updating the hash table). If one core produces the RecordBatch and another consumes it, additional memory latency is incurred.

Since the RepartitionExec is designed to shuffle data, it is very easy to destroy NUMA locality if care is not taken.

I believe the current RoundRobin approach is not NUMA friendly: it will very likely decode some parquet data, put that batch in a queue, and then go decode more parquet data (rather than returning control to the operator that was waiting for it).

Idea 1: Buffering to the RepartitionExec

One idea, suggested by @Dandandan and @ozankabak in #6929 (comment), is to introduce some more (but not unbounded) buffering into the repartition operator.

For example, perhaps we could extend the existing DistributionSender to hold a queue of RecordBatches (2 or 3, for example) rather than just a single Option<>, so that it is possible to start fetching the next input immediately:

https://github.com/apache/arrow-datafusion/blob/d316702722e6c301fdb23a9698f7ec415ef548e9/datafusion/core/src/physical_plan/repartition/distributor_channels.rs#L180-L182
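
As a rough sketch of that change (illustrative names, not DataFusion's actual types; integers stand in for RecordBatches), the single-slot hand-off could become a small bounded queue, so a producer can keep going while the consumer drains earlier batches:

```rust
use std::collections::VecDeque;

/// Hypothetical sketch: a per-partition buffer holding a small queue of
/// batches instead of a single Option<Batch>, so the producer can start
/// fetching its next input before the consumer has drained this one.
struct BufferedChannel<T> {
    queue: VecDeque<T>,
    capacity: usize, // e.g. 2 or 3, as suggested above
}

impl<T> BufferedChannel<T> {
    fn new(capacity: usize) -> Self {
        Self { queue: VecDeque::with_capacity(capacity), capacity }
    }

    /// Accept a batch if there is room; on a full queue the batch is
    /// handed back so the caller can await or try another partition.
    fn try_send(&mut self, batch: T) -> Result<(), T> {
        if self.queue.len() < self.capacity {
            self.queue.push_back(batch);
            Ok(())
        } else {
            Err(batch)
        }
    }

    /// Consumer side: drain the oldest buffered batch, if any.
    fn recv(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}
```

The real DistributionSender is async and gated, so an actual implementation would need to keep its wake-up semantics; this only illustrates the single-slot-to-small-queue shape of the proposal.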

Idea 2: Make RoundRobin more adaptive

Another idea would be to restructure the "RoundRobin" repartition strategy to be more adaptive. Currently each input sends RecordBatches evenly to each output partition and will return control via tx.send(Some(Ok(batch))).await.is_err() if any output partition is full.

We could potentially improve this logic: rather than giving up control, attempt to find an output partition that has space and fill it (aka tx.try_send, and if there is no space, try some other partition).
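
A minimal sketch of that adaptive probing, with plain Vecs standing in for the bounded channels and every name hypothetical:

```rust
/// Hypothetical sketch of the adaptive strategy: instead of blocking on a
/// full output partition, probe the partitions in round-robin order
/// (try_send-style) and deliver the batch to the first one with space.
fn send_adaptive<T>(
    outputs: &mut [Vec<T>],
    cap: usize,   // per-partition buffer capacity
    start: usize, // preferred (round-robin) partition for this batch
    batch: T,
) -> Result<usize, T> {
    let n = outputs.len();
    // Probe partitions starting from the preferred one, wrapping around.
    if let Some(idx) = (0..n).map(|i| (start + i) % n).find(|&idx| outputs[idx].len() < cap) {
        outputs[idx].push(batch);
        Ok(idx) // delivered without blocking
    } else {
        Err(batch) // every partition full: caller falls back to an awaited send
    }
}
```

Note this trades strict round-robin fairness for progress, which interacts with the NUMA concern above: batches drift toward whichever consumers are keeping up.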

Footnotes

  1. This model was first described in the 1989 paper Encapsulation of parallelism in the Volcano query processing system, which uses the term "Exchange" for the concept of repartitioning data across threads.

  2. We actually had a version of this approach as an experimental feature in DataFusion for a while, but removed it as we found it didn't offer a compelling enough performance improvement to justify the engineering effort.
