
Commit 58c3613

beliefer authored and cloud-fan committed
[SPARK-39057][SQL] Offset could work without Limit
### What changes were proposed in this pull request?

Currently, `Offset` must work together with `Limit`: the existing behavior does not allow using `Offset` alone, nor does it expose an `offset` API on `DataFrame`. With this change, if `Offset` is used alone, there are two situations:

1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` this way.
2. If `Offset` is an intermediate operator, shuffle all the results to one task, drop/skip the first n (offset value) rows, and pass the remaining rows to the downstream operator.

For example, `SELECT * FROM a OFFSET 10;` is parsed into the logical plan below:

```
Offset (offset = 10) // Only offset clause
|--Relation
```

and then into one of the physical plans below:

```
CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows
|--JDBCRelation
```

or

```
GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows
|--JDBCRelation
```

After this PR is merged, users can run SQL such as:

```
SELECT '' AS ten, unique1, unique2, stringu1
FROM onek
ORDER BY unique1
OFFSET 990;
```

Note: #35975 added support for the OFFSET clause and created a logical node named `GlobalLimitAndOffset`. In fact, we can avoid that node and use `Offset` instead; the latter keeps the naming unified.

### Why are the changes needed?

Improve the implementation of the OFFSET clause.

### Does this PR introduce _any_ user-facing change?

'No'. New feature.

### How was this patch tested?

Existing test cases.

Closes #36417 from beliefer/SPARK-28330_followup2.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
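To make the two situations above concrete: a standalone `OFFSET n` means "skip the first n rows of the (ordered) result". The following is a minimal Scala sketch of those semantics over plain collections; the names (`OffsetSemanticsDemo`, `collectWithOffset`, `offsetThenDownstream`, `onek`) are invented for illustration and are not Spark APIs.

```scala
object OffsetSemanticsDemo extends App {
  // Model a relation as an ordered sequence of integer rows.
  val onek: Seq[Int] = 1 to 1000

  // Case 1: Offset is the last operator. "Collect" everything to the driver,
  // then drop the first `offset` rows (what CollectLimitExec(limit = -1,
  // offset = n) does conceptually).
  def collectWithOffset(rows: Seq[Int], offset: Int): Seq[Int] = rows.drop(offset)

  // Case 2: Offset is an intermediate operator. Conceptually shuffle all rows
  // into one task, skip the first `offset`, and feed the rest downstream.
  def offsetThenDownstream(rows: Seq[Int], offset: Int)(downstream: Seq[Int] => Seq[Int]): Seq[Int] =
    downstream(rows.drop(offset))

  // ORDER BY ... OFFSET 990 over 1000 rows keeps the last 10.
  assert(collectWithOffset(onek.sorted, 990) == (991 to 1000))
  assert(offsetThenDownstream(onek, 990)(_.take(5)) == Seq(991, 992, 993, 994, 995))
}
```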
1 parent 02f4522 commit 58c3613

File tree: 14 files changed (+533, -146 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 7 additions & 25 deletions

```
@@ -433,6 +433,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

+        case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
+            && o.children.exists(_.isInstanceOf[Offset]) =>
+          failAnalysis(
+            s"""
+               |The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
+               |but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
+
         case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

         case _: Union | _: SetOperation if operator.children.length > 1 =>

@@ -608,7 +615,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       }
     }
     checkCollectedMetrics(plan)
-    checkOffsetOperator(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
       case o if !o.resolved =>

@@ -851,30 +857,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     check(plan)
   }

-  /**
-   * Validate whether the [[Offset]] is valid.
-   */
-  private def checkOffsetOperator(plan: LogicalPlan): Unit = {
-    plan.foreachUp {
-      case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
-          && o.children.exists(_.isInstanceOf[Offset]) =>
-        failAnalysis(
-          s"""
-             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
-             |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
-      case _ =>
-    }
-    plan match {
-      case Offset(offsetExpr, _) =>
-        checkLimitLikeClause("offset", offsetExpr)
-        failAnalysis(
-          s"""
-             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
-             |clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
-      case _ =>
-    }
-  }
-
   /**
    * Validates to make sure the outer references appearing inside the subquery
    * are allowed.
```
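The relocated check above enforces that an `Offset` may only appear as the child of a limit node or as the outermost plan node. A toy sketch of that traversal follows, using an invented, simplified plan ADT rather than Catalyst's classes, to show why the root is naturally exempt under a `foreachUp`-style child check:

```scala
object OffsetPlacementCheckDemo extends App {
  // Invented, simplified plan ADT; not Catalyst's LogicalPlan.
  sealed trait Plan { def children: Seq[Plan]; def nodeName: String = getClass.getSimpleName }
  case object Relation extends Plan { val children = Nil }
  case class Filter(child: Plan) extends Plan { val children = Seq(child) }
  case class OffsetNode(n: Int, child: Plan) extends Plan { val children = Seq(child) }
  case class GlobalLimitNode(n: Int, child: Plan) extends Plan { val children = Seq(child) }

  def foreachUp(p: Plan)(f: Plan => Unit): Unit = { p.children.foreach(foreachUp(_)(f)); f(p) }

  // Fail if any node other than a limit has an Offset child. The root is never
  // anyone's child, so a top-level Offset passes, matching the new behavior.
  def checkOffsetPlacement(root: Plan): Unit = foreachUp(root) {
    case o if !o.isInstanceOf[GlobalLimitNode] && o.children.exists(_.isInstanceOf[OffsetNode]) =>
      throw new IllegalStateException(s"OFFSET not allowed under: ${o.nodeName}")
    case _ =>
  }

  checkOffsetPlacement(OffsetNode(10, Relation))                     // ok: outermost node
  checkOffsetPlacement(GlobalLimitNode(5, OffsetNode(10, Relation))) // ok: under a limit
  try { checkOffsetPlacement(Filter(OffsetNode(10, Relation))); assert(false) }
  catch { case _: IllegalStateException => } // rejected: Offset under Filter
}
```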

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 19 additions & 15 deletions

```
@@ -95,8 +95,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       OptimizeWindowFunctions,
       CollapseWindow,
       CombineFilters,
+      EliminateOffsets,
       EliminateLimits,
-      RewriteOffsets,
       CombineUnions,
       // Constant folding and strength reduction
       OptimizeRepartition,

@@ -673,7 +673,7 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
 }

 /**
- * Pushes down [[LocalLimit]] beneath UNION ALL and joins.
+ * Pushes down [[LocalLimit]] beneath UNION ALL, OFFSET and joins.
  */
 object LimitPushDown extends Rule[LogicalPlan] {

@@ -750,6 +750,9 @@
       Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
     case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
       Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
+    // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset.
+    case LocalLimit(le, Offset(oe, grandChild)) =>
+      Offset(oe, LocalLimit(Add(le, oe), grandChild))
   }
 }

@@ -1871,21 +1874,22 @@
 }

 /**
- * Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]],
- * merging the expressions into one single expression. See [[Limit]] for more information
- * about the difference between [[LocalLimit]] and [[GlobalLimit]].
+ * This rule optimizes Offset operators by:
+ * 1. Eliminate [[Offset]] operators if offset == 0.
+ * 2. Replace [[Offset]] operators to empty [[LocalRelation]]
+ *    if [[Offset]]'s child max row <= offset.
+ * 3. Combines two adjacent [[Offset]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object RewriteOffsets extends Rule[LogicalPlan] {
+object EliminateOffsets extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case GlobalLimit(le, Offset(oe, grandChild)) =>
-      GlobalLimitAndOffset(le, oe, grandChild)
-    case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
-      val offset = oe.eval().asInstanceOf[Int]
-      if (offset == 0) {
-        localLimit.withNewChildren(Seq(grandChild))
-      } else {
-        Offset(oe, LocalLimit(Add(le, oe), grandChild))
-      }
+    case Offset(oe, child) if oe.foldable && oe.eval().asInstanceOf[Int] == 0 =>
+      child
+    case Offset(oe, child)
+        if oe.foldable && child.maxRows.exists(_ <= oe.eval().asInstanceOf[Int]) =>
+      LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
+    case Offset(oe1, Offset(oe2, child)) =>
+      Offset(Add(oe1, oe2), child)
   }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 22 deletions

```
@@ -1304,36 +1304,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
 }

 object LimitAndOffset {
-  def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = {
+  def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = {
     p match {
-      case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int]
+      case GlobalLimit(le1, Offset(le2, LocalLimit(le3, child))) if le1.eval().asInstanceOf[Int]
         + le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] =>
         Some((le1, le2, child))
       case _ => None
     }
   }
 }

-/**
- * A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
- * emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a
- * total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining.
- */
-case class GlobalLimitAndOffset(
-    limitExpr: Expression,
-    offsetExpr: Expression,
-    child: LogicalPlan) extends OrderPreservingUnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def maxRows: Option[Long] = {
-    limitExpr match {
-      case IntegerLiteral(limit) => Some(limit)
-      case _ => None
-    }
-  }
-  override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset =
-    copy(child = newChild)
-}
-
 /**
  * This is similar with [[Limit]] except:
  *
```
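The rewritten `LimitAndOffset` extractor recognizes exactly the shape the pushdown rule produces: `LIMIT l OFFSET m` becomes `GlobalLimit(l, Offset(m, LocalLimit(l + m, child)))`, and the `le1 + le2 == le3` guard verifies that the local limit really is the sum of the pair. Below is a collection-level sanity check of why that shape preserves the query's meaning (plain Scala with an invented object name, not Catalyst):

```scala
object LimitAndOffsetShapeDemo extends App {
  val rows = (1 to 100).toList
  val (limit, offset) = (10, 5)

  // GlobalLimit(10, Offset(5, LocalLimit(15, child))), evaluated inside out:
  val viaPushdown = rows
    .take(limit + offset) // LocalLimit(15): each partition only needs 15 rows
    .drop(offset)         // Offset(5): skip the first 5
    .take(limit)          // GlobalLimit(10): cap the final result

  // Equivalent to the naive "skip 5, then take 10":
  assert(viaPushdown == rows.drop(offset).take(limit)) // rows 6 through 15
}
```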

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 2 additions & 9 deletions

```
@@ -556,18 +556,11 @@ class AnalysisErrorSuite extends AnalysisTest {
     "The offset expression must be equal to or greater than 0, but got -1" :: Nil
   )

-  errorTest(
-    "OFFSET clause is outermost node",
-    testRelation.offset(Literal(10, IntegerType)),
-    "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
-      " clause is found to be the outermost node." :: Nil
-  )
-
   errorTest(
     "OFFSET clause in other node",
     testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
-    "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
-      " clause found in: Filter." :: Nil
+    "The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
+      " but the OFFSET clause found in: Filter." :: Nil
   )

   errorTest(
```
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateOffsetsSuite.scala (new file; path inferred from the package and class name)

Lines changed: 100 additions & 0 deletions

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class EliminateOffsetsSuite extends PlanTest {
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Eliminate Offset", FixedPoint(10), EliminateOffsets) :: Nil
  }

  val testRelation = LocalRelation.fromExternalRows(
    Seq("a".attr.int, "b".attr.int, "c".attr.int),
    1.to(10).map(_ => Row(1, 2, 3))
  )

  test("Offsets: eliminate Offset operators if offset == 0") {
    val originalQuery =
      testRelation
        .select($"a")
        .offset(0)

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer =
      testRelation
        .select($"a")
        .analyze

    comparePlans(optimized, correctAnswer)
  }

  test("Offsets: cannot eliminate Offset operators if offset > 0") {
    val originalQuery =
      testRelation
        .select($"a")
        .offset(2)

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer =
      testRelation
        .select($"a")
        .offset(2)
        .analyze

    comparePlans(optimized, correctAnswer)
  }

  test("Replace Offset operators to empty LocalRelation if child max row <= offset") {
    val child = testRelation.select($"a").analyze
    val originalQuery = child.offset(10)

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer =
      LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming).analyze

    comparePlans(optimized, correctAnswer)
  }

  test("Cannot replace Offset operators to empty LocalRelation if child max row > offset") {
    val child = testRelation.select($"a").analyze
    val originalQuery = child.offset(3)

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer = originalQuery.analyze

    comparePlans(optimized, correctAnswer)
  }

  test("Combines Offset operators") {
    val child = testRelation.select($"a").analyze
    val originalQuery = child.offset(2).offset(3)

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer = child.offset(Add(Literal(3), Literal(2))).analyze

    comparePlans(optimized, correctAnswer)
  }
}
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 6 additions & 0 deletions

```
@@ -270,4 +270,10 @@
       Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze),
       x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze)
   }
+
+  test("Push down limit 1 through Offset") {
+    comparePlans(
+      Optimize.execute(testRelation.offset(2).limit(1).analyze),
+      GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze)
+  }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 18 additions & 18 deletions

```
@@ -93,14 +93,16 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
           Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(
-          limit, order, child.output, planLater(child), Some(offset)) :: Nil
+          limit, order, child.output, planLater(child), offset) :: Nil
       case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
           Project(projectList, Sort(order, true, child)))
           if limit + offset < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(
-          limit, order, projectList, planLater(child), Some(offset)) :: Nil
+          limit, order, projectList, planLater(child), offset) :: Nil
       case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
-        CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
+        CollectLimitExec(limit, planLater(child), offset) :: Nil
+      case logical.Offset(IntegerLiteral(offset), child) =>
+        CollectLimitExec(child = planLater(child), offset = offset) :: Nil
       case Tail(IntegerLiteral(limit), child) =>
         CollectTailExec(limit, planLater(child)) :: Nil
       case other => planLater(other) :: Nil

@@ -116,20 +118,18 @@
       // In this case we generate a physical top-K sorting operator, passing down
       // the limit and offset values to be evaluated inline during the physical
       // sorting operation for greater efficiency.
-      case LimitAndOffset(
-          IntegerLiteral(limit),
-          IntegerLiteral(offset),
-          Sort(order, true, child))
-          if limit + offset < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(
-          limit, order, child.output, planLater(child), Some(offset)) :: Nil
-      case LimitAndOffset(
-          IntegerLiteral(limit),
-          IntegerLiteral(offset),
-          Project(projectList, Sort(order, true, child)))
+      case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
+          Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
+        TakeOrderedAndProjectExec(
+          limit, order, child.output, planLater(child), offset) :: Nil
+      case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
           if limit + offset < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil
-      case _ => Nil
+        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil
+      case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
+        GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
+      case _ =>
+        Nil
     }
   }

@@ -818,8 +818,8 @@
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
         execution.GlobalLimitExec(limit, planLater(child)) :: Nil
-      case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
-        execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
+      case logical.Offset(IntegerLiteral(offset), child) =>
+        GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
       case union: logical.Union =>
         execution.UnionExec(union.children.map(planLater)) :: Nil
       case g @ logical.Generate(generator, _, outer, _, _, child) =>
```
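One detail worth noting in the strategy above: when `limit + offset` stays below `spark.sql.execution.topKSortFallbackThreshold`, planning chooses `TakeOrderedAndProjectExec`, which only ever needs the smallest `limit + offset` rows before discarding the offset prefix. Here is a plain-Scala model of that semantics (collections only, with an invented object name; not the operator's real implementation):

```scala
object TopKWithOffsetDemo extends App {
  val data = Seq(42, 7, 19, 3, 88, 1, 56, 23, 64, 5)
  val (limit, offset) = (3, 2)

  // Keep only the smallest (limit + offset) rows in sorted order, then discard
  // the first `offset` of them.
  val topK = data.sorted.take(limit + offset).drop(offset)

  // Same result as a full sort followed by OFFSET 2 LIMIT 3.
  assert(topK == data.sorted.drop(offset).take(limit)) // Seq(5, 7, 19)
}
```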
