[SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes #35779


Closed (16 commits)

Conversation

@wangyum (Member) commented Mar 9, 2022

What changes were proposed in this pull request?

  1. This PR adds a new logical plan visitor, named DistinctKeyVisitor, to find out all the distinct attributes in the current logical plan. For example:

    spark.sql("CREATE TABLE t(a int, b int, c int) using parquet")
    spark.sql("SELECT a, b, a % 10, max(c), sum(b) FROM t GROUP BY a, b").queryExecution.analyzed.distinctKeys

    The output is: {a#1, b#2}.

  2. This PR enhances RemoveRedundantAggregates to remove an aggregation if it is group-only (agg.groupOnly) and the child can already guarantee distinct output (see the sketch after the plans below). For example:

    set spark.sql.autoBroadcastJoinThreshold=-1; -- avoid PushDownLeftSemiAntiJoin
    create table t1 using parquet as select id a, id as b from range(10);
    create table t2 using parquet as select id as a, id as b from range(8);
    select t11.a, t11.b from (select distinct a, b from t1) t11 left semi join t2 on (t11.a = t2.a) group by t11.a, t11.b;

    Before this PR:

    == Optimized Logical Plan ==
    Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    +- Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
       :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
       :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
       :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
       +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
          +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
             +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
    

    After this PR:

    == Optimized Logical Plan ==
    Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
    :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
    :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
    +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
       +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
          +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
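
    A hedged sketch of the new rule case, reassembled from the diff hunks quoted later in this conversation. The standalone object name and the transformUp wiring are assumptions for illustration; the case clause itself mirrors the PR's new RemoveRedundantAggregates case:

      import org.apache.spark.sql.catalyst.expressions.ExpressionSet
      import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
      import org.apache.spark.sql.catalyst.rules.Rule

      // Hypothetical standalone sketch: a group-only Aggregate whose grouping keys are
      // already guaranteed distinct by the child (distinctKeys is the property added by
      // this PR's DistinctKeyVisitor) collapses into a simple Project.
      object RemoveRedundantAggregateSketch extends Rule[LogicalPlan] {
        def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          case agg @ Aggregate(groupingExps, _, child)
              if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
            Project(agg.aggregateExpressions, child)
        }
      }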
    

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and TPC-DS benchmark test.

SQL     Before this PR (seconds)    After this PR (seconds)
q14a    206                         193
q38     59                          41
q87     127                         113

@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
      } else {
        newAggregate
      }

    case agg @ Aggregate(groupingExps, _, child)
        if agg.groupOnly && child.deterministic &&
Contributor

I still don't get why child.deterministic is required. If the child output is completely random, then it should not report any distinct keys.

Member Author

On second thought, it is not required.

        Project(agg.aggregateExpressions, child)

    case agg @ Aggregate(groupingExps, aggregateExps, child)
        if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) &&
Contributor

isn't it just a.foldable?

Member Author

No. Alias isn't foldable, but its child is foldable.

  private def projectDistinctKeys(
      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
    val distinctKeys = keys.filter(_.subsetOf(outputSet))
Contributor

I think we should only do this filter in if (aliases.isEmpty). The distinct key can be a + b and the project list may have a + b AS c, then the result distinct key should be c.

Contributor

Then do the filter again at L49. It's incorrect to do this filter so early.

Member Author

OK
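
To make the point above concrete, here is a hedged sketch of an alias-aware version of projectDistinctKeys (hypothetical helper, not the committed code):

    import org.apache.spark.sql.catalyst.expressions.{Alias, ExpressionSet, NamedExpression}

    // Hypothetical sketch: rewrite each key through the project's aliases before the
    // subset check, so a distinct key {a + b} survives a project list containing
    // `a + b AS c` and becomes {c}.
    def projectDistinctKeysSketch(
        keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
      val aliasMap = projectList.collect {
        case a: Alias => a.child.canonicalized -> a.toAttribute
      }.toMap
      val outputSet = ExpressionSet(projectList.map(_.toAttribute))
      keys
        .map(key => ExpressionSet(key.toSeq.map(e => aliasMap.getOrElse(e.canonicalized, e))))
        .filter(_.subsetOf(outputSet))
    }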

  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))

  override def visitExcept(p: Except): Set[ExpressionSet] =
    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
Contributor

Why is p.deterministic required?

Member Author

Removed it.

  }

  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
Contributor

ditto

Member Author

Removed it.


  override def visitProject(p: Project): Set[ExpressionSet] = {
    if (p.child.distinctKeys.nonEmpty) {
      projectDistinctKeys(p.child.distinctKeys.map(ExpressionSet(_)), p.projectList)
Contributor

isn't p.child.distinctKeys already a Set[ExpressionSet]?

Member Author

Yes.
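
Given that exchange, the override presumably simplifies to something like the following (hedged sketch; the else branch is assumed here to mirror the other visitors):

    override def visitProject(p: Project): Set[ExpressionSet] = {
      if (p.child.distinctKeys.nonEmpty) {
        // distinctKeys is already a Set[ExpressionSet], so no extra wrapping is needed.
        projectDistinctKeys(p.child.distinctKeys, p.projectList)
      } else {
        default(p)
      }
    }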

@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
    comparePlans(optimized, expected)
  }

  test("Remove redundant aggregate - upper has contains foldable expressions") {
    val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze
Contributor

hmm, how does this work? groupBy('a)('a, TrueLiteral) is not group only and not literal-only.

Member Author

RemoveRedundantAggregates already supported this case before this PR.

Contributor

ah this works because these two aggregates are adjacent. If they are not, we have a problem.

I'm thinking that we should refine Aggregate.groupOnly: .groupBy('a)('a, TrueLiteral) is also group only.

Member Author

Opened #35795 to address this.
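
A hedged sketch of the refinement discussed above (hypothetical helper; #35795 carries the actual change):

    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.plans.logical.Aggregate

    // Hypothetical: treat an Aggregate as group-only when every aggregate expression is
    // either one of the grouping expressions or foldable, so that
    // groupBy('a)('a, TrueLiteral) also counts as group-only.
    def isGroupOnlySketch(agg: Aggregate): Boolean = agg.aggregateExpressions.forall {
      case Alias(child, _) =>
        child.foldable || agg.groupingExpressions.exists(_.semanticEquals(child))
      case e =>
        e.foldable || agg.groupingExpressions.exists(_.semanticEquals(e))
    }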

test("SPARK-36194: Negative case: Remove aggregation from contains non-deterministic") {
val query = relation
.groupBy('a)('a, (count('b) + rand(0)).as("cnt"))
.groupBy('a, 'cnt)('a, 'cnt)
Contributor

what can go wrong if we optimize this case?

Member Author

We can optimize this case.

@@ -1,54 +1,51 @@
== Physical Plan ==
Contributor

Can you summarize the plan changes? Do we have fewer shuffles now?

Member Author

q14a/q14b reduce 1 Exchange and 2 HashAggregates.
q38 reduces 3 Exchanges and 4 HashAggregates.

@cloud-fan (Contributor) left a comment

LGTM except for some minor comments

@cloud-fan (Contributor)

Do we see regressions in any TPCDS queries?

@wangyum (Member Author) commented Mar 10, 2022

Do we see regressions in any TPCDS queries?

There is no regression.

        if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
          Project(agg.aggregateExpressions, child)

    case agg @ Aggregate(groupingExps, aggregateExps, child)
Contributor

Do we still need this case? agg.groupOnly should cover it.

Member Author

You're right, we do not need it.

@wangyum wangyum closed this in c16a66a Mar 14, 2022
@wangyum (Member Author) commented Mar 14, 2022

Merged to master.

@wangyum wangyum deleted the SPARK-36194 branch March 14, 2022 13:59
wangyum pushed a commit that referenced this pull request Apr 8, 2022
### What changes were proposed in this pull request?

This PR is a followup of #35779, to propagate distinct keys more precisely in 2 cases:
1. For `LIMIT 1`, each output attribute is a distinct key, not the entire tuple.
2. For aggregate, we can still propagate distinct keys from child.

### Why are the changes needed?

make the optimization cover more cases

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #36100 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
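
A hedged sketch of the first follow-up case described above (hypothetical helper, not the exact code in #36100): a plan that produces at most one row makes each individual output attribute a distinct key, not just the whole output tuple.

    import org.apache.spark.sql.catalyst.expressions.ExpressionSet
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical sketch: with LIMIT 1 (maxRows == 1) every single output attribute
    // is trivially distinct on its own.
    def limitOneDistinctKeys(plan: LogicalPlan): Set[ExpressionSet] =
      if (plan.maxRows.contains(1L)) {
        plan.output.map(attr => ExpressionSet(Seq(attr))).toSet
      } else {
        Set.empty[ExpressionSet]
      }
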
wangyum pushed a commit that referenced this pull request Apr 8, 2022

Same follow-up commit as above, cherry picked from commit fbe82fb.
Signed-off-by: Yuming Wang <yumwang@ebay.com>