
Commit 7316cca

Support ansi offset clause

1 parent 6a32d83 commit 7316cca

27 files changed: +496 −139 lines changed

docs/sql-keywords.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -184,6 +184,7 @@ Below is a list of all the keywords in Spark SQL.
 <tr><td>NULL</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
 <tr><td>NULLS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
 <tr><td>OF</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
+<tr><td>OFFSET</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
 <tr><td>ON</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
 <tr><td>ONLY</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
 <tr><td>OPTION</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
```
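The table change means OFFSET behaves like the other reserved words once ANSI mode is on. A minimal sketch of the user-visible effect, assuming a local SparkSession; the flag name `spark.sql.ansi.enabled` is an assumption, as it has moved between releases:

```scala
import org.apache.spark.sql.SparkSession

object OffsetKeywordDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("offset-kw").getOrCreate()

  // Default mode: OFFSET is non-reserved, so it is still legal as a column alias.
  spark.sql("SELECT 1 AS OFFSET").show()

  // ANSI mode (flag name assumed): OFFSET becomes reserved, so the same
  // statement should now fail to parse.
  spark.conf.set("spark.sql.ansi.enabled", "true")
  // spark.sql("SELECT 1 AS OFFSET")   // expected: ParseException

  spark.stop()
}
```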

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 3 additions & 0 deletions

```diff
@@ -441,6 +441,7 @@ queryOrganization
     (SORT BY sort+=sortItem (',' sort+=sortItem)*)?
     windowClause?
     (LIMIT (ALL | limit=expression))?
+    (OFFSET offset=expression)?
     ;
 
 multiInsertQueryBody
@@ -1338,6 +1339,7 @@ nonReserved
     | NULL
     | NULLS
     | OF
+    | OFFSET
     | ONLY
     | OPTION
     | OPTIONS
@@ -1595,6 +1597,7 @@ NOT: 'NOT' | '!';
 NULL: 'NULL';
 NULLS: 'NULLS';
 OF: 'OF';
+OFFSET: 'OFFSET';
 ON: 'ON';
 ONLY: 'ONLY';
 OPTION: 'OPTION';
```
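Per the new `queryOrganization` rule, the optional OFFSET clause is written after LIMIT. A short usage sketch, assuming a SparkSession `spark` with this commit applied and a hypothetical `events` table:

```scala
// Page 1: the first 10 rows ordered by timestamp.
val page1 = spark.sql("SELECT id, ts FROM events ORDER BY ts LIMIT 10")

// Page 2: skip the first 10 rows, take the next 10.
// Clause order follows the grammar: LIMIT <expr> OFFSET <expr>.
val page2 = spark.sql("SELECT id, ts FROM events ORDER BY ts LIMIT 10 OFFSET 10")
```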

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 45 additions & 2 deletions

```diff
@@ -327,12 +327,36 @@ trait CheckAnalysis extends PredicateHelper {
           }
         }
 
-      case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
+      case GlobalLimit(limitExpr, _, _) => checkLimitLikeClause("limit", limitExpr)
+
+      case LocalLimit(limitExpr, _, child) =>
+        checkLimitLikeClause("limit", limitExpr)
+        child match {
+          case Offset(offsetExpr, _) =>
+            val limit = limitExpr.eval().asInstanceOf[Int]
+            val offset = offsetExpr.eval().asInstanceOf[Int]
+            if (Int.MaxValue - limit < offset) {
+              failAnalysis(
+                s"""The sum of limit and offset must not be greater than Int.MaxValue,
+                   | but found limit = $limit, offset = $offset.""".stripMargin)
+            }
+          case _ =>
+        }
 
-      case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
+      case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
 
       case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
 
+      case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
+          && o.children.exists(_.isInstanceOf[Offset]) =>
+        if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
+          failAnalysis(
+            s"""The OFFSET clause is allowed only in combination with the LIMIT clause,
+               | but an OFFSET clause was found in: ${o.nodeName}. If you are sure the
+               | OFFSET clause does not cause excessive overhead and still want to use
+               | it, set ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
+        }
+
       case _: Union | _: SetOperation if operator.children.length > 1 =>
         def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
         def ordinalNumber(i: Int): String = i match {
@@ -630,6 +654,7 @@ trait CheckAnalysis extends PredicateHelper {
       }
     }
     checkCollectedMetrics(plan)
+    checkOutermostOffset(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
       case o if !o.resolved =>
@@ -755,6 +780,24 @@ trait CheckAnalysis extends PredicateHelper {
     check(plan)
   }
 
+  /**
+   * Validate that the root node of a query or subquery is not a bare [[Offset]].
+   */
+  private def checkOutermostOffset(plan: LogicalPlan): Unit = {
+    plan match {
+      case Offset(offsetExpr, _) =>
+        checkLimitLikeClause("offset", offsetExpr)
+        if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
+          failAnalysis(
+            s"""The OFFSET clause is allowed only in combination with the LIMIT clause,
+               | but an OFFSET clause was found as the outermost node. If you are sure
+               | the OFFSET clause does not cause excessive overhead and still want to
+               | use it, set ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Validates to make sure the outer references appearing inside the subquery
    * are allowed.
```
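The `Int.MaxValue - limit < offset` test is the overflow-safe form of `limit + offset > Int.MaxValue`. A standalone sketch of why the naive form would be wrong:

```scala
val limit = 2000000000   // both values fit in an Int individually
val offset = 2000000000

// Naive check: the Int addition wraps to -294967296, so the comparison is
// silently false and an invalid limit/offset pair would slip through.
println(limit + offset > Int.MaxValue)   // false -- overflow

// Rearranged check used in CheckAnalysis: no addition, so no overflow.
println(Int.MaxValue - limit < offset)   // true -- correctly rejected
```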

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -357,11 +357,13 @@ object UnsupportedOperationChecker extends Logging {
       case GroupingSets(_, _, child, _) if child.isStreaming =>
         throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
 
-      case GlobalLimit(_, _) | LocalLimit(_, _)
+      case GlobalLimit(_, _, _) | LocalLimit(_, _, _)
         if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
         throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
           "output mode")
 
+      case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")
+
       case Sort(_, _, _) if !containsCompleteData(subPlan) =>
         throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
           "aggregated DataFrame/Dataset in Complete output mode")
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -324,7 +324,9 @@ package object dsl {
 
     def deserialize[T : Encoder]: LogicalPlan = CatalystSerde.deserialize[T](logicalPlan)
 
-    def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
+    def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, child = logicalPlan)
+
+    def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)
 
     def join(
         otherPlan: LogicalPlan,
```
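A sketch of the new test DSL in the style of the Catalyst optimizer suites; `testRelation` and the implicit Int-to-Literal conversions are the usual test scaffolding, so treat the details as illustrative:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val testRelation = LocalRelation('a.int, 'b.string)

// Skip 5 rows, then take 10: builds Limit(10, child = Offset(5, testRelation)).
val plan = testRelation.offset(5).limit(10)
```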

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 46 additions & 22 deletions

```diff
@@ -81,6 +81,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         CollapseWindow,
         CombineFilters,
         CombineLimits,
+        RewriteOffsets,
         CombineUnions,
         // Constant folding and strength reduction
         TransposeWindow,
@@ -451,21 +452,24 @@ object LimitPushDown extends Rule[LogicalPlan] {
 
   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case GlobalLimit(_, child) => child
+      case GlobalLimit(_, _, child) => child
       case _ => plan
     }
   }
 
-  private def maybePushLocalLimit(limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
+  private def maybePushLocalLimit(
+      limitExp: Expression,
+      offsetExp: Expression,
+      plan: LogicalPlan): LogicalPlan = {
     (limitExp, plan.maxRowsPerPartition) match {
       case (IntegerLiteral(newLimit), Some(childMaxRows)) if newLimit < childMaxRows =>
         // If the child has a cap on max rows per partition and the cap is larger than
         // the new limit, put a new LocalLimit there.
-        LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
+        LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))
 
       case (_, None) =>
         // If the child has no cap, put the new LocalLimit.
-        LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
+        LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))
 
       case _ =>
         // Otherwise, don't put a new LocalLimit.
@@ -480,22 +484,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // Note: right now Union means UNION ALL, which does not de-duplicate rows, so it is safe to
     // pushdown Limit through it. Once we add UNION DISTINCT, however, we will not be able to
     // pushdown Limit.
-    case LocalLimit(exp, Union(children)) =>
-      LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
+    case LocalLimit(le, oe, Union(children)) =>
+      LocalLimit(le, oe, Union(children.map(maybePushLocalLimit(le, oe, _))))
     // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
     // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
     // JOIN in the general case without a more invasive rewrite.
     // We also need to ensure that this limit pushdown rule will not eventually introduce limits
     // on both sides if it is applied multiple times. Therefore:
     // - If one side is already limited, stack another limit on top if the new limit is smaller.
     //   The redundant limit will be collapsed by the CombineLimits rule.
-    case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
+    case LocalLimit(le, oe, join @ Join(left, right, joinType, _, _)) =>
       val newJoin = joinType match {
-        case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
-        case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+        case RightOuter => join.copy(right = maybePushLocalLimit(le, oe, right))
+        case LeftOuter => join.copy(left = maybePushLocalLimit(le, oe, left))
         case _ => join
       }
-      LocalLimit(exp, newJoin)
+      LocalLimit(le, oe, newJoin)
   }
 }
 
@@ -710,11 +714,11 @@ object CollapseProject extends Rule[LogicalPlan] {
         agg.copy(aggregateExpressions = buildCleanedProjectList(
           p.projectList, agg.aggregateExpressions))
       }
-    case Project(l1, g @ GlobalLimit(_, limit @ LocalLimit(_, p2 @ Project(l2, _))))
+    case Project(l1, g @ GlobalLimit(_, _, limit @ LocalLimit(_, _, p2 @ Project(l2, _))))
         if isRenaming(l1, l2) =>
       val newProjectList = buildCleanedProjectList(l1, l2)
       g.copy(child = limit.copy(child = p2.copy(projectList = newProjectList)))
-    case Project(l1, limit @ LocalLimit(_, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
+    case Project(l1, limit @ LocalLimit(_, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
       val newProjectList = buildCleanedProjectList(l1, l2)
       limit.copy(child = p2.copy(projectList = newProjectList))
     case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
@@ -1382,12 +1386,29 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
  */
 object CombineLimits extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
-      GlobalLimit(Least(Seq(ne, le)), grandChild)
-    case LocalLimit(le, LocalLimit(ne, grandChild)) =>
-      LocalLimit(Least(Seq(ne, le)), grandChild)
-    case Limit(le, Limit(ne, grandChild)) =>
-      Limit(Least(Seq(ne, le)), grandChild)
+    case GlobalLimit(le, oe, GlobalLimit(nle, noe, grandChild)) =>
+      GlobalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
+    case LocalLimit(le, oe, LocalLimit(nle, noe, grandChild)) =>
+      LocalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
+    case Limit(le, oe, Limit(nle, noe, grandChild)) =>
+      Limit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
+  }
+}
+
+/**
+ * Rewrites [[Offset]] as [[Limit]], or combines two adjacent [[Offset]] operators into one,
+ * merging their expressions into a single expression.
+ */
+object RewriteOffsets extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case GlobalLimit(le, oe, Offset(noe, grandChild)) =>
+      GlobalLimit(le, Greatest(Seq(noe, oe)), grandChild)
+    case LocalLimit(le, oe, Offset(noe, grandChild)) =>
+      Offset(noe, LocalLimit(le, Greatest(Seq(noe, oe)), grandChild))
+    case Offset(oe, Offset(noe, grandChild)) =>
+      Offset(Greatest(Seq(noe, oe)), grandChild)
+    case Offset(oe @ Literal(v, IntegerType), child) =>
+      Limit(Limit.INVALID_LIMIT, oe, child)
   }
 }
 
@@ -1495,8 +1516,11 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       projection.initialize(0)
       LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)
 
-    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
-      LocalRelation(output, data.take(limit), isStreaming)
+    case Limit(
+        IntegerLiteral(limit),
+        IntegerLiteral(offset),
+        LocalRelation(output, data, isStreaming)) =>
+      LocalRelation(output, data.drop(offset).take(limit), isStreaming)
 
     case Filter(condition, LocalRelation(output, data, isStreaming))
         if !hasUnevaluableExpr(condition) =>
@@ -1779,7 +1803,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
     // changes up the Logical Plan.
     //
     // Replace Global Limit 0 nodes with empty Local Relation
-    case gl @ GlobalLimit(IntegerLiteral(0), _) =>
+    case gl @ GlobalLimit(IntegerLiteral(0), _, _) =>
       empty(gl)
 
     // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a
@@ -1788,7 +1812,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
     // then the following rule will handle that case as well.
    //
     // Replace Local Limit 0 nodes with empty Local Relation
-    case ll @ LocalLimit(IntegerLiteral(0), _) =>
+    case ll @ LocalLimit(IntegerLiteral(0), _, _) =>
       empty(ll)
   }
 }
```
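ConvertToLocalRelation now folds both clauses into the local data, and the evaluation order matters: the offset is applied before the limit. A standalone collections sketch of the same semantics:

```scala
val rows = (1 to 10).toList

// Equivalent of LIMIT 3 OFFSET 4 on a LocalRelation: drop(offset), then take(limit).
val result = rows.drop(4).take(3)
println(result)   // List(5, 6, 7)
```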

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -87,6 +87,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Sort => empty(p)
       case _: GlobalLimit => empty(p)
       case _: LocalLimit => empty(p)
+      case _: Offset => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -688,6 +688,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
       case _: Sample => true
       case _: GlobalLimit => true
       case _: LocalLimit => true
+      case _: Offset => true
       case _: Generate => true
       case _: Distinct => true
       case _: AppendColumns => true
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -63,7 +63,7 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
     case exists: Exists if exists.children.isEmpty =>
       IsNotNull(
         ScalarSubquery(
-          plan = Limit(Literal(1), Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
+          plan = Limit(Literal(1), child = Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
           exprId = exists.exprId))
   }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 9 additions & 3 deletions

```diff
@@ -553,10 +553,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     // WINDOWS
     val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
 
+    // OFFSET
+    // - OFFSET 0 is the same as omitting the OFFSET clause
+    val withOffset = withWindow.optional(offset) {
+      Offset(typedVisit(offset), withWindow)
+    }
+
     // LIMIT
     // - LIMIT ALL is the same as omitting the LIMIT clause
-    withWindow.optional(limit) {
-      Limit(typedVisit(limit), withWindow)
+    withOffset.optional(limit) {
+      Limit(typedVisit(limit), Literal(0), withOffset)
     }
   }
 
@@ -1001,7 +1007,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
     ctx.sampleMethod() match {
       case ctx: SampleByRowsContext =>
-        Limit(expression(ctx.expression), query)
+        Limit(expression(ctx.expression), child = query)
 
       case ctx: SampleByPercentileContext =>
         val fraction = ctx.percentage.getText.toDouble
```
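Given the builder above, `LIMIT n OFFSET m` parses to a Limit whose offset argument is `Literal(0)`, wrapping an Offset node; the optimizer's RewriteOffsets later merges the two. A sketch for inspecting the shape, assuming this commit is applied (the exact tree rendering is version-dependent):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val plan = CatalystSqlParser.parsePlan(
  "SELECT a FROM t ORDER BY a LIMIT 10 OFFSET 5")

// Expected nesting, roughly:
// GlobalLimit(10, 0, LocalLimit(10, 0, Offset(5, Sort(..., Project(...)))))
println(plan.treeString)
```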

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala

Lines changed: 3 additions & 0 deletions

```diff
@@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] {
     case p: Filter => visitFilter(p)
     case p: Generate => visitGenerate(p)
     case p: GlobalLimit => visitGlobalLimit(p)
+    case p: Offset => visitOffset(p)
     case p: Intersect => visitIntersect(p)
     case p: Join => visitJoin(p)
     case p: LocalLimit => visitLocalLimit(p)
@@ -60,6 +61,8 @@ trait LogicalPlanVisitor[T] {
 
   def visitGlobalLimit(p: GlobalLimit): T
 
+  def visitOffset(p: Offset): T
+
   def visitIntersect(p: Intersect): T
 
   def visitJoin(p: Join): T
```
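Adding `visitOffset` to the visitor trait forces every concrete visitor (e.g. the statistics estimators) to handle the new node. A standalone, self-contained sketch of the pattern; the names below are illustrative, not Spark's:

```scala
sealed trait Plan
case class Limit(n: Int, child: Plan) extends Plan
case class Offset(n: Int, child: Plan) extends Plan
case object Leaf extends Plan

trait PlanVisitor[T] {
  def visit(p: Plan): T = p match {
    case l: Limit  => visitLimit(l)
    case o: Offset => visitOffset(o)   // new dispatch arm, mirroring the diff
    case Leaf      => visitLeaf()
  }
  def visitLimit(p: Limit): T
  def visitOffset(p: Offset): T        // new abstract callback every visitor must implement
  def visitLeaf(): T
}

// Example visitor: an upper bound on output rows. LIMIT caps the bound,
// OFFSET subtracts from it (floored at zero); None means unbounded.
object MaxRows extends PlanVisitor[Option[Int]] {
  def visitLimit(p: Limit): Option[Int] =
    visit(p.child).map(math.min(_, p.n)).orElse(Some(p.n))
  def visitOffset(p: Offset): Option[Int] =
    visit(p.child).map(m => math.max(m - p.n, 0))
  def visitLeaf(): Option[Int] = None
}

// MaxRows.visit(Limit(10, Offset(5, Limit(20, Leaf)))) == Some(10)
```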
