showcase, DO NOT MERGE #14876

Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

This is another approach to fixing SPARK-12978, which was previously attempted in #10896.

How was this patch tested?

New test in PlannerSuite.
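The PR text does not include the test itself; the following is a hypothetical sketch of the kind of assertion a PlannerSuite test for SPARK-12978 might make. The test name, the DataFrame setup, and the expected operator count are all assumptions for illustration, not the PR's actual test.

```scala
// Hypothetical sketch (not the PR's actual test): check that the planner
// does not emit a redundant partial/final aggregate pair when the input
// is already partitioned on the grouping key.
test("SPARK-12978: no redundant partial aggregate on pre-partitioned input") {
  val df = spark.range(0, 1000)
    .selectExpr("id % 2 as a", "cast(id as string) as b")
    .repartition($"a")
  val plan = df.groupBy("a").agg(max($"b")).queryExecution.executedPlan
  // Expect a single aggregate operator rather than a partial/final pair.
  val numAggs = plan.collect { case a: HashAggregateExec => a }.length
  assert(numAggs == 1)
}
```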

@cloud-fan
Contributor Author

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64660 has finished for PR 14876 at commit a5b6d71.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Aug 30, 2016

Thank you for your concrete example! I'll check it within a few hours.

// Normal partial aggregate pair
case outer @ HashAggregateExec(_, _, _, _, _, _, inner: HashAggregateExec)
    if outer.aggregateExpressions.forall(_.mode == Final) &&
       inner.aggregateExpressions.forall(_.mode == Partial) =>
Contributor

I think we need stricter conditions to make sure these two operators are for the same GROUP BY clause. (Although I do not have a case showing this will break, it is better to state the condition in a more specific way.)
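One way to make the guard stricter is to also compare the grouping expressions of the two operators, so that only a partial/final pair produced for the same GROUP BY clause is merged. The following is only a sketch of that idea, not code from the PR; in particular, whether `semanticEquals` is the right comparison across the partial/final boundary (where the final side references the partial side's output attributes) is itself part of the open question the reviewer raises.

```scala
// Sketch only: a stricter guard that additionally requires both operators
// to aggregate over the same number of semantically-equal grouping
// expressions. The comparison strategy here is an assumption, not PR code.
case outer @ HashAggregateExec(_, _, _, _, _, _, inner: HashAggregateExec)
    if outer.aggregateExpressions.forall(_.mode == Final) &&
       inner.aggregateExpressions.forall(_.mode == Partial) &&
       outer.groupingExpressions.length == inner.groupingExpressions.length &&
       outer.groupingExpressions.zip(inner.groupingExpressions).forall {
         case (a, b) => a.semanticEquals(b)
       } =>
```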

@maropu
Member

maropu commented Aug 31, 2016

I found that we may need to push down partial aggregation below exchange operators instead of merging them. For example, in the Spark v2.0 branch:

(0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").repartition($"a").createOrReplaceTempView("t")
spark.sql("select max(b) from t group by a").explain

This prints like:

== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#20])
+- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5,max#22])
   +- *Sort [a#5 ASC], false, 0
      +- Exchange hashpartitioning(a#5, 4)
         +- LocalTableScan [a#5, b#6]

In this case, I think it is more natural to push down the partial aggregation below the exchange.
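The push-down idea above could be sketched as a physical-plan rewrite along these lines. This is only an illustration of the direction of the transformation, not code from this PR: it ignores the intermediate Sort in the example plan, and `ShuffleExchange` plus the `withNewChildren` plumbing are assumed from Spark 2.0's internals.

```scala
// Hypothetical sketch: rewrite Final <- Partial <- Exchange into
// Final <- Exchange <- Partial, so partial aggregation runs before the
// shuffle and reduces the amount of data exchanged. Not PR code.
case finalAgg @ HashAggregateExec(_, _, _, _, _, _,
       partialAgg @ HashAggregateExec(_, _, _, _, _, _, exchange: ShuffleExchange))
    if finalAgg.aggregateExpressions.forall(_.mode == Final) &&
       partialAgg.aggregateExpressions.forall(_.mode == Partial) =>
  finalAgg.withNewChildren(
    exchange.withNewChildren(
      partialAgg.withNewChildren(exchange.child :: Nil) :: Nil) :: Nil)
```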

@maropu
Member

maropu commented Aug 31, 2016

On the other hand, when caching the already-partitioned input table, we cannot push them down:

(0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").repartition($"a").cache.createOrReplaceTempView("t")
spark.sql("select max(b) from t group by a").explain

== Physical Plan ==
SortAggregate(key=[a#40], functions=[max(b#41)])
+- SortAggregate(key=[a#40], functions=[partial_max(b#41)])
   +- *Sort [a#40 ASC], false, 0
      +- InMemoryTableScan [a#40, b#41]
         :  +- InMemoryRelation [a#40, b#41], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         :     :  +- Exchange hashpartitioning(a#40, 4)
         :     :     +- LocalTableScan [a#40, b#41]

ISTM all we can do in this case is merge the two aggregations into one.

@cloud-fan
Contributor Author

Yea, pushing down the partial aggregate below the exchange is a good idea, but I think it's out of the scope of SPARK-12978, which aims to remove unnecessary partial aggregates, right?

@maropu
Member

maropu commented Aug 31, 2016

Yea, I think so. I like the approach in this PR.

@cloud-fan
Contributor Author

Closing; @maropu will take over.

@cloud-fan cloud-fan closed this Sep 1, 2016
asfgit pushed a commit that referenced this pull request Sep 1, 2016
## What changes were proposed in this pull request?

According to the discussion in the original PR #10896 and the new-approach PR #14876, we decided to revert these two PRs and go with the new approach.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14909 from cloud-fan/revert.
@cloud-fan cloud-fan deleted the agg branch December 14, 2016 12:33
Labels: none yet
Projects: none yet
4 participants