Skip to content

Commit 8bcee2f

Browse files
committed
Add GlobalLimitAndOffset
1 parent 1149727 commit 8bcee2f

File tree

16 files changed

+149
-122
lines changed

16 files changed

+149
-122
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,9 @@ trait CheckAnalysis extends PredicateHelper {
327327
}
328328
}
329329

330-
case GlobalLimit(limitExpr, _, _) => checkLimitLikeClause("limit", limitExpr)
330+
case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
331331

332-
case LocalLimit(limitExpr, _, child) =>
332+
case LocalLimit(limitExpr, child) =>
333333
checkLimitLikeClause("limit", limitExpr)
334334
child match {
335335
case Offset(offsetExpr, _) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ object UnsupportedOperationChecker extends Logging {
357357
case GroupingSets(_, _, child, _) if child.isStreaming =>
358358
throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
359359

360-
case GlobalLimit(_, _, _) | LocalLimit(_, _, _)
360+
case GlobalLimit(_, _) | LocalLimit(_, _)
361361
if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
362362
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
363363
"output mode")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ package object dsl {
324324

325325
def deserialize[T : Encoder]: LogicalPlan = CatalystSerde.deserialize[T](logicalPlan)
326326

327-
def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, child = logicalPlan)
327+
def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
328328

329329
def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)
330330

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -452,24 +452,21 @@ object LimitPushDown extends Rule[LogicalPlan] {
452452

453453
private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
454454
plan match {
455-
case GlobalLimit(_, _, child) => child
455+
case GlobalLimit(_, child) => child
456456
case _ => plan
457457
}
458458
}
459459

460-
private def maybePushLocalLimit(
461-
limitExp: Expression,
462-
offsetExp: Expression,
463-
plan: LogicalPlan): LogicalPlan = {
460+
private def maybePushLocalLimit(limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
464461
(limitExp, plan.maxRowsPerPartition) match {
465462
case (IntegerLiteral(newLimit), Some(childMaxRows)) if newLimit < childMaxRows =>
466463
// If the child has a cap on max rows per partition and the cap is larger than
467464
// the new limit, put a new LocalLimit there.
468-
LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))
465+
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
469466

470467
case (_, None) =>
471468
// If the child has no cap, put the new LocalLimit.
472-
LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))
469+
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
473470

474471
case _ =>
475472
// Otherwise, don't put a new LocalLimit.
@@ -484,22 +481,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
484481
// Note: right now Union means UNION ALL, which does not de-duplicate rows, so it is safe to
485482
// pushdown Limit through it. Once we add UNION DISTINCT, however, we will not be able to
486483
// pushdown Limit.
487-
case LocalLimit(le, oe, Union(children)) =>
488-
LocalLimit(le, oe, Union(children.map(maybePushLocalLimit(le, oe, _))))
484+
case LocalLimit(exp, Union(children)) =>
485+
LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
489486
// Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
490487
// the left and right sides, respectively. It's not safe to push limits below FULL OUTER
491488
// JOIN in the general case without a more invasive rewrite.
492489
// We also need to ensure that this limit pushdown rule will not eventually introduce limits
493490
// on both sides if it is applied multiple times. Therefore:
494491
// - If one side is already limited, stack another limit on top if the new limit is smaller.
495492
// The redundant limit will be collapsed by the CombineLimits rule.
496-
case LocalLimit(le, oe, join @ Join(left, right, joinType, _, _)) =>
493+
case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
497494
val newJoin = joinType match {
498-
case RightOuter => join.copy(right = maybePushLocalLimit(le, oe, right))
499-
case LeftOuter => join.copy(left = maybePushLocalLimit(le, oe, left))
495+
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
496+
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
500497
case _ => join
501498
}
502-
LocalLimit(le, oe, newJoin)
499+
LocalLimit(exp, newJoin)
503500
}
504501
}
505502

@@ -714,11 +711,11 @@ object CollapseProject extends Rule[LogicalPlan] {
714711
agg.copy(aggregateExpressions = buildCleanedProjectList(
715712
p.projectList, agg.aggregateExpressions))
716713
}
717-
case Project(l1, g @ GlobalLimit(_, _, limit @ LocalLimit(_, _, p2 @ Project(l2, _))))
714+
case Project(l1, g @ GlobalLimit(_, limit @ LocalLimit(_, p2 @ Project(l2, _))))
718715
if isRenaming(l1, l2) =>
719716
val newProjectList = buildCleanedProjectList(l1, l2)
720717
g.copy(child = limit.copy(child = p2.copy(projectList = newProjectList)))
721-
case Project(l1, limit @ LocalLimit(_, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
718+
case Project(l1, limit @ LocalLimit(_, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
722719
val newProjectList = buildCleanedProjectList(l1, l2)
723720
limit.copy(child = p2.copy(projectList = newProjectList))
724721
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
@@ -1386,12 +1383,12 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
13861383
*/
13871384
object CombineLimits extends Rule[LogicalPlan] {
13881385
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1389-
case GlobalLimit(le, oe, GlobalLimit(nle, noe, grandChild)) =>
1390-
GlobalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
1391-
case LocalLimit(le, oe, LocalLimit(nle, noe, grandChild)) =>
1392-
LocalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
1393-
case Limit(le, oe, Limit(nle, noe, grandChild)) =>
1394-
Limit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
1386+
case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
1387+
GlobalLimit(Least(Seq(ne, le)), grandChild)
1388+
case LocalLimit(le, LocalLimit(ne, grandChild)) =>
1389+
LocalLimit(Least(Seq(ne, le)), grandChild)
1390+
case Limit(le, Limit(ne, grandChild)) =>
1391+
Limit(Least(Seq(ne, le)), grandChild)
13951392
}
13961393
}
13971394

@@ -1401,10 +1398,10 @@ object CombineLimits extends Rule[LogicalPlan] {
14011398
*/
14021399
object RewriteOffsets extends Rule[LogicalPlan] {
14031400
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1404-
case GlobalLimit(le, oe, Offset(noe, grandChild)) =>
1405-
GlobalLimit(le, Greatest(Seq(noe, oe)), grandChild)
1406-
case LocalLimit(le, oe, Offset(noe, grandChild)) =>
1407-
Offset(noe, LocalLimit(le, Greatest(Seq(noe, oe)), grandChild))
1401+
case GlobalLimit(le, Offset(oe, grandChild)) =>
1402+
GlobalLimitAndOffset(le, oe, grandChild)
1403+
case LocalLimit(le, Offset(oe, grandChild)) =>
1404+
Offset(oe, LocalLimit(Add(le, oe), grandChild))
14081405
}
14091406
}
14101407

@@ -1512,11 +1509,8 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
15121509
projection.initialize(0)
15131510
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)
15141511

1515-
case Limit(
1516-
IntegerLiteral(limit),
1517-
IntegerLiteral(offset),
1518-
LocalRelation(output, data, isStreaming)) =>
1519-
LocalRelation(output, data.drop(offset).take(limit), isStreaming)
1512+
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
1513+
LocalRelation(output, data.take(limit), isStreaming)
15201514

15211515
case Filter(condition, LocalRelation(output, data, isStreaming))
15221516
if !hasUnevaluableExpr(condition) =>
@@ -1799,7 +1793,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
17991793
// changes up the Logical Plan.
18001794
//
18011795
// Replace Global Limit 0 nodes with empty Local Relation
1802-
case gl @ GlobalLimit(IntegerLiteral(0), _, _) =>
1796+
case gl @ GlobalLimit(IntegerLiteral(0), _) =>
18031797
empty(gl)
18041798

18051799
// Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a
@@ -1808,7 +1802,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
18081802
// then the following rule will handle that case as well.
18091803
//
18101804
// Replace Local Limit 0 nodes with empty Local Relation
1811-
case ll @ LocalLimit(IntegerLiteral(0), _, _) =>
1805+
case ll @ LocalLimit(IntegerLiteral(0), _) =>
18121806
empty(ll)
18131807
}
18141808
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
6363
case exists: Exists if exists.children.isEmpty =>
6464
IsNotNull(
6565
ScalarSubquery(
66-
plan = Limit(Literal(1), child = Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
66+
plan = Limit(Literal(1), Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
6767
exprId = exists.exprId))
6868
}
6969
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
562562
// LIMIT
563563
// - LIMIT ALL is the same as omitting the LIMIT clause
564564
withOffset.optional(limit) {
565-
Limit(typedVisit(limit), Literal(0), withOffset)
565+
Limit(typedVisit(limit), withOffset)
566566
}
567567
}
568568

@@ -1007,7 +1007,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
10071007

10081008
ctx.sampleMethod() match {
10091009
case ctx: SampleByRowsContext =>
1010-
Limit(expression(ctx.expression), child = query)
1010+
Limit(expression(ctx.expression), query)
10111011

10121012
case ctx: SampleByPercentileContext =>
10131013
val fraction = ctx.percentage.getText.toDouble

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -797,32 +797,24 @@ case class Pivot(
797797
* So we introduced LocalLimit and GlobalLimit in the logical plan node for limit pushdown.
798798
*/
799799
object Limit {
800-
def apply(
801-
limitExpr: Expression,
802-
offsetExpr: Expression = Literal(0),
803-
child: LogicalPlan): UnaryNode = {
804-
GlobalLimit(limitExpr, offsetExpr, LocalLimit(limitExpr, offsetExpr, child))
800+
def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
801+
GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
805802
}
806803

807-
def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = {
804+
def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
808805
p match {
809-
case GlobalLimit(le1, oe1, LocalLimit(le2, oe2, child)) if le1 == le2 && oe1 == oe2 =>
810-
Some((le1, oe1, child))
806+
case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
811807
case _ => None
812808
}
813809
}
814810
}
815811

816812
/**
817-
* A global (coordinated) limit. This operator can remove at most `offsetExpr` number
818-
* and emit at most `limitExpr` number in total.
813+
* A global (coordinated) limit. This operator can emit at most `limitExpr` number in total.
819814
*
820815
* See [[Limit]] for more information.
821816
*/
822-
case class GlobalLimit(
823-
limitExpr: Expression,
824-
offsetExpr: Expression,
825-
child: LogicalPlan) extends OrderPreservingUnaryNode {
817+
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
826818
override def output: Seq[Attribute] = child.output
827819
override def maxRows: Option[Long] = {
828820
limitExpr match {
@@ -833,18 +825,28 @@ case class GlobalLimit(
833825
}
834826

835827
/**
836-
* A partition-local (non-coordinated) limit. This operator remove at most `offsetExpr`
837-
* number and emit at most `limitExpr` number of tuples on each physical partition.
828+
* A partition-local (non-coordinated) limit. This operator can emit at most `limitExpr` number
829+
* of tuples on each physical partition.
838830
*
839831
* See [[Limit]] for more information.
840832
*/
841-
case class LocalLimit(
833+
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
834+
override def output: Seq[Attribute] = child.output
835+
836+
override def maxRowsPerPartition: Option[Long] = {
837+
limitExpr match {
838+
case IntegerLiteral(limit) => Some(limit)
839+
case _ => None
840+
}
841+
}
842+
}
843+
844+
case class GlobalLimitAndOffset(
842845
limitExpr: Expression,
843846
offsetExpr: Expression,
844847
child: LogicalPlan) extends OrderPreservingUnaryNode {
845848
override def output: Seq[Attribute] = child.output
846-
847-
override def maxRowsPerPartition: Option[Long] = {
849+
override def maxRows: Option[Long] = {
848850
limitExpr match {
849851
case IntegerLiteral(limit) => Some(limit)
850852
case _ => None

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ class AnalysisErrorSuite extends AnalysisTest {
646646
val plan4 = Filter(
647647
Exists(
648648
Limit(1,
649-
child = Filter(EqualTo(UnresolvedAttribute("a"), b), LocalRelation(b)))
649+
Filter(EqualTo(UnresolvedAttribute("a"), b), LocalRelation(b)))
650650
),
651651
LocalRelation(a))
652652
assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ class CollapseProjectSuite extends PlanTest {
149149

150150
test("collapse redundant alias through local limit") {
151151
val relation = LocalRelation('a.int, 'b.int)
152-
val query = LocalLimit(1, 0, relation.select('a as 'b)).select('b as 'c).analyze
152+
val query = LocalLimit(1, relation.select('a as 'b)).select('b as 'c).analyze
153153
val optimized = Optimize.execute(query)
154-
val expected = LocalLimit(1, 0, relation.select('a as 'c)).analyze
154+
val expected = LocalLimit(1, relation.select('a as 'c)).analyze
155155
comparePlans(optimized, expected)
156156
}
157157

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,23 @@ class LimitPushdownSuite extends PlanTest {
4949
val unionQuery = Union(testRelation, testRelation2).limit(1)
5050
val unionOptimized = Optimize.execute(unionQuery.analyze)
5151
val unionCorrectAnswer =
52-
Limit(1, 0, Union(LocalLimit(1, 0, testRelation), LocalLimit(1, 0, testRelation2))).analyze
52+
Limit(1, Union(LocalLimit(1, testRelation), LocalLimit(1, testRelation2))).analyze
5353
comparePlans(unionOptimized, unionCorrectAnswer)
5454
}
5555

5656
test("Union: limit to each side with constant-foldable limit expressions") {
5757
val unionQuery = Union(testRelation, testRelation2).limit(Add(1, 1))
5858
val unionOptimized = Optimize.execute(unionQuery.analyze)
5959
val unionCorrectAnswer =
60-
Limit(2, 0, Union(LocalLimit(2, 0, testRelation), LocalLimit(2, 0, testRelation2))).analyze
60+
Limit(2, Union(LocalLimit(2, testRelation), LocalLimit(2, testRelation2))).analyze
6161
comparePlans(unionOptimized, unionCorrectAnswer)
6262
}
6363

6464
test("Union: limit to each side with the new limit number") {
6565
val unionQuery = Union(testRelation, testRelation2.limit(3)).limit(1)
6666
val unionOptimized = Optimize.execute(unionQuery.analyze)
6767
val unionCorrectAnswer =
68-
Limit(1, 0, Union(LocalLimit(1, 0, testRelation), LocalLimit(1, 0, testRelation2))).analyze
68+
Limit(1, Union(LocalLimit(1, testRelation), LocalLimit(1, testRelation2))).analyze
6969
comparePlans(unionOptimized, unionCorrectAnswer)
7070
}
7171

@@ -74,7 +74,7 @@ class LimitPushdownSuite extends PlanTest {
7474
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).limit(2)
7575
val unionOptimized = Optimize.execute(unionQuery.analyze)
7676
val unionCorrectAnswer =
77-
Limit(2, 0, Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1))).analyze
77+
Limit(2, Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1))).analyze
7878
comparePlans(unionOptimized, unionCorrectAnswer)
7979
}
8080

@@ -83,8 +83,8 @@ class LimitPushdownSuite extends PlanTest {
8383
Union(testRelation.limit(3), testRelation2.select('d, 'e, 'f).limit(4)).limit(2)
8484
val unionOptimized = Optimize.execute(unionQuery.analyze)
8585
val unionCorrectAnswer =
86-
Limit(2, 0, Union(
87-
LocalLimit(2, 0, testRelation), LocalLimit(2, 0, testRelation2.select('d, 'e, 'f)))).analyze
86+
Limit(2, Union(
87+
LocalLimit(2, testRelation), LocalLimit(2, testRelation2.select('d, 'e, 'f)))).analyze
8888
comparePlans(unionOptimized, unionCorrectAnswer)
8989
}
9090

@@ -93,49 +93,49 @@ class LimitPushdownSuite extends PlanTest {
9393
test("left outer join") {
9494
val originalQuery = x.join(y, LeftOuter).limit(1)
9595
val optimized = Optimize.execute(originalQuery.analyze)
96-
val correctAnswer = Limit(1, 0, LocalLimit(1, 0, x).join(y, LeftOuter)).analyze
96+
val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
9797
comparePlans(optimized, correctAnswer)
9898
}
9999

100100
test("left outer join and left sides are limited") {
101101
val originalQuery = x.limit(2).join(y, LeftOuter).limit(1)
102102
val optimized = Optimize.execute(originalQuery.analyze)
103-
val correctAnswer = Limit(1, 0, LocalLimit(1, 0, x).join(y, LeftOuter)).analyze
103+
val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
104104
comparePlans(optimized, correctAnswer)
105105
}
106106

107107
test("left outer join and right sides are limited") {
108108
val originalQuery = x.join(y.limit(2), LeftOuter).limit(1)
109109
val optimized = Optimize.execute(originalQuery.analyze)
110-
val correctAnswer = Limit(1, 0, LocalLimit(1, 0, x).join(Limit(2, 0, y), LeftOuter)).analyze
110+
val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), LeftOuter)).analyze
111111
comparePlans(optimized, correctAnswer)
112112
}
113113

114114
test("right outer join") {
115115
val originalQuery = x.join(y, RightOuter).limit(1)
116116
val optimized = Optimize.execute(originalQuery.analyze)
117-
val correctAnswer = Limit(1, 0, x.join(LocalLimit(1, 0, y), RightOuter)).analyze
117+
val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
118118
comparePlans(optimized, correctAnswer)
119119
}
120120

121121
test("right outer join and right sides are limited") {
122122
val originalQuery = x.join(y.limit(2), RightOuter).limit(1)
123123
val optimized = Optimize.execute(originalQuery.analyze)
124-
val correctAnswer = Limit(1, 0, x.join(LocalLimit(1, 0, y), RightOuter)).analyze
124+
val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
125125
comparePlans(optimized, correctAnswer)
126126
}
127127

128128
test("right outer join and left sides are limited") {
129129
val originalQuery = x.limit(2).join(y, RightOuter).limit(1)
130130
val optimized = Optimize.execute(originalQuery.analyze)
131-
val correctAnswer = Limit(1, 0, Limit(2, 0, x).join(LocalLimit(1, 0, y), RightOuter)).analyze
131+
val correctAnswer = Limit(1, Limit(2, x).join(LocalLimit(1, y), RightOuter)).analyze
132132
comparePlans(optimized, correctAnswer)
133133
}
134134

135135
test("larger limits are not pushed on top of smaller ones in right outer join") {
136136
val originalQuery = x.join(y.limit(5), RightOuter).limit(10)
137137
val optimized = Optimize.execute(originalQuery.analyze)
138-
val correctAnswer = Limit(10, 0, x.join(Limit(5, 0, y), RightOuter)).analyze
138+
val correctAnswer = Limit(10, x.join(Limit(5, y), RightOuter)).analyze
139139
comparePlans(optimized, correctAnswer)
140140
}
141141

0 commit comments

Comments
 (0)