Skip to content

[SPARK-39057][SQL] Offset could work without Limit #36417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
|but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case _: Union | _: SetOperation if operator.children.length > 1 =>
Expand Down Expand Up @@ -608,7 +615,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}
checkCollectedMetrics(plan)
checkOffsetOperator(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -851,30 +857,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
check(plan)
}

/**
* Validate whether the [[Offset]] is valid.
*
* Two checks are performed, in this order:
*  1. Every node of `plan` is visited with `foreachUp`; analysis fails for any node that is
*     neither a [[GlobalLimit]] nor a [[LocalLimit]] but has an [[Offset]] among its direct
*     children. The failing node's `nodeName` is included in the message.
*  2. If the root of `plan` is itself an [[Offset]], its expression is passed to
*     `checkLimitLikeClause` and then analysis fails with an "outermost node" message.
*
* Both messages are written as multi-line strings whose newlines are replaced by single
* spaces before being handed to `failAnalysis`.
*
* @param plan the logical plan to validate
*/
private def checkOffsetOperator(plan: LogicalPlan): Unit = {
// Check 1: an Offset is only accepted directly beneath GlobalLimit / LocalLimit.
plan.foreachUp {
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
// Any other node is acceptable for this check.
case _ =>
}
// Check 2: the traversal above only inspects children, so a root-level Offset (which has no
// parent) is handled separately here.
plan match {
case Offset(offsetExpr, _) =>
// The offset expression is checked first; NOTE(review): presumably so that an invalid
// expression is reported before the positional error below -- confirm against
// checkLimitLikeClause.
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
case _ =>
}
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
EliminateOffsets,
EliminateLimits,
RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
OptimizeRepartition,
Expand Down Expand Up @@ -673,7 +673,7 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
}

/**
* Pushes down [[LocalLimit]] beneath UNION ALL and joins.
* Pushes down [[LocalLimit]] beneath UNION ALL, OFFSET and joins.
*/
object LimitPushDown extends Rule[LogicalPlan] {

Expand Down Expand Up @@ -750,6 +750,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
// Merges the offset and limit values into LocalLimit and pushes LocalLimit down through Offset.
case LocalLimit(le, Offset(oe, grandChild)) =>
Offset(oe, LocalLimit(Add(le, oe), grandChild))
}
}

Expand Down Expand Up @@ -1871,21 +1874,22 @@ object EliminateLimits extends Rule[LogicalPlan] {
}

/**
* Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]],
* merging the expressions into one single expression. See [[Limit]] for more information
* about the difference between [[LocalLimit]] and [[GlobalLimit]].
* This rule optimizes Offset operators by:
* 1. Eliminates [[Offset]] operators if offset == 0.
* 2. Replaces [[Offset]] operators with an empty [[LocalRelation]]
* if the maximum row count of the [[Offset]]'s child is <= offset.
* 3. Combines two adjacent [[Offset]] operators into one, merging the
* expressions into one single expression.
*/
object RewriteOffsets extends Rule[LogicalPlan] {
object EliminateOffsets extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, Offset(oe, grandChild)) =>
GlobalLimitAndOffset(le, oe, grandChild)
case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
val offset = oe.eval().asInstanceOf[Int]
if (offset == 0) {
localLimit.withNewChildren(Seq(grandChild))
} else {
Offset(oe, LocalLimit(Add(le, oe), grandChild))
}
case Offset(oe, child) if oe.foldable && oe.eval().asInstanceOf[Int] == 0 =>
child
case Offset(oe, child)
if oe.foldable && child.maxRows.exists(_ <= oe.eval().asInstanceOf[Int]) =>
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
case Offset(oe1, Offset(oe2, child)) =>
Offset(Add(oe1, oe2), child)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,36 +1304,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
}

object LimitAndOffset {
def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = {
def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = {
p match {
case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int]
case GlobalLimit(le1, Offset(le2, LocalLimit(le3, child))) if le1.eval().asInstanceOf[Int]
+ le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] =>
Some((le1, le2, child))
case _ => None
}
}
}

