
Leverage PartialReduce AggregationExec mode to drastically reduce shuffle size #360

@fmonjalet


Summary

When DataFusion 53 is released, we should use the PartialReduce mode of AggregateExec (apache/datafusion#20019, kudos to @njsmith) for maximum local data reduction before shuffling.

The expectation is a reduction in the amount of data exchanged by up to a factor of n_partition_per_task (for example, 16x).

Details

Very schematically, here is how a shuffle happens:

(Diagram: current shuffle flow)

My understanding is that we currently send duplicate aggregate keys over the network, when we could have reduced them to a single row per key locally before the shuffle.

We should leverage PartialReduce to further reduce data before shuffling:

(Diagram: shuffle with an extra AggregateExec(PartialReduce) step before the exchange)

In this example, we'd reduce data exchanged by 4x (because there are 4 partitions per task).
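To make the expected reduction concrete, here is a toy sketch in plain Python (not DataFusion code; the data, key set, and per-task partition count of 4 are all made up for illustration) of merging partially aggregated states locally before a shuffle:

```python
from collections import defaultdict

# Toy model: one task holds 4 local partitions, each containing
# partially aggregated (key, count) rows over the same small key set.
partitions = [
    {"a": 1, "b": 2},
    {"a": 3, "b": 1},
    {"a": 2, "b": 4},
    {"a": 5, "b": 3},
]

# Without local reduction: one row per key per partition is shuffled.
rows_without = sum(len(p) for p in partitions)

# PartialReduce-style local merge: combine partial states per key
# before shuffling, so each key is sent at most once per task.
merged = defaultdict(int)
for partition in partitions:
    for key, count in partition.items():
        merged[key] += count

rows_with = len(merged)

print(rows_without, rows_with)  # 8 rows shrink to 2: a 4x reduction
```

The reduction factor tops out at the number of local partitions per task because, in the worst case, every partition holds a row for every key.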

No strong opinion on whether we should add another RepartitionExec layer as in this graph, or run the AggregateExec(PartialReduce) directly on the "ready to shuffle" partitions. Since shuffles can have hundreds of partitions, I don't know how the costs compare:

  • RepartitionExec(Hash, output_partitions=16) + AggregateExec(PartialReduce) on 16 partitions
  • vs AggregateExec(PartialReduce) on potentially hundreds of partitions

Metadata


Labels

enhancement (New feature or request)
