-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes #35779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { | |||
} else { | |||
newAggregate | |||
} | |||
|
|||
case agg @ Aggregate(groupingExps, _, child) | |||
if agg.groupOnly && child.deterministic && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't get why child.deterministic
is required. If the child output is completely random, then it should not report any distinct keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking twice, it is not required.
Project(agg.aggregateExpressions, child) | ||
|
||
case agg @ Aggregate(groupingExps, aggregateExps, child) | ||
if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it just a.foldable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Alias
is'n foldable, but the child is foldable.
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
Show resolved
Hide resolved
private def projectDistinctKeys( | ||
keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = { | ||
val outputSet = ExpressionSet(projectList.map(_.toAttribute)) | ||
val distinctKeys = keys.filter(_.subsetOf(outputSet)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should only do this filter in if (aliases.isEmpty)
. The distinct key can be a + b
and the project list may have a + b AS c
, then the result distinct key should be c
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then do the filter again in L49. It's incorrect to do this filter this early.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output)) | ||
|
||
override def visitExcept(p: Except): Set[ExpressionSet] = | ||
if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why p.deterministic
is required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it.
} | ||
|
||
override def visitIntersect(p: Intersect): Set[ExpressionSet] = { | ||
if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it.
|
||
override def visitProject(p: Project): Set[ExpressionSet] = { | ||
if (p.child.distinctKeys.nonEmpty) { | ||
projectDistinctKeys(p.child.distinctKeys.map(ExpressionSet(_)), p.projectList) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't p.child.distinctKeys
already a Set[ExpressionSet]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest { | |||
comparePlans(optimized, expected) | |||
} | |||
|
|||
test("Remove redundant aggregate - upper has contains foldable expressions") { | |||
val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, how does this work? groupBy('a)('a, TrueLiteral)
is not group only and not literal-only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RemoveRedundantAggregates
already supported before this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah this works because these two aggregates are adjacent. If they are not, we have a problem.
I'm thinking that we should refine Aggregate.groupOnly
: .groupBy('a)('a, TrueLiteral)
is also group only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#35795 to address this.
test("SPARK-36194: Negative case: Remove aggregation from contains non-deterministic") { | ||
val query = relation | ||
.groupBy('a)('a, (count('b) + rand(0)).as("cnt")) | ||
.groupBy('a, 'cnt)('a, 'cnt) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what can go wrong if we optimize this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can optimize this case.
@@ -1,54 +1,51 @@ | |||
== Physical Plan == |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you summarize the plan changes? do we have less shuffles now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
q14a/a14b reduces 1 Exchange and 2 HashAggregates.
q38 reduces 3 Exchange and 4 HashAggregates.
q38 reduces 3 Exchange and 4 HashAggregates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for some minor comments
Do we see regressions in any TPCDS queries? |
…on has already been done on left side
There is no regression. |
if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) => | ||
Project(agg.aggregateExpressions, child) | ||
|
||
case agg @ Aggregate(groupingExps, aggregateExps, child) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this case? agg.groupOnly
should cover it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we do not need it.
Merged to master. |
### What changes were proposed in this pull request? This PR is a followup of #35779 , to propagate distinct keys more precisely in 2 cases: 1. For `LIMIT 1`, each output attribute is a distinct key, not the entire tuple. 2. For aggregate, we can still propagate distinct keys from child. ### Why are the changes needed? make the optimization cover more cases ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #36100 from cloud-fan/followup. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request? This PR is a followup of #35779 , to propagate distinct keys more precisely in 2 cases: 1. For `LIMIT 1`, each output attribute is a distinct key, not the entire tuple. 2. For aggregate, we can still propagate distinct keys from child. ### Why are the changes needed? make the optimization cover more cases ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #36100 from cloud-fan/followup. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit fbe82fb) Signed-off-by: Yuming Wang <yumwang@ebay.com>
What changes were proposed in this pull request?
This pr add a new logical plan visitor named
DistinctKeyVisitor
to find out all the distinct attributes in current logical plan. For example:The output is: {a#1, b#2}.
Enhance
RemoveRedundantAggregates
to remove the aggregation if it is groupOnly and the child can guarantee distinct. For example:Before this PR:
After this PR:
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and TPC-DS benchmark test.