[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression #35975

Closed
wants to merge 14 commits into from
Changes from 6 commits
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
@@ -494,6 +494,7 @@ Below is a list of all the keywords in Spark SQL.
|NULL|reserved|non-reserved|reserved|
|NULLS|non-reserved|non-reserved|non-reserved|
|OF|non-reserved|non-reserved|reserved|
|OFFSET|reserved|non-reserved|reserved|
|ON|reserved|strict-non-reserved|reserved|
|ONLY|reserved|non-reserved|reserved|
|OPTION|non-reserved|non-reserved|non-reserved|
Original file line number Diff line number Diff line change
@@ -242,6 +242,7 @@ NOT: 'NOT' | '!';
NULL: 'NULL';
NULLS: 'NULLS';
OF: 'OF';
OFFSET: 'OFFSET';
ON: 'ON';
ONLY: 'ONLY';
OPTION: 'OPTION';
Original file line number Diff line number Diff line change
@@ -439,6 +439,7 @@ queryOrganization
(SORT BY sort+=sortItem (COMMA sort+=sortItem)*)?
windowClause?
(LIMIT (ALL | limit=expression))?
(OFFSET offset=expression)?
;

multiInsertQueryBody
@@ -1450,6 +1451,7 @@ nonReserved
| NULL
| NULLS
| OF
| OFFSET
| ONLY
| OPTION
| OPTIONS
Original file line number Diff line number Diff line change
@@ -411,10 +411,34 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)

case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case LocalLimit(limitExpr, child) =>
checkLimitLikeClause("limit", limitExpr)
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
Contributor

In the parser it looks like we accept any expression for the OFFSET, but here we call asInstanceOf[Int]. Can we have an explicit check that this expression has integer type with an appropriate error message if not, and an accompanying test case that covers it?

Contributor Author

@beliefer Apr 12, 2022

In fact, we check it with checkLimitLikeClause("offset", offsetExpr) on line 432.

Contributor

SG. Will checkLimitLikeClause execute before this asInstanceOf[Int] call? If so, we are OK. Otherwise, we would hit an exception here, which might result in a confusing error message.

Contributor Author

Yes. It's OK

val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
failAnalysis(
s"""
|The sum of the LIMIT clause and the OFFSET clause must not be greater than
|the maximum 32-bit integer value (2,147,483,647),
|but found limit = $limit, offset = $offset.
|""".stripMargin.replace("\n", " "))
}
case _ =>
}
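
As a rough illustration of the bound check above, here is a standalone sketch in plain Scala with no Spark dependency. The names `LimitOffsetBound` and `sumExceedsIntMax` are hypothetical and not part of this PR. It shows why the comparison is written as `Int.MaxValue - limit < offset` instead of adding the two values: the subtraction cannot overflow when both values are non-negative, whereas `limit + offset` could wrap around to a negative number.

```scala
// Hypothetical helper, not part of the PR: models the LIMIT + OFFSET bound check.
object LimitOffsetBound {
  /** Returns true when limit + offset would exceed Int.MaxValue. */
  def sumExceedsIntMax(limit: Int, offset: Int): Boolean = {
    // Assumption: both values were already validated as non-negative integers.
    require(limit >= 0 && offset >= 0, "limit and offset are assumed to be non-negative")
    // Subtracting from Int.MaxValue stays within Int range, so no overflow occurs.
    Int.MaxValue - limit < offset
  }
}
```

With the values used in the accompanying error test (limit = 1000000000, offset = 2000000000), this check reports that the sum is out of range.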

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
Contributor

Can we move this check to checkOutermostOffset and rename it to checkOffsetOperator? It's better to have a central place to check offset position.

Contributor Author

OK

&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))

case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
def ordinalNumber(i: Int): String = i match {
@@ -588,6 +612,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}
checkCollectedMetrics(plan)
checkOutermostOffset(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
@@ -830,6 +855,21 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
check(plan)
}

/**
* Validate that the root node of a query or subquery is not an [[Offset]].
*/
private def checkOutermostOffset(plan: LogicalPlan): Unit = {
plan match {
case Offset(offsetExpr, _) =>
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
case _ =>
}
Contributor

Just checking: could lines 864 to 871 be merged with lines 855 to 862? Does case Offset(offsetExpr, _) match somehow during plan.foreachUp?

Contributor Author

Lines 864 to 871 are used to find the outermost node. Lines 855 to 862 can't do this.

}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Original file line number Diff line number Diff line change
@@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging {
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")

case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")
Contributor

Could you please add a unit test case to cover this error message?

Contributor Author

@beliefer Apr 12, 2022

Because of L859 of CheckAnalysis.scala, and because Spark SQL does not support streaming yet, a streaming query cannot reach this branch.


case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
Original file line number Diff line number Diff line change
@@ -402,6 +402,8 @@ package object dsl {

def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
otherPlan: LogicalPlan,
joinType: JoinType = Inner,
Original file line number Diff line number Diff line change
@@ -95,6 +95,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CollapseWindow,
CombineFilters,
EliminateLimits,
RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
OptimizeRepartition,
@@ -1845,6 +1846,20 @@ object EliminateLimits extends Rule[LogicalPlan] {
}
}

/**
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one,
Contributor

combines two adjacent [[Offset]] operators into one

where does it happen?

* merging the expressions into one single expression. See [[Limit]] for more information
* about the difference between LocalLimit and GlobalLimit.
*/
object RewriteOffsets extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, Offset(oe, grandChild)) =>
GlobalLimitAndOffset(le, oe, grandChild)
case LocalLimit(le, Offset(oe, grandChild)) =>
Offset(oe, LocalLimit(Add(le, oe), grandChild))
}
}
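
To make the effect of the two rewrite cases easier to follow, here is a toy model in plain Scala. It is only a sketch under stated assumptions: the `Toy*` plan types and `ToyRewriteOffsets` are hypothetical stand-ins for the Catalyst operators, limits and offsets are plain `Int` values instead of expressions, and the traversal is a simple bottom-up recursion, not Catalyst's `transform`.

```scala
// Hypothetical, simplified plan nodes; these are not the Catalyst classes.
sealed trait ToyPlan
case class ToyRelation(name: String) extends ToyPlan
case class ToyOffset(offset: Int, child: ToyPlan) extends ToyPlan
case class ToyLocalLimit(limit: Int, child: ToyPlan) extends ToyPlan
case class ToyGlobalLimit(limit: Int, child: ToyPlan) extends ToyPlan
case class ToyGlobalLimitAndOffset(limit: Int, offset: Int, child: ToyPlan) extends ToyPlan

object ToyRewriteOffsets {
  // Rewrites children first, then applies the two patterns from the rule above.
  def rewrite(plan: ToyPlan): ToyPlan = plan match {
    case r: ToyRelation => r
    case ToyOffset(oe, child) => ToyOffset(oe, rewrite(child))
    case ToyLocalLimit(le, child) => rewrite(child) match {
      // A local limit above an offset must keep limit + offset rows per partition.
      case ToyOffset(oe, grandChild) => ToyOffset(oe, ToyLocalLimit(le + oe, grandChild))
      case other => ToyLocalLimit(le, other)
    }
    case ToyGlobalLimit(le, child) => rewrite(child) match {
      // A global limit directly above an offset is fused into one operator.
      case ToyOffset(oe, grandChild) => ToyGlobalLimitAndOffset(le, oe, grandChild)
      case other => ToyGlobalLimit(le, other)
    }
    case ToyGlobalLimitAndOffset(le, oe, child) => ToyGlobalLimitAndOffset(le, oe, rewrite(child))
  }
}
```

In this model, `ToyGlobalLimit(10, ToyLocalLimit(10, ToyOffset(5, t)))` becomes `ToyGlobalLimitAndOffset(10, 5, ToyLocalLimit(15, t))`, which corresponds to LIMIT 10 OFFSET 5.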

/**
* Check if there any cartesian products between joins of any type in the optimized plan tree.
* Throw an error if a cartesian product is found without an explicit cross join specified.
Original file line number Diff line number Diff line change
@@ -136,6 +136,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
case _: Sort => empty(p)
case _: GlobalLimit if !p.isStreaming => empty(p)
case _: LocalLimit if !p.isStreaming => empty(p)
case _: Offset => empty(p)
case _: Repartition => empty(p)
case _: RepartitionByExpression => empty(p)
case _: RebalancePartitions => empty(p)
Original file line number Diff line number Diff line change
@@ -970,6 +970,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: Sample => true
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Offset => true
case _: Generate => true
case _: Distinct => true
case _: AppendColumns => true
Original file line number Diff line number Diff line change
@@ -575,10 +575,16 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val withOffset = withWindow.optional(offset) {
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withWindow.optional(limit) {
Limit(typedVisit(limit), withWindow)
withOffset.optional(limit) {
Limit(typedVisit(limit), withOffset)
}
}

Original file line number Diff line number Diff line change
@@ -91,6 +91,13 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
}
}

override def visitOffset(p: Offset): Set[ExpressionSet] = {
p.maxRows match {
case Some(value) if value <= 1 => p.output.map(attr => ExpressionSet(Seq(attr))).toSet
case _ => p.child.distinctKeys
}
}

override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
if (!p.isAll) Set(ExpressionSet(p.output)) else default(p)
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] {
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Offset => visitOffset(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
case p: LocalLimit => visitLocalLimit(p)
@@ -64,6 +65,8 @@ trait LogicalPlanVisitor[T] {

def visitGlobalLimit(p: GlobalLimit): T

def visitOffset(p: Offset): T

def visitIntersect(p: Intersect): T

def visitJoin(p: Join): T
Original file line number Diff line number Diff line change
@@ -1145,6 +1145,23 @@ case class Expand(
copy(child = newChild)
}

/**
* A logical offset, which removes a specified number of rows from the beginning of the
* output of the child logical plan.
Contributor

Can we mention in this comment:

  1. Whether an accompanying LIMIT is also required, or the OFFSET is allowed to stand alone
  2. The semantics of which rows get skipped or preserved in the presence of an accompanying LIMIT. For example, if we have LIMIT 10 OFFSET 5, do we (a) impose a total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining?
  3. What are the boundary conditions, e.g. min and max allowed values?

Contributor Author

  1. Good point, but this is already described in CheckAnalysis.scala.
  2. Offset is a logical node. The information you mentioned is physical behavior.
  3. The boundary conditions are also mentioned in CheckAnalysis.scala.

Contributor

SG, the logical Offset just removes the first N rows. When we combine it with a Limit in the physical plan, then we can think about these semantics.

Contributor Author

Yes. The comments about these semantics have already been added to GlobalLimitAndOffset.

*/
case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
import scala.math.max
offsetExpr match {
case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) }
case _ => None
Contributor

We expect the offset expression to always be an integer literal by this point, enforced by the parser? So we can drop this line?

Contributor Author

Yes. But this is good for Scala's pattern match.

Contributor

Yeah, we can turn this into an assertion, but I'm fine with it as it is, since the code is the same in the Limit operator.

Contributor Author

OK

}
}
override protected def withNewChildInternal(newChild: LogicalPlan): Offset =
copy(child = newChild)
}

/**
* A constructor for creating a pivot, which will later be converted to a [[Project]]
* or an [[Aggregate]] during the query analysis.
@@ -1256,6 +1273,26 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
copy(child = newChild)
}

/**
* A global (coordinated) limit with offset. This operator skips at most `offsetExpr` rows and
* emits at most `limitExpr` rows in total. For example, if we have LIMIT 10 OFFSET 5, we impose a
* total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining.
*/
case class GlobalLimitAndOffset(
limitExpr: Expression,
offsetExpr: Expression,
child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
case _ => None
Contributor

same as above, if we enforce that the child is an integer literal by the parser, we can drop this line?

Contributor Author

Yes. But this is good for Scala's pattern match.

}
}
override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset =
copy(child = newChild)
}
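
The LIMIT 10 OFFSET 5 semantics described in the doc comment above can be sketched on an ordinary Scala collection. This is only an illustration of the row arithmetic, assuming an in-memory `Seq` in place of a distributed plan; `LimitAndOffsetSemantics` and `limitThenOffset` are hypothetical names that do not appear in the PR.

```scala
// Hypothetical model of GlobalLimitAndOffset on an in-memory sequence.
object LimitAndOffsetSemantics {
  /** Keeps the first (limit + offset) rows, then discards the first `offset` of them. */
  def limitThenOffset[A](rows: Seq[A], limit: Int, offset: Int): Seq[A] = {
    // Assumption: limit + offset fits in an Int, as enforced by the analysis check.
    rows.take(limit + offset).drop(offset)
  }
}
```

For a 100-row input, `limitThenOffset(rows, 10, 5)` keeps 15 rows and then drops the first 5, leaving rows 6 through 15. If the input has fewer than `offset` rows, the result is empty.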

/**
* This is similar with [[Limit]] except:
*
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)

override def visitOffset(p: Offset): Statistics = fallback(p)

override def visitIntersect(p: Intersect): Statistics = {
val leftStats = p.left.stats
val rightStats = p.right.stats
Original file line number Diff line number Diff line change
@@ -90,6 +90,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
rowCount = Some(rowCount))
}

override def visitOffset(p: Offset): Statistics = {
val offset = p.offsetExpr.eval().asInstanceOf[Int]
val childStats = p.child.stats
val rowCount: BigInt = childStats.rowCount.map(c => c - offset).map(_.max(0)).getOrElse(0)
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
rowCount = Some(rowCount))
}
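
The row-count part of this estimate can be isolated in a small standalone sketch. It assumes the child's row count is an `Option[BigInt]`, as in the diff above, and leaves out the size-in-bytes computation; `OffsetRowCountEstimate` and `estimate` are hypothetical names used only for illustration.

```scala
// Hypothetical helper mirroring only the row-count arithmetic of visitOffset.
object OffsetRowCountEstimate {
  /** Estimated rows after skipping `offset` rows; never negative. */
  def estimate(childRowCount: Option[BigInt], offset: Int): BigInt =
    childRowCount
      .map(c => (c - BigInt(offset)).max(BigInt(0))) // clamp at zero when offset exceeds the child
      .getOrElse(BigInt(0))                          // unknown child row count falls back to 0, as in the diff
}
```

With a child of 10 rows, an offset of 2 gives 8 rows and an offset of 20 gives 0 rows, which matches the expectations in the offset estimation tests of this PR.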

override def visitIntersect(p: Intersect): Statistics = {
val leftSize = p.left.stats.sizeInBytes
val rightSize = p.right.stats.sizeInBytes
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

private[sql] case class GroupableData(data: Int) {
def getData: Int = data
@@ -531,6 +532,52 @@ class AnalysisErrorSuite extends AnalysisTest {
"The limit expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"an evaluated offset class must not be string",
testRelation.offset(Literal(UTF8String.fromString("abc"), StringType)),
"The offset expression must be integer type, but got string" :: Nil
)

errorTest(
"an evaluated offset class must not be long",
testRelation.offset(Literal(10L, LongType)),
Contributor

@amaliujia Apr 18, 2022

Is there a way in SQL to specify that a number is a long, so that we hit this error message?

For example, LIMIT 1, in which 1 is an integer, versus LIMIT 1L, in which 1L is a long?

Contributor Author

This has nothing to do with it.
The cost of Limit or Offset is decided by its value. We shouldn't use long value.

"The offset expression must be integer type, but got bigint" :: Nil
)

errorTest(
"an evaluated offset class must not be null",
testRelation.offset(Literal(null, IntegerType)),
"The evaluated offset expression must not be null, but got " :: Nil
)

errorTest(
"num_rows in offset clause must be equal to or greater than 0",
listRelation.offset(-1),
Contributor

can we also add test cases where:

  1. the OFFSET expression is not an integer value, e.g. "abc"
  2. the OFFSET expression is a long integer value
  3. the OFFSET expression is a constant but non-literal value, e.g. CASTing the current date to an integer, or some integer-valued UDF

Contributor Author

@beliefer Apr 12, 2022

1 and 2 are OK.
3. current_date is foldable and UDF is not available in catalyst. In fact, postgreSQL/limit.sql already includes this test case.

Contributor

Suggested change
listRelation.offset(-1),
testRelation.offset(-1),

"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause is found to be the outermost node." :: Nil
Contributor

I found the second half of the error message a bit confusing.

I would guess it is trying to say that the OFFSET is used without a LIMIT (though it is phrased as being the outermost node)?

Contributor Author

They are different. Users could call relation.limit().offset().

)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause found in: Filter." :: Nil
)

errorTest(
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
"The sum of the LIMIT clause and the OFFSET clause must not be greater than" +
" the maximum 32-bit integer value (2,147,483,647)," +
" but found limit = 1000000000, offset = 2000000000." :: Nil
)
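
The value checks exercised by the offset error tests above can be modeled roughly as follows. This is a simplified sketch, not the actual `checkLimitLikeClause`: it works on an already evaluated value of type `Any` instead of a Catalyst expression, it does not model the foldability requirement, and `LimitLikeValueCheck` and `check` are hypothetical names. The message texts for the type and sign checks follow the expectations in the tests; the null message is shortened.

```scala
// Hypothetical model of the validation order: null, then type, then sign.
object LimitLikeValueCheck {
  def check(name: String, value: Any): Either[String, Int] = value match {
    case null =>
      Left(s"The evaluated $name expression must not be null")
    case i: Int if i < 0 =>
      Left(s"The $name expression must be equal to or greater than 0, but got $i")
    case i: Int =>
      Right(i) // a non-negative 32-bit integer is accepted
    case _: Long =>
      Left(s"The $name expression must be integer type, but got bigint")
    case _: String =>
      Left(s"The $name expression must be integer type, but got string")
    case other =>
      // Assumption: any other runtime type is rejected; the type name here is illustrative.
      Left(s"The $name expression must be integer type, but got ${other.getClass.getSimpleName}")
  }
}
```

Returning `Either` keeps the sketch free of exceptions; the PR itself reports these conditions through `failAnalysis`.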

errorTest(
"more than one generators in SELECT",
listRelation.select(Explode($"list"), Explode($"list")),
Original file line number Diff line number Diff line change
@@ -94,6 +94,15 @@ class DistinctKeyVisitorSuite extends PlanTest {
Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c))))
}

test("Offset's distinct attributes") {
checkDistinctAttributes(Distinct(t1).limit(12).offset(10).limit(10),
Set(ExpressionSet(Seq(a, b, c))))
checkDistinctAttributes(LocalLimit(10, Offset(10, LocalLimit(12, Distinct(t1)))),
Set(ExpressionSet(Seq(a, b, c))))
checkDistinctAttributes(t1.offset(1).limit(1),
Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c))))
}

test("Intersect's distinct attributes") {
checkDistinctAttributes(Intersect(t1, t2, false), Set(ExpressionSet(Seq(a, b, c))))
checkDistinctAttributes(Intersect(t1, t2, true), Set.empty)
Original file line number Diff line number Diff line change
@@ -185,6 +185,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = windowsStats)
}

test("offset estimation: offset < child's rowCount") {
val offset = Offset(Literal(2), plan)
checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8)))
}

test("offset estimation: offset > child's rowCount") {
val offset = Offset(Literal(20), plan)
checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0)))
}

test("offset estimation: offset = 0") {
val offset = Offset(Literal(0), plan)
// Offset is equal to zero, so Offset's stats is equal to its child's stats.
checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil)))
}

test("limit estimation: limit < child's rowCount") {
val localLimit = LocalLimit(Literal(2), plan)
val globalLimit = GlobalLimit(Literal(2), plan)