[SPARK-12978][SQL] Skip unnecessary final group-by when input data already clustered with group-by keys #10896

Closed
maropu wants to merge 21 commits into apache:master from maropu:SkipGroupbySpike

Conversation

maropu (Member) commented Jan 25, 2016

This ticket targets an optimization that skips an unnecessary final group-by operation, as shown below:

Without opt.:

== Physical Plan ==
TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Final,isDistinct=false),(avg(col2#161),mode=Final,isDistinct=false)], output=[col0#159,sum(col1)#177,avg(col2)#178])
+- TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Partial,isDistinct=false),(avg(col2#161),mode=Partial,isDistinct=false)], output=[col0#159,sum#200,sum#201,count#202L])
   +- TungstenExchange hashpartitioning(col0#159,200), None
      +- InMemoryColumnarTableScan [col0#159,col1#160,col2#161], InMemoryRelation [col0#159,col1#160,col2#161], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None

With opt.:

== Physical Plan ==
TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Complete,isDistinct=false),(avg(col2#161),mode=Final,isDistinct=false)], output=[col0#159,sum(col1)#177,avg(col2)#178])
+- TungstenExchange hashpartitioning(col0#159,200), None
  +- InMemoryColumnarTableScan [col0#159,col1#160,col2#161], InMemoryRelation [col0#159,col1#160,col2#161], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
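For reference, the query shape that produces these plans (it also appears later in this thread); `df` is assumed to be a DataFrame with a grouping column col0 and numeric columns col1 and col2:

// The input is repartitioned on the grouping key, so the rows arriving at the
// aggregate are already clustered by col0; with the optimization the planner
// emits a single Complete-mode aggregate instead of a Partial/Final pair.
df.repartition($"col0")
  .groupBy($"col0")
  .agg(Map("col1" -> "sum", "col2" -> "avg"))
  .explain(true)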

SparkQA commented Jan 25, 2016

Test build #49985 has finished for PR 10896 at commit 5ab19c1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 25, 2016

Test build #49988 has finished for PR 10896 at commit 1b7e3d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member, Author) commented Feb 2, 2016

@marmbrus Could you review this and give me suggestions?

marmbrus (Contributor) commented Feb 2, 2016

@yhuai would be better to review this, but neither of those plans looks great to me. Why are we not doing a partial aggregation before the shuffle? It seems like that will ship a lot of data around for no reason.

yhuai (Contributor) commented Feb 3, 2016

I guess that exchange is added because there is a distribute by?

maropu (Member, Author) commented Feb 3, 2016

Yes, it is. The input query is:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import sqlContext.implicits._  // for the $"col" syntax

// `rdd` is an RDD[Row] built elsewhere that matches the schema below.
val fields = Seq(StringType, DoubleType, DoubleType)
  .zipWithIndex.map { case (dataType, index) =>
    StructField(s"col$index", dataType, true)
  }

val df = sqlContext.createDataFrame(rdd, StructType(fields))
val df2 = df.repartition($"col0").cache
val df3 = df2.groupBy($"col0").agg(Map("col1" -> "sum", "col2" -> "avg"))
df3.explain(true)

marmbrus (Contributor) commented Feb 3, 2016

Okay, but that code doesn't actually produce an exchange, right? Since it's captured by the cache?

== Optimized Logical Plan ==
Aggregate [col0#38918], [col0#38918,(sum(cast(col1#38919 as bigint)),mode=Complete,isDistinct=false) AS sum(col1)#38936L,(avg(cast(col2#38920 as bigint)),mode=Complete,isDistinct=false) AS avg(col2)#38937]
+- InMemoryRelation [col0#38918,col1#38919,col2#38920], true, 10000, StorageLevel(true, true, false, true, 1), Exchange hashpartitioning(col0#38918,200), None, None

== Physical Plan ==
WholeStageCodegen
:  +- TungstenAggregate(key=[col0#38918], functions=[(sum(cast(col1#38919 as bigint)),mode=Final,isDistinct=false),(avg(cast(col2#38920 as bigint)),mode=Final,isDistinct=false)], output=[col0#38918,sum(col1)#38936L,avg(col2)#38937])
:     +- TungstenAggregate(key=[col0#38918], functions=[(sum(cast(col1#38919 as bigint)),mode=Partial,isDistinct=false),(avg(cast(col2#38920 as bigint)),mode=Partial,isDistinct=false)], output=[col0#38918,sum#38959L,sum#38960,count#38961L])
:        +- INPUT
+- InMemoryColumnarTableScan [col0#38918,col1#38919,col2#38920], InMemoryRelation [col0#38918,col1#38919,col2#38920], true, 10000, StorageLevel(true, true, false, true, 1), Exchange hashpartitioning(col0#38918,200), None, None

Either way, I'll let @yhuai sign off on this.

@@ -86,20 +86,40 @@ object Utils {
aggregateExpressions: Seq[AggregateExpression],
aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute],
resultExpressions: Seq[NamedExpression],
skipUnnecessaryAggregate: Boolean,
Contributor (review comment)

I think it would be clearer if you called this partialAggregation. It's not unnecessary; it's an optimization in most cases.

maropu (Member, Author) commented Feb 4, 2016

Ah, yes... the code produces no exchange because of the cache.

maropu (Member, Author) commented Feb 4, 2016

As @marmbrus said, we also need to push the partial aggregation down below an exchange.
The current Catalyst transforms

df.repartition($"col0").groupBy($"col0").agg(Map("col1"->"sum", "col2"->"avg")).explain(true)

into

== Physical Plan ==
TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Final,isDistinct=false),(avg(col2#161),mode=Final,isDistinct=false)], output=[col0#159,sum(col1)#177,avg(col2)#178])
+- TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Partial,isDistinct=false),(avg(col2#161),mode=Partial,isDistinct=false)], output=[col0#159,sum#200,sum#201,count#202L])
   +- TungstenExchange hashpartitioning(col0#159,200), None
      +- InMemoryColumnarTableScan [col0#159,col1#160,col2#161], InMemoryRelation [col0#159,col1#160,col2#161], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None

maropu (Member, Author) commented Feb 5, 2016

@yhuai ping

SparkQA commented Feb 5, 2016

Test build #50812 has finished for PR 10896 at commit 140da25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member, Author) commented Feb 9, 2016

@yhuai ping

yhuai (Contributor) commented Feb 10, 2016

I probably will not be able to take a close look at this PR until later this month. I have a question regarding the approach of this PR. Right now, we always plan partial aggregation operators first (in SparkStrategies) and then add Exchange operators (in EnsureRequirements). Another approach would be to not add partial aggregation operators in SparkStrategies; then, after we figure out where we need exchange operators, we add the partial aggregation operators. This approach probably needs more code changes, but I feel it is a cleaner approach.

@maropu @marmbrus what do you think?

maropu (Member, Author) commented Feb 15, 2016

@yhuai The second approach sounds good to me, though I'm not exactly sure how it would remove the unnecessary final aggregation covered in this PR. IMO these kinds of partial-aggregation optimizations are similar to Filter optimizations such as push-downs and pruning, so it would be better to gather these optimizations in the same file. Even in the above example, we should push the partial aggregation down below the TungstenExchange generated by DataFrame#repartition.

maropu force-pushed the SkipGroupbySpike branch from 140da25 to 9d77c90 on April 25, 2016 06:06
SparkQA commented Apr 25, 2016

Test build #56883 has finished for PR 10896 at commit 9d77c90.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Apr 25, 2016

Test build #56884 has finished for PR 10896 at commit dcc51a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rxin (Contributor) commented May 20, 2016

cc @hvanhovell can you review this?

maropu (Member, Author) commented May 25, 2016

@hvanhovell ping

hvanhovell (Contributor)

@maropu I'll take a look today. Is the description up-to-date?

maropu (Member, Author) commented May 25, 2016

@hvanhovell yeah, it is up to date.

@@ -81,20 +81,38 @@ object Utils {
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
partialAggregation: Boolean,
Contributor (review comment)

Partial aggregation IMO implies that we add a partial aggregation step. What do you think?

hvanhovell (Contributor) commented May 30, 2016

@maropu IIUC this is still the old approach instead of the approach @yhuai suggests. Do you feel up to seeing whether his approach works? We could also do this in a follow-up.

I have left some minor comments, but all in all this looks pretty good.

@@ -257,10 +257,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
planLater(child))
}
} else if (functionsWithDistinct.isEmpty) {
// Check if the child operator satisfies the group-by distribution requirements
Contributor (review comment)

Why not move this code block into aggregate.Utils.planAggregateWithoutDistinct?

maropu (Member, Author) commented May 31, 2016

Thanks for your comments! I'll check them in a few days.

maropu force-pushed the SkipGroupbySpike branch 2 times, most recently from 2b1bea6 to 36553bc on June 7, 2016 08:58

def unapply(plan: SparkPlan): Option[Distribution] = plan match {
case agg: AggregateExec
if agg.aggregateExpressions.map(_.aggregateFunction).forall(_.supportsPartial) =>
Contributor (review comment)

Put this in a function. This can be found a few times in the code.
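A helper along these lines might look as follows (a sketch only: `AggregateExec` and `supportsPartial` are the names used in the diff above, while the helper's name and placement are illustrative):

// True when every aggregate function in the operator supports partial (map-side)
// aggregation, i.e. the aggregate can be split into partial and final steps.
private def supportsPartialAggregate(agg: AggregateExec): Boolean =
  agg.aggregateExpressions.map(_.aggregateFunction).forall(_.supportsPartial)

The guard in the extractor above would then become `if supportsPartialAggregate(agg)`.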

Member Author (review reply)

yea, okay.

maropu (Member, Author) commented Aug 24, 2016

okay, done

SparkQA commented Aug 24, 2016

Test build #64322 has finished for PR 10896 at commit 8a81e23.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 24, 2016

Test build #64324 has finished for PR 10896 at commit d5e0ed3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 25, 2016

Test build #64388 has finished for PR 10896 at commit ac68145.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member, Author) commented Aug 25, 2016

@hvanhovell could you also give me comments on #13852?

hvanhovell (Contributor)

LGTM - merging to master. Thanks!

asfgit closed this in 2b0cc4e on Aug 25, 2016
cloud-fan (Contributor) commented Aug 30, 2016

After this PR, we create the partial aggregate operator in EnsureRequirements, which makes the aggregation code harder to understand and also messes up EnsureRequirements.

I have a simpler idea: add a new rule that runs after EnsureRequirements. In this rule, we can combine adjacent partial and final aggregates into one.

cc @maropu @hvanhovell
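A minimal sketch of such a rule, assuming Spark 2.0's HashAggregateExec and a placement after EnsureRequirements; the rule name and the matching conditions here are illustrative only (this is not the code from #14876), and a real implementation would also need to cover SortAggregateExec and other corner cases:

import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object CombineAdjacentAggregates extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case finalAgg: HashAggregateExec if isFinalOverPartial(finalAgg) =>
      val partialAgg = finalAgg.child.asInstanceOf[HashAggregateExec]
      // No exchange separates the two operators, so the input is already clustered
      // on the grouping keys; collapse the pair into one Complete-mode aggregate
      // that reads the partial aggregate's input directly.
      finalAgg.copy(
        groupingExpressions = partialAgg.groupingExpressions,
        aggregateExpressions = finalAgg.aggregateExpressions.map(_.copy(mode = Complete)),
        initialInputBufferOffset = 0,
        child = partialAgg.child)
  }

  // A Final-mode aggregate sitting directly on top of its Partial-mode counterpart
  // with the same grouping keys (i.e. EnsureRequirements did not insert a shuffle).
  private def isFinalOverPartial(agg: HashAggregateExec): Boolean = agg.child match {
    case partial: HashAggregateExec =>
      agg.aggregateExpressions.forall(_.mode == Final) &&
        partial.aggregateExpressions.forall(_.mode == Partial) &&
        agg.groupingExpressions.map(_.toAttribute) == partial.groupingExpressions.map(_.toAttribute)
    case _ => false
  }
}

This matches the plans in the PR description: when the child already satisfies the required distribution, no exchange ends up between the two aggregates, so the pair can be collapsed.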

* - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
* - PartialMerge (now there is at most 1 tuple per group)
* - StateStoreSave (saves the tuple for the next batch)
* - Complete (output the current result of the aggregation)
*
* If the first aggregation needs a shuffle to satisfy its distribution, a map-side partial
* aggregation and a shuffle are added in `EnsureRequirements`.
*/
def planStreamingAggregation(
Contributor (review comment)

Have we tested the streaming aggregation with this optimization?

Contributor (review reply)

Yes, it is a bit risky to touch this part.

liancheng (Contributor)

+1 for @cloud-fan's proposal. Instead of creating a performant plan using tricky code, it's clearer to create a naive but correct physical plan first and then optimize it.

hvanhovell (Contributor)

You could also argue the other way around: planning a partial aggregate is also a premature optimization, and the planning of such an Aggregate could also be considered tricky code. BTW, the solution implemented in this PR was initially proposed by @yhuai.

I do think things could be simplified even more; both pruning an unneeded partial aggregate and planning one in a new rule have merit.

maropu (Member, Author) commented Aug 30, 2016

@cloud-fan @liancheng yea, adding a new rule after EnsureRequirements sounds good to me. One question: creating a partial aggregation in the planner and then removing it in the new rule seems kind of wasteful, so would it be a bad idea to create the partial aggregation in the new rule instead? Physical plans are always correct with or without partial aggregations, and adding them is just one of the optimizations.

cloud-fan (Contributor)

I agree that partial aggregation is also a kind of optimization, and it's tricky to put it in the planner. I think it makes sense to clean this up after we have had sufficient discussion and come to a consensus, but not as part of this one optimization.

For this particular optimization, I think it's much simpler to add an extra rule to merge the partial and final aggregates than to spread the aggregation logic into EnsureRequirements.

cc @yhuai too

maropu (Member, Author) commented Aug 30, 2016

Sorry for my bad explanation. Yes, I agree that we should remove the aggregation logic from EnsureRequirements for simpler code. What I meant is: how about moving the aggregation logic (creating partial aggregations) into the extra rule after EnsureRequirements?

yhuai (Contributor) commented Aug 30, 2016

@maropu Thank you for working on this. Sorry that I did not get time to look at it after you updated the PR. I looked at it today. I think this optimization deserves a feature flag since it determines whether we can generate a valid physical plan. We can enable it by default, but we will have the flexibility to disable it if there is an issue.

After looking at the code, I am not sure it is a good approach to put the logic of adding partial aggregate operators in EnsureRequirements. Originally, I thought we could have an individual rule to add partial aggregate operators and then either extract the logic in EnsureRequirements as a utility function or run EnsureRequirements again.

Also, due to the complexity of the logic for planning aggregations, it seems hard to track the planner logic after this change.

So, it seems it would be good to try your original proposal by adding a rule to remove unnecessary operators (like @cloud-fan implemented in #14876). In this way, it will also be very easy to add the feature flag and keep the optimization rule in a single place. Later, we can revisit this approach if we can clean up the planner logic for aggregation. What do you think?
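Purely as an illustration of the kind of feature flag being suggested here (the conf key below is hypothetical and does not exist in Spark):

// Hypothetical flag name; disabling it would fall back to the plain partial + final plan.
spark.conf.set("spark.sql.aggregate.collapsePartialAggregation.enabled", "false")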

maropu (Member, Author) commented Aug 31, 2016

@yhuai Thanks for your comment; I agree with you. Let's keep the discussion going.

asfgit pushed a commit that referenced this pull request Sep 1, 2016
## What changes were proposed in this pull request?

According to the discussion in the original PR #10896 and the new-approach PR #14876, we decided to revert these 2 PRs and go with the new approach.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14909 from cloud-fan/revert.
maropu deleted the SkipGroupbySpike branch on July 5, 2017 11:49