Commit d3ea308

wangyum authored and cloud-fan committed
[SPARK-34081][SQL] Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
### What changes were proposed in this pull request?

We should not push down LeftSemi/LeftAnti joins over an Aggregate in some cases. For example:

```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```

Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

### Why are the changes needed?

1. Pushing LeftSemi/LeftAnti down through an Aggregate can hurt performance.
2. It removes a user-added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test:

SQL | Before this PR (seconds) | After this PR (seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35

Before this PR:
![image](https://user-images.githubusercontent.com/5399861/104452849-8789fc80-55de-11eb-88da-44059899f9a9.png)

After this PR:
![image](https://user-images.githubusercontent.com/5399861/104452899-9a043600-55de-11eb-9286-d8f3a23ca3b8.png)

Closes #31145 from wangyum/SPARK-34081.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
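For context: the LeftSemi join sitting above the aggregates is produced by Spark's `ReplaceIntersectWithSemiJoin` rule, which rewrites `INTERSECT` into a left-semi join over `Distinct` aggregates. A minimal sketch for inspecting that rewrite, assuming the `t1`/`t2` tables from the example above already exist:

```scala
// Extended explain prints the analyzed and optimized logical plans,
// where the INTERSECT shows up as Aggregate over a LeftSemi Join.
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2")
  .explain(extended = true)
```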
1 parent 467d758 · commit d3ea308

File tree

15 files changed (+3316, -3121 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -31,7 +31,9 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * 4) Aggregate
  * 5) Other permissible unary operators. please see [[PushPredicateThroughNonJoin.canPushThrough]].
  */
-object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
+object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
+  with PredicateHelper
+  with JoinSelectionHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // LeftSemi/LeftAnti over Project
     case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
@@ -51,10 +53,11 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
         p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
       }
 
-    // LeftSemi/LeftAnti over Aggregate
+    // LeftSemi/LeftAnti over Aggregate, only push down if join can be planned as broadcast join.
     case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(_), _, _)
         if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty &&
-          !agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
+          !agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
+          canPlanAsBroadcastHashJoin(join, conf) =>
       val aliasMap = getAliasMap(agg)
       val canPushDownPredicate = (predicate: Expression) => {
         val replaced = replaceAlias(predicate, aliasMap)
```
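The effect of the new guard can be observed from the outside by toggling the auto-broadcast threshold and comparing plans. A minimal spark-shell sketch, assuming the `t1`/`t2` tables from the commit message exist (exact plan shapes also depend on table statistics):

```scala
// With broadcast hash join effectively disabled, the rule should no longer
// push the LeftSemi join below the Aggregate produced by DISTINCT.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()

// With a generous threshold, the join may be planned as a broadcast hash
// join, in which case the pushdown can still fire.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()
```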

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -307,6 +307,13 @@ trait JoinSelectionHelper {
     }
   }
 
+  def canPlanAsBroadcastHashJoin(join: Join, conf: SQLConf): Boolean = {
+    getBroadcastBuildSide(join.left, join.right, join.joinType,
+      join.hint, hintOnly = true, conf).isDefined ||
+      getBroadcastBuildSide(join.left, join.right, join.joinType,
+        join.hint, hintOnly = false, conf).isDefined
+  }
+
   def hintToBroadcastLeft(hint: JoinHint): Boolean = {
     hint.leftHint.exists(_.strategy.contains(BROADCAST))
   }
```
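Note that the helper checks `hintOnly = true` before falling back to the size-based check, so a user-supplied broadcast hint can satisfy the guard even when auto-broadcast is disabled. A hedged illustration using the public DataFrame API (whether the pushdown then actually fires still depends on the rule's other conditions):

```scala
import org.apache.spark.sql.functions.broadcast

// Auto-broadcast is off, but the explicit hint still makes the join
// plannable as a broadcast hash join via the hintOnly = true pass.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
val left = spark.table("t1").distinct()
val right = broadcast(spark.table("t2"))
left.join(right, left("a") === right("c"), "left_semi").explain()
```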

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala

Lines changed: 25 additions & 0 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 class LeftSemiPushdownSuite extends PlanTest {
@@ -443,4 +444,28 @@ class LeftSemiPushdownSuite extends PlanTest {
     }
   }
 
+  Seq(LeftSemi, LeftAnti).foreach { jt =>
+    test(s"SPARK-34081: $jt only push down if join can be planned as broadcast join") {
+      Seq(-1, 100000).foreach { threshold =>
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> threshold.toString) {
+          val originalQuery = testRelation
+            .groupBy('b)('b)
+            .join(testRelation1, joinType = jt, condition = Some('b <=> 'd))
+
+          val optimized = Optimize.execute(originalQuery.analyze)
+          val correctAnswer = if (threshold > 0) {
+            testRelation
+              .join(testRelation1, joinType = jt, condition = Some('b <=> 'd))
+              .groupBy('b)('b)
+              .analyze
+          } else {
+            originalQuery.analyze
+          }
+
+          comparePlans(optimized, correctAnswer)
+        }
+      }
+    }
+  }
 }
```

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt

Lines changed: 452 additions & 437 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/simplified.txt

Lines changed: 117 additions & 108 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt

Lines changed: 387 additions & 372 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt

Lines changed: 106 additions & 97 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/explain.txt

Lines changed: 239 additions & 219 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/simplified.txt

Lines changed: 113 additions & 101 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt

Lines changed: 239 additions & 219 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt

Lines changed: 113 additions & 101 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt

Lines changed: 387 additions & 372 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt

Lines changed: 106 additions & 97 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/explain.txt

Lines changed: 860 additions & 845 deletions

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/simplified.txt

Lines changed: 159 additions & 150 deletions
