
Fusing partial aggregation with repartition #12596

Open
Rachelint opened this issue Sep 23, 2024 · 7 comments

Comments

@Rachelint
Contributor

Is your feature request related to a problem or challenge?

I implemented a proof of concept in #12526 and found that this idea can actually improve performance.

But for the reasons stated in #11680 (comment),

I think this improvement is not well suited to be pushed forward at the moment.

I am just filing an issue to track it.

Describe the solution you'd like

  • Introduce a partitioned hash table in the partial aggregation, and partition the data by hash before inserting it into the hash table.
  • Then push the groups to the final aggregation partition by partition, rather than splitting them again in repartition and merging them again in coalesce. (A minimal sketch follows this list.)
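A minimal sketch of the idea (the `PartitionedGroupValues` type and its fields are hypothetical, not DataFusion's actual API):

```rust
// A minimal sketch, not DataFusion's actual API: the partial aggregation
// keeps one sub-table per output partition and routes each group to its
// partition by the group key hash, so its output is already partitioned.
use std::collections::HashMap;

struct PartitionedGroupValues {
    num_partitions: usize,
    /// One map per output partition: group key hash -> group index.
    /// (Real code would also compare the actual group keys on hash collisions.)
    partitions: Vec<HashMap<u64, usize>>,
}

impl PartitionedGroupValues {
    fn new(num_partitions: usize) -> Self {
        Self {
            num_partitions,
            partitions: vec![HashMap::new(); num_partitions],
        }
    }

    /// Route a group to its output partition using the hash the aggregation
    /// already computed, then intern it there. Returns (partition, group index).
    fn intern(&mut self, group_hash: u64) -> (usize, usize) {
        let partition = (group_hash as usize) % self.num_partitions;
        let map = &mut self.partitions[partition];
        let next_idx = map.len();
        let group_idx = *map.entry(group_hash).or_insert(next_idx);
        (partition, group_idx)
    }
}
```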

Describe alternatives you've considered

No response

Additional context

No response

@Rachelint added the enhancement (New feature or request) label on Sep 23, 2024
@Rachelint
Contributor Author

take

@Rachelint
Contributor Author

I will push this forward after the tasks listed in #11680 (comment) are finished.

@Rachelint
Contributor Author

@waynexia may also be interested in this?

@yjshen
Member

yjshen commented Sep 25, 2024

Introduce a partitioned hash table in the partial aggregation, and partition the data by hash before inserting it into the hash table.
Then push the groups to the final aggregation partition by partition, rather than splitting them again in repartition and merging them again in coalesce.

I'm not clear on how this proposal works. Could you please explain why it provides performance benefits compared to partial aggregation, exchange, and final aggregation? Is the proposal aimed explicitly at accelerating high cardinality aggregation, or is it intended to enhance aggregation performance?

@Rachelint
Contributor Author

Rachelint commented Sep 25, 2024

Introduce a partitioned hash table in the partial aggregation, and partition the data by hash before inserting it into the hash table.
Then push the groups to the final aggregation partition by partition, rather than splitting them again in repartition and merging them again in coalesce.

I'm not clear on how this proposal works. Could you please explain why it provides performance benefits compared to partial aggregation, exchange, and final aggregation? Is the proposal aimed explicitly at accelerating high cardinality aggregation, or is it intended to enhance aggregation performance?

I think it enhances aggregation performance generally?

  • Currently we can think of GroupValues and GroupAccumulator as using a single Vec to manage the intermediate states in the partial aggregation.

  • After finishing work in the partial aggregation, we pass the batch to the exchange, where we recompute the hashes of the batch. The hashes have actually already been computed in GroupValues, so this recomputation is the first avoidable CPU cost.

  • Then we split the batch into multiple batches, according to the partition numbers computed from the hashes. The splitting needs to create multiple new batches to hold the values from the source batch and copy data into them, and that is the second avoidable CPU cost.

  • Finally, before passing data to the final aggregation of a partition, we first need to copy that partition's split small batches into a buffer in coalesce until the buffer is large enough (usually the default batch size, 8192 rows), and that is the third avoidable CPU cost. (A simplified sketch of this path follows.)
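A simplified model of that path, assuming plain Vecs in place of Arrow RecordBatches and a hypothetical `repartition_and_coalesce` helper (in the real plan the split in RepartitionExec and the buffering in CoalesceBatchesExec are two separate copies; the sketch collapses them into one):

```rust
// A simplified model of the current path between partial and final
// aggregation; each numbered comment matches one avoidable cost above.
fn repartition_and_coalesce(
    batch: &[u64],                // stand-in for a RecordBatch of rows
    hash: impl Fn(&u64) -> u64,   // stand-in for hashing the group keys
    num_partitions: usize,
    buffers: &mut Vec<Vec<u64>>,  // one coalesce buffer per partition
    target_batch_size: usize,     // usually the default batch size, 8192
) -> Vec<Vec<u64>> {
    let mut ready = Vec::new();
    for row in batch {
        // (1) recompute the hash, even though GroupValues already computed it
        let h = hash(row);
        let partition = (h as usize) % num_partitions;
        // (2) copy the row into a new per-partition batch
        buffers[partition].push(*row);
        // (3) keep buffering small batches until they reach the target size
        if buffers[partition].len() >= target_batch_size {
            ready.push(std::mem::take(&mut buffers[partition]));
        }
    }
    ready
}
```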

After using a partitioned approach in GroupValues and GroupAccumulator:

  • We can naturally reuse the hashes computed in GroupValues when calculating the partition numbers of the batches.

  • We store the intermediate states in the partial aggregation partition by partition. When we submit them to the final aggregation, we just submit them partition by partition, rather than splitting first and merging afterwards. (See the sketch below.)
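A minimal sketch of the emit side under this approach (the `PartitionedStates` type is hypothetical, not DataFusion's API): each partition's states already live together, so each one can be handed to the matching final-aggregation partition as a whole.

```rust
// A minimal sketch, with hypothetical types: the partial aggregation keeps
// its intermediate states grouped by output partition, so emitting is just
// handing each partition's states to the matching final-aggregation input.
struct PartitionedStates {
    /// Intermediate aggregate states (e.g. partial sums), one Vec per partition.
    per_partition: Vec<Vec<i64>>,
}

impl PartitionedStates {
    /// Emit one batch per partition: no repartition split, no coalesce copy.
    fn emit(&mut self) -> impl Iterator<Item = (usize, Vec<i64>)> + '_ {
        self.per_partition
            .iter_mut()
            .enumerate()
            .map(|(partition, states)| (partition, std::mem::take(states)))
    }
}
```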

@jayzhan211
Contributor

I think our goal is to combine partial + repartition + final into a single operator, and fusing partial + repartition is the first step of this. After that we could try doing the final aggregation step as well.

@Rachelint
Contributor Author

Rachelint commented Sep 25, 2024

I think our goal is to combine partial + repartition + final into a single operator, and fusing partial + repartition is the first step of this. After that we could try doing the final aggregation step as well.

Yes, it may be attractive if we combine them in some way; we seem to have a chance to do more optimizations.
🤔

  • Now we build record batches from the partial aggregation's internal states.
  • Then we pass them to the final aggregation.
  • And then we need to copy the needed data into the final aggregation's internal states.

It seems to be expensive for bytes and strings? Maybe we can pass the internal states directly to the final aggregation and avoid some copies?
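A hypothetical sketch of the difference for string/bytes group values, assuming made-up `PartialStringGroups` / `FinalStringGroups` types rather than any actual DataFusion interface: copying through a RecordBatch amounts to cloning the byte buffers, while handing over the partial stage's buffers avoids the copy.

```rust
// A hypothetical sketch (made-up types, not a DataFusion interface) of the
// copy we might avoid for string/bytes group values.
struct PartialStringGroups {
    /// Concatenated group-key bytes plus offsets, as built by the partial stage.
    data: Vec<u8>,
    offsets: Vec<usize>,
}

struct FinalStringGroups {
    data: Vec<u8>,
    offsets: Vec<usize>,
}

impl FinalStringGroups {
    /// Copying path: duplicate every byte of the group keys (roughly what
    /// building and then re-reading a RecordBatch costs).
    fn from_copy(partial: &PartialStringGroups) -> Self {
        Self {
            data: partial.data.clone(),
            offsets: partial.offsets.clone(),
        }
    }

    /// Move path: take ownership of the partial stage's buffers, no byte copy.
    fn from_move(partial: PartialStringGroups) -> Self {
        Self {
            data: partial.data,
            offsets: partial.offsets,
        }
    }
}
```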