/**
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
* emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a
* total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining.
*
* @param limitExpr expression giving the maximum number of rows to emit
* @param offsetExpr expression giving the number of leading rows to skip
* @param child the plan whose rows are skipped and limited
*/
case class GlobalLimitAndOffset(
limitExpr: Expression,
offsetExpr: Expression,
child: LogicalPlan) extends OrderPreservingUnaryNode {
// The output attributes are exactly those of the child.
override def output: Seq[Attribute] = child.output
// The row bound is known only when `limitExpr` matches the `IntegerLiteral` extractor; in that
// case the bound is the limit value itself (the offset does not enter the computation).
// For any other expression no bound is reported.
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
case _ => None
}
}
// Replaces the child while keeping both expressions unchanged.
override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset =
copy(child = newChild)
}

/**
* This is similar with [[Limit]] except:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,18 +556,11 @@ class AnalysisErrorSuite extends AnalysisTest {
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause is found to be the outermost node." :: Nil
)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause found in: Filter." :: Nil
"The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
" but the OFFSET clause found in: Filter." :: Nil
)

errorTest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

/**
 * Plan-level tests for the `EliminateOffsets` rule, run in isolation in a single fixed-point
 * batch. The cases cover an offset of zero, a positive offset that must be kept, an offset
 * that is at least the child's maximum row count, and two stacked offsets.
 */
class EliminateOffsetsSuite extends PlanTest {
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(Batch("Eliminate Offset", FixedPoint(10), EliminateOffsets))
  }

  // Ten identical rows, so the relation's row count is known to be 10.
  val testRelation = LocalRelation.fromExternalRows(
    Seq("a".attr.int, "b".attr.int, "c".attr.int),
    (1 to 10).map(_ => Row(1, 2, 3)))

  /** Analyzes `plan` and runs the rule under test on the result. */
  private def optimize(plan: LogicalPlan): LogicalPlan = Optimize.execute(plan.analyze)

  test("Offsets: eliminate Offset operators if offset == 0") {
    val projection = testRelation.select($"a")

    val actual = optimize(projection.offset(0))
    val expected = projection.analyze

    comparePlans(actual, expected)
  }

  test("Offsets: cannot eliminate Offset operators if offset > 0") {
    val query = testRelation.select($"a").offset(2)

    val actual = optimize(query)
    val expected = query.analyze

    comparePlans(actual, expected)
  }

  test("Replace Offset operators to empty LocalRelation if child max row <= offset") {
    val analyzedChild = testRelation.select($"a").analyze

    val actual = optimize(analyzedChild.offset(10))
    val expected = LocalRelation(
      analyzedChild.output,
      data = Seq.empty,
      isStreaming = analyzedChild.isStreaming).analyze

    comparePlans(actual, expected)
  }

  test("Cannot replace Offset operators to empty LocalRelation if child max row > offset") {
    val analyzedChild = testRelation.select($"a").analyze
    val query = analyzedChild.offset(3)

    val actual = optimize(query)
    val expected = query.analyze

    comparePlans(actual, expected)
  }

  test("Combines Offset operators") {
    val analyzedChild = testRelation.select($"a").analyze
    val stackedOffsets = analyzedChild.offset(2).offset(3)

    val actual = optimize(stackedOffsets)
    // The outer offset (3) becomes the left operand of the merged expression.
    val expected = analyzedChild.offset(Add(Literal(3), Literal(2))).analyze

    comparePlans(actual, expected)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,10 @@ class LimitPushdownSuite extends PlanTest {
Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze),
x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze)
}

test("Push down limit 1 through Offset") {
  val optimizedPlan = Optimize.execute(testRelation.offset(2).limit(1).analyze)
  // The pushed-down LocalLimit is expected to carry limit + offset = 1 + 2 = 3.
  val expectedPlan = GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze
  comparePlans(optimizedPlan, expectedPlan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), Some(offset)) :: Nil
limit, order, child.output, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child), Some(offset)) :: Nil
limit, order, projectList, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
CollectLimitExec(limit, planLater(child), offset) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
CollectLimitExec(child = planLater(child), offset = offset) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
Expand All @@ -116,20 +118,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// In this case we generate a physical top-K sorting operator, passing down
// the limit and offset values to be evaluated inline during the physical
// sorting operation for greater efficiency.
case LimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), Some(offset)) :: Nil
case LimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil
case _ => Nil
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case _ =>
Nil
}
}

Expand Down Expand Up @@ -818,8 +818,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
Expand Down
Loading