[SPARK-12213] [SQL] use multiple partitions for single distinct query #10228
Conversation
Test build #47442 has started for PR 10228 at commit
Test build #47443 has started for PR 10228 at commit
oh, just realized that the plan for a query like
Ideally, we should still use four aggregate operators like the one shown below but without the overhead of using Expand.
We could move the planning of distinct queries entirely to the DistinctAggregateRewriter. This would require us to merge the non-distinct aggregate paths and the first distinct group aggregate path, so we could avoid the Expand in the case of a single distinct column group. This is quite a bit of work; I don't know if it is worth the effort.
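For context, a minimal standalone sketch (plain Scala with made-up types, not Spark's actual Expand operator) of the overhead being discussed: an Expand-style rewrite re-emits every input row once per projection list, nulling out the columns a projection does not need, so using it for a single distinct column group roughly doubles the rows fed into the first aggregate.

```scala
// Simplified, hypothetical model of an Expand-style rewrite.
case class InputRow(a: Option[Int], b: Option[Int])

// Each input row is re-emitted once per projection.
def expand(input: Seq[InputRow], projections: Seq[InputRow => InputRow]): Seq[InputRow] =
  input.flatMap(row => projections.map(p => p(row)))

val input = Seq(InputRow(Some(1), Some(10)), InputRow(Some(2), Some(20)))

// One projection keeps only the distinct column `a`; the other keeps only `b`
// for the non-distinct aggregates.
val expanded = expand(input, Seq(
  row => InputRow(row.a, None),
  row => InputRow(None, row.b)
))

// Twice as many rows flow into the first aggregate -- the overhead being avoided.
assert(expanded.size == 2 * input.size)
```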
Test build #47533 has finished for PR 10228 at commit
val aggregationBufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val modes = aggregateExpressions.map(_.mode).distinct
if (aggregateExpressions.nonEmpty) {
val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
Why doesn't this just check groupingKeyAttributes.nonEmpty?
Test build #47536 has finished for PR 10228 at commit
@hvanhovell The difficulty of doing this in DistinctAggregateRewriter is that DistinctAggregateRewriter would generate two logical plans, but some aggregate functions have different updateExpression and mergeExpression, so they could not work as update-merge-update-final; they have to work as update-merge-merge-final.
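To illustrate the update/merge distinction, here is a minimal self-contained sketch (plain Scala, not Spark's DeclarativeAggregate/ImperativeAggregate API) of an average whose update and merge logic differ, which is why the stages cannot be swapped:

```scala
// Average keeps (sum, count) in its buffer: update consumes an input value and
// bumps the count by one, while merge adds both sums and both counts.
final case class AvgBuffer(sum: Double, count: Long)

object Avg {
  val zero: AvgBuffer = AvgBuffer(0.0, 0L)

  // Applied while reading input rows (the "update" stage).
  def update(buf: AvgBuffer, value: Double): AvgBuffer =
    AvgBuffer(buf.sum + value, buf.count + 1)

  // Applied when combining partial buffers (the "merge" stages).
  def merge(a: AvgBuffer, b: AvgBuffer): AvgBuffer =
    AvgBuffer(a.sum + b.sum, a.count + b.count)

  def result(buf: AvgBuffer): Double = buf.sum / buf.count
}

// Running an "update" step where a "merge" step is required would treat a
// partial buffer as a single input row and count it as one, producing a wrong
// average -- hence update-merge-merge-final rather than update-merge-update-final.
```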
@davies don't get me wrong. I think this PR is an improvement over the current situation (it never crossed my mind to change partitioning when I was working on that part of the code), and it should be added. I am also not too keen on changing the MultipleDistinctRewriter, given the time it would take and the objections you've raised. The only thing that bugs me is that we currently rewrite distinct aggregates in two places, and I was thinking (out loud) about a potential solution.
@hvanhovell If we can figure out a better solution, it is definitely welcome.
Test build #47544 has finished for PR 10228 at commit
Test build #47559 has finished for PR 10228 at commit
Branch force-pushed from 8262ad8 to 740e725.
Test build #47570 has finished for PR 10228 at commit
Branch force-pushed from 740e725 to 71e0b1c.
Test build #2211 has finished for PR 10228 at commit
Test build #47601 has finished for PR 10228 at commit
Test build #2212 has finished for PR 10228 at commit
protected val allAggregateFunctions: Array[AggregateFunction] = {
protected def initializeAggregateFunctions(
    expressions: Seq[AggregateExpression],
    startingInputBufferOffset: Int): Array[AggregateFunction] = {
format
case other =>
  throw new IllegalStateException(
    s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
override def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
override protected
@hvanhovell With this change, we will use the planner rule to handle single distinct aggregations and use the rewriter to handle multiple distinct aggregations, which is the same as when you originally introduced the rewriter. I think the compilation logic after this change is better than our current logic (having two different rules that handle the same case). What do you think?
@davies I only left a few minor comments. Overall, it is very cool!
@yhuai I think having the two clearly separated paths (this PR) is an improvement over the current situation. I also admit that I am responsible for introducing the second path. Your comment on having four aggregate steps without the exchange triggered me, and I was thinking out loud about how we could do this using the rewriting rule (the removal of one of the paths would have been a bonus).
i += 1
val joinedRow = new JoinedRow
if (aggregateExpressions.nonEmpty) {
val mergeExpressions = aggregateFunctions.zipWithIndex.flatMap {
zip(aggregateExpressions)?
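A tiny standalone illustration of the suggestion (hypothetical stand-in values, not the PR's actual sequences): zipping the two sequences directly yields the same pairs as carrying an index via zipWithIndex and looking the expression up by position.

```scala
// Hypothetical stand-ins for aggregateFunctions and aggregateExpressions.
val aggregateFunctions = Seq("sumFn", "countFn")
val aggregateExpressions = Seq("sum(a)", "count(b)")

// Current style in the snippet above: carry the index and look the expression up.
val viaIndex = aggregateFunctions.zipWithIndex.flatMap { case (fn, i) =>
  Seq((fn, aggregateExpressions(i)))
}

// Suggested style: zip the two sequences directly.
val viaZip = aggregateFunctions.zip(aggregateExpressions).flatMap { case (fn, expr) =>
  Seq((fn, expr))
}

assert(viaIndex == viaZip)
```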
Test build #47622 has finished for PR 10228 at commit
@hvanhovell Yea, it's great to think about how to use a single rule to handle aggregation queries with distinct after we have this improvement. The logical rewriter rules are probably a good place because rewriting logical plans is easier. If that is the right approach, we can make some changes to our physical planner so that it respects the aggregation mode of an agg expression in a logical agg operator (right now, our physical planner always ignores the mode). Then, when we create the physical plan, we can understand that, for example, a logical agg operator is used to merge aggregation buffers.
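A rough sketch (made-up types, not Spark's planner code or its actual AggregateMode) of what "respecting the mode" during physical planning could look like: the planner would branch on the mode carried by each aggregate expression instead of always assuming a fresh partial/final pair.

```scala
// Hypothetical modes mirroring the partial / partial-merge / final stages
// discussed above.
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode

// A mode-aware planner picks the physical behaviour per mode rather than ignoring it.
def physicalStep(mode: AggMode): String = mode match {
  case Partial      => "read input rows and build partial buffers"
  case PartialMerge => "merge incoming partial buffers"
  case Final        => "merge buffers and emit final result values"
}
```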
@hvanhovell How about we merge this first and then take a look at how to use a single rule to handle aggregation queries with distinct?
@yhuai LGTM. Yea, let's merge this one. I'll create a ticket for the distinct rules.
Cool. I am merging this one to master.
Currently, we could generate two different plans for a query with a single distinct aggregation (depending on spark.sql.specializeSingleDistinctAggPlanning): one works better on low-cardinality columns, the other works better on high-cardinality columns (the default one). This PR changes the planner to generate a single plan (three aggregations and two exchanges) that works well in both cases, so we can safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).

For a query like `SELECT COUNT(DISTINCT a) FROM table`, the plan will be:

```
AGG-4 (count distinct)
  Shuffle to a single reducer
    Partial-AGG-3 (count distinct, no grouping)
      Partial-AGG-2 (grouping on a)
        Shuffle by a
          Partial-AGG-1 (grouping on a)
```

This PR also includes a large refactoring of aggregation (reducing it by 500+ lines of code).

cc @yhuai @nongli @marmbrus

Author: Davies Liu <davies@databricks.com>

Closes apache#10228 from davies/single_distinct
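As a usage note, one way to inspect the resulting plan (assuming a Spark shell of that era where a sqlContext is in scope; the table and column names here are illustrative, not from the PR):

```scala
// Illustrative only: register some data and look at the physical plan.
// In Spark 1.6-era code this is a SQLContext; later versions expose the same
// methods on SparkSession.
import sqlContext.implicits._

val df = Seq(1, 1, 2, 3).toDF("a")
df.registerTempTable("table")

// The printed plan should show the single plan shape described above,
// regardless of the cardinality of column `a`.
sqlContext.sql("SELECT COUNT(DISTINCT a) FROM table").explain()
```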