[SPARK-12213] [SQL] use multiple partitions for single distinct query #10228


Closed
wants to merge 7 commits

Conversation

davies
Contributor

@davies davies commented Dec 9, 2015

Currently, we can generate two different plans for a query with a single distinct aggregation (depending on spark.sql.specializeSingleDistinctAggPlanning): one works better on low-cardinality columns, the other (the default) works better on high-cardinality columns.

This PR changes the planner to generate a single plan (three aggregations and two exchanges) that works well in both cases, so we can safely remove the flag spark.sql.specializeSingleDistinctAggPlanning (introduced in 1.6).

For a query like `SELECT COUNT(DISTINCT a) FROM table`, the plan will be

```
AGG-4 (count distinct)
  Shuffle to a single reducer
    Partial-AGG-3 (count distinct, no grouping)
      Partial-AGG-2 (grouping on a)
        Shuffle by a
          Partial-AGG-1 (grouping on a)
```
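The staged plan above can be sketched outside Spark. This is a minimal Python illustration (hypothetical names, not Spark code, assuming simple hash partitioning) of how the three aggregation stages and two shuffles compute a distinct count:

```python
# Sketch of the plan: partition-local dedup (Partial-AGG-1), shuffle by a,
# grouping + partial count per reducer (Partial-AGG-2 / Partial-AGG-3),
# then a final merge on a single reducer (AGG-4).
from collections import defaultdict

def count_distinct(partitions):
    # Partial-AGG-1: deduplicate values within each input partition.
    deduped = [set(p) for p in partitions]
    # Shuffle by a: hash-partition values so each value lands on one reducer.
    num_reducers = 2
    shuffled = defaultdict(set)
    for part in deduped:
        for v in part:
            shuffled[hash(v) % num_reducers].add(v)
    # Partial-AGG-2 + Partial-AGG-3: each reducer counts its values,
    # which are now globally unique, with no grouping.
    partial_counts = [len(vals) for vals in shuffled.values()]
    # Shuffle to a single reducer + AGG-4: sum the partial counts.
    return sum(partial_counts)

print(count_distinct([[1, 2, 2, 3], [3, 3, 4], [4, 5]]))  # → 5
```

The point of the first, partition-local dedup is to shrink the data before the shuffle when the column has low cardinality, while still shuffling by `a` so high-cardinality columns spread across reducers.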

This PR also includes a large refactoring of the aggregation code (removing 500+ lines of code).

cc @yhuai @nongli @marmbrus

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47442 has started for PR 10228 at commit e3f2e79.

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47443 has started for PR 10228 at commit 5e42c76.

@yhuai
Contributor

yhuai commented Dec 9, 2015

Oh, I just realized that the plan for a query like `SELECT COUNT(DISTINCT a) FROM table` will be

```
AGG-2 (count distinct)
  Shuffle to a single reducer
    AGG-1 (grouping on a)
      Shuffle by a
        Partial-AGG-1 (grouping on a)
```

Ideally, we should still use four aggregate operators, like the plan shown below, but without the overhead of using Expand.

```
AGG-2 (count distinct)
  Shuffle to a single reducer
    Partial-AGG-2 (count distinct)
      AGG-1 (grouping on a)
        Shuffle by a
          Partial-AGG-1 (grouping on a)
```
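For context on the Expand overhead mentioned above: a hedged, simplified sketch (hypothetical names, not the Catalyst implementation) of the Expand-based rewrite used for multiple distinct aggregates, which duplicates every input row once per distinct group:

```python
# For COUNT(DISTINCT a), COUNT(DISTINCT b), Expand turns each input row
# into one row per distinct group, nulling out the other column and
# tagging the row with a group id (gid).
def expand(rows):
    out = []
    for a, b in rows:
        out.append((a, None, 1))   # row for the DISTINCT a group (gid=1)
        out.append((None, b, 2))   # row for the DISTINCT b group (gid=2)
    return out

rows = [(1, 10), (1, 20)]
print(len(expand(rows)))  # → 4, i.e. 2x the input size
```

With a single distinct column this doubling buys nothing, which is why avoiding Expand in that case is attractive.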

@hvanhovell
Contributor

We could move the planning of distinct queries entirely to the DistinctAggregateRewriter. This would require us to merge the non-distinct aggregate path and the first distinct group aggregate path, so we could avoid the Expand in the case of a single distinct column group.

This is quite a bit of work; I don't know if this is worth the effort.

@SparkQA

SparkQA commented Dec 10, 2015

Test build #47533 has finished for PR 10228 at commit 3f60962.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
val aggregationBufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val modes = aggregateExpressions.map(_.mode).distinct
if (aggregateExpressions.nonEmpty) {
val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
```

Why doesn't this just check groupingKeyAttributes.nonEmpty?

@SparkQA

SparkQA commented Dec 10, 2015

Test build #47536 has finished for PR 10228 at commit a9eae30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor Author

davies commented Dec 10, 2015

@hvanhovell The difficulty of doing this in DistinctAggregateRewriter is that DistinctAggregateRewriter will generate two logical plans, but some aggregate functions have different updateExpressions and mergeExpressions, so they could not work as update-merge-update-final; they should work as update-merge-merge-final.
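A minimal sketch of this constraint (assumed names, not Spark's API), using COUNT: the update expression adds 1 per input row, while the merge expression adds whole partial buffers, so re-applying update to already-merged buffers gives the wrong answer:

```python
def count_update(buffer, row):
    return buffer + 1          # updateExpression: one more input row seen

def count_merge(buffer, other):
    return buffer + other      # mergeExpression: combine partial counts

partials = [3, 4]              # partial counts from two partitions

# update-merge-merge-final: partial buffers keep being merged — correct.
correct = count_merge(count_merge(0, partials[0]), partials[1])

# update-merge-update-final: treats each partial buffer as one input row.
wrong = count_update(count_update(0, partials[0]), partials[1])

print(correct, wrong)  # → 7 2
```

This is why, once a buffer has gone through a merge stage, every later stage must also be a merge.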

@hvanhovell
Contributor

@davies don't get me wrong. I think this PR is an improvement of the current situation (it never crossed my mind to change partitioning when I was working on that part of the code), and should be added.

I am also not too keen on changing the MultipleDistinctRewriter, given the time it'll take and the objections you've raised. The only thing that bugs me is that we currently rewrite distinct aggregates in two places, and I was thinking (out loud) about a potential solution.

@davies
Contributor Author

davies commented Dec 10, 2015

@hvanhovell If we can figure out a better solution, it's definitely welcome.

@SparkQA

SparkQA commented Dec 10, 2015

Test build #47544 has finished for PR 10228 at commit 3f1ea7f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 11, 2015

Test build #47559 has finished for PR 10228 at commit 8262ad8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 11, 2015

Test build #47570 has finished for PR 10228 at commit 740e725.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 12, 2015

Test build #2211 has finished for PR 10228 at commit 740e725.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 12, 2015

Test build #47601 has finished for PR 10228 at commit 71e0b1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 12, 2015

Test build #2212 has finished for PR 10228 at commit 71e0b1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
protected val allAggregateFunctions: Array[AggregateFunction] = {
protected def initializeAggregateFunctions(
    expressions: Seq[AggregateExpression],
    startingInputBufferOffset: Int): Array[AggregateFunction] = {
```

format

```scala
case other =>
  throw new IllegalStateException(
    s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
override def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
```

override protected

@yhuai
Contributor

yhuai commented Dec 13, 2015

@hvanhovell With this change, we will use the planner rule to handle single distinct aggregation and use the rewriter to handle multiple distinct aggregations, which is the same as when you originally introduced the rewriter. I guess the compilation logic after this change is better than our current logic (having two different rules that handle the same case). What do you think?

@yhuai
Contributor

yhuai commented Dec 13, 2015

@davies I only left a few minor comments. Overall, it is very cool!

@hvanhovell
Contributor

@yhuai I think having the two clearly separated paths (this PR) is an improvement on the current situation. I also admit that I am responsible for introducing the second path. Your comment on having four aggregate steps without the exchange triggered me, and I was thinking out loud about how we could do this using the rewriting rule (removing one of the paths would have been a bonus).

```scala
i += 1
val joinedRow = new JoinedRow
if (aggregateExpressions.nonEmpty) {
val mergeExpressions = aggregateFunctions.zipWithIndex.flatMap {
```

zip(aggregateExpressions) ?

@SparkQA

SparkQA commented Dec 13, 2015

Test build #47622 has finished for PR 10228 at commit 51ca055.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Dec 13, 2015

@hvanhovell Yea, it's great to think about how to use a single rule to handle aggregation queries with distinct after we have this improvement. The logical rewriter rules are probably a good place, because rewriting logical plans is easier. If that is the right approach, we can make some changes to our physical planner so that it respects the aggregation mode of an agg expression in a logical agg operator (right now, our physical planner always ignores the mode). Then, when we create the physical plan, we can understand that, for example, a logical agg operator is used to merge aggregation buffers.

@yhuai
Contributor

yhuai commented Dec 14, 2015

@hvanhovell How about we merge this first and we take a look at how to use a single rule to handle aggregation queries with distinct?

@hvanhovell
Contributor

@yhuai LGTM. Yea, let's merge this one; I'll create a ticket for the distinct rules.

@yhuai
Contributor

yhuai commented Dec 14, 2015

Cool. I am merging this one to master.

@asfgit asfgit closed this in 834e714 Dec 14, 2015
mbautin pushed a commit to mbautin/spark that referenced this pull request Feb 1, 2016

Author: Davies Liu <davies@databricks.com>

Closes apache#10228 from davies/single_distinct.
5 participants