Commit b5afff5

[SPARK-26138][SQL] Pushdown limit through InnerLike when condition is empty
### What changes were proposed in this pull request?

This PR pushes `LocalLimit` down through `InnerLike` joins when the join condition is empty (original PR: #23104). For example:

```sql
CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b FROM range(2);
CREATE TABLE t2 using parquet AS SELECT id AS d FROM range(2);
SELECT * FROM t1 CROSS JOIN t2 LIMIT 10;
```

Before this PR:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 10
   +- BroadcastNestedLoopJoin BuildRight, Cross
      :- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
      +- BroadcastExchange IdentityBroadcastMode, [id=#43]
         +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint>
```

After this PR:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 10
   +- BroadcastNestedLoopJoin BuildRight, Cross
      :- LocalLimit 10
      :  +- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
      +- BroadcastExchange IdentityBroadcastMode, [id=#51]
         +- LocalLimit 10
            +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint>
```

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31567 from wangyum/SPARK-26138.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
1 parent 2e31e2c commit b5afff5
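
For readers who want to observe the effect outside the test suite, here is a minimal spark-shell sketch (not part of this commit). It assumes a running `SparkSession` named `spark`; the table shapes are illustrative, and the exact optimized plan can vary with other optimizer rules that fire:

```scala
// Cross join with a LIMIT but no join condition (illustrative inputs).
val left  = spark.range(100).selectExpr("id AS a", "id AS b")
val right = spark.range(100).selectExpr("id AS d")
val df = left.crossJoin(right).limit(10)

// With this change, the optimized logical plan is expected to show a LocalLimit
// directly above each side of the join, not only above the join itself.
println(df.queryExecution.optimizedPlan.numberedTreeString)
df.explain()
```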

File tree

3 files changed, +43 −7 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -539,17 +539,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
-    // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
-    // JOIN in the general case without a more invasive rewrite.
+    // Add extra limits below JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
+    // the left and right sides, respectively. For INNER and CROSS JOIN we push limits to
+    // both the left and right sides if join condition is empty. It's not safe to push limits
+    // below FULL OUTER JOIN in the general case without a more invasive rewrite.
     // We also need to ensure that this limit pushdown rule will not eventually introduce limits
     // on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
-    case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
+    case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
       val newJoin = joinType match {
         case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
         case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+        case _: InnerLike if conditionOpt.isEmpty =>
+          join.copy(
+            left = maybePushLocalLimit(exp, left),
+            right = maybePushLocalLimit(exp, right))
         case _ => join
       }
       LocalLimit(exp, newJoin)
```
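
The restriction to an empty join condition is the crux of the new case: with no predicate, the first n rows of each input already produce at least n join rows, and every such row belongs to the full cross join, so any n of them are a valid LIMIT n answer. A plain-Scala sketch of that argument (ordinary collections, not Spark; names are illustrative):

```scala
// Illustration of why pushing LIMIT below an unconditioned join is safe.
// A LIMIT n without ORDER BY may return any n rows, so it suffices that
// (a) the pruned join still yields at least n rows, and
// (b) every pruned row is also a row of the full join.
val left  = (1 to 50).toList
val right = (1 to 50).toList
val n = 10

val fullJoin   = for (l <- left;         r <- right)         yield (l, r)
val prunedJoin = for (l <- left.take(n); r <- right.take(n)) yield (l, r)

assert(prunedJoin.size >= n)                          // (a) enough rows survive
assert(prunedJoin.forall(p => fullJoin.contains(p)))  // (b) no spurious rows
// With a join condition this breaks down: the rows that satisfy the predicate
// may lie beyond the first n rows of either side, so pruning the inputs could
// return fewer matching rows than the full join would.
```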

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 19 additions & 1 deletion
```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._

@@ -194,4 +194,22 @@ class LimitPushdownSuite extends PlanTest {
         LocalLimit(1, y.groupBy(Symbol("b"))(count(1))))).analyze
     comparePlans(expected2, optimized2)
   }
+
+  test("SPARK-26138: pushdown limit through InnerLike when condition is empty") {
+    Seq(Cross, Inner).foreach { joinType =>
+      val originalQuery = x.join(y, joinType).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), joinType)).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("SPARK-26138: Should not pushdown limit through InnerLike when condition is not empty") {
+    Seq(Cross, Inner).foreach { joinType =>
+      val originalQuery = x.join(y, joinType, Some("x.a".attr === "y.b".attr)).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, x.join(y, joinType, Some("x.a".attr === "y.b".attr))).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
 }
```
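
For context, `x`, `y`, and `Optimize` above are fixtures defined elsewhere in the suite. The sketch below shows how such a harness is typically wired in catalyst optimizer tests; the names and batch contents are illustrative assumptions, not a verbatim copy of the suite:

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.LimitPushDown
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Illustrative harness: run the rule under test (plus subquery-alias cleanup) to a
// fixed point over analyzed plans. The real suite's batch also carries a few extra
// simplification rules, such as the limit-collapsing rule mentioned in the
// Optimizer comment above.
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once, EliminateSubqueryAliases) ::
    Batch("Limit pushdown", FixedPoint(100), LimitPushDown) :: Nil
}

// Two small named relations standing in for the suite's x and y fixtures.
val testRelation  = LocalRelation(Symbol("a").int, Symbol("b").int, Symbol("c").int)
val testRelation2 = LocalRelation(Symbol("d").int)
val x = testRelation.subquery(Symbol("x"))
val y = testRelation2.subquery(Symbol("y"))
```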

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 15 additions & 2 deletions
```diff
@@ -29,14 +29,14 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, RepartitionByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.UnionExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.FunctionsCommand
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, SchemaColumnConvertNotSupportedException}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

@@ -4021,6 +4021,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper
       }
     }
   }
+
+  test("SPARK-26138 Pushdown limit through InnerLike when condition is empty") {
+    withTable("t1", "t2") {
+      spark.range(5).repartition(1).write.saveAsTable("t1")
+      spark.range(5).repartition(1).write.saveAsTable("t2")
+      val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 LIMIT 3")
+      val pushedLocalLimits = df.queryExecution.optimizedPlan.collect {
+        case l @ LocalLimit(_, _: LogicalRelation) => l
+      }
+      assert(pushedLocalLimits.length === 2)
+      checkAnswer(df, Row(0, 0) :: Row(0, 1) :: Row(0, 2) :: Nil)
+    }
+  }
 }

 case class Foo(bar: Option[String])
```
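
The same end-to-end check can be replayed interactively; a sketch assuming a spark-shell session in which the (illustrative) tables `t1` and `t2` do not yet exist:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalLimit
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Same shape as the test above, run ad hoc (the suite's withTable handles cleanup;
// here the tables are left behind).
spark.range(5).repartition(1).write.saveAsTable("t1")
spark.range(5).repartition(1).write.saveAsTable("t2")
val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 LIMIT 3")

// Count LocalLimit nodes sitting directly above a relation scan in the optimized plan.
val pushed = df.queryExecution.optimizedPlan.collect {
  case l @ LocalLimit(_, _: LogicalRelation) => l
}
println(s"LocalLimit nodes directly above a scan: ${pushed.size}")  // expected: 2
df.show()
```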
