-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression #35975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
f95a4b4
c50fbf7
1dbfc95
b6f07cc
936456b
bf1b4cf
6bb5b62
8551083
58287ab
beddfcd
0616e44
235c49a
8df2c62
038a50f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -411,10 +411,34 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
|
||
case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) | ||
|
||
case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) | ||
case LocalLimit(limitExpr, child) => | ||
checkLimitLikeClause("limit", limitExpr) | ||
child match { | ||
case Offset(offsetExpr, _) => | ||
val limit = limitExpr.eval().asInstanceOf[Int] | ||
val offset = offsetExpr.eval().asInstanceOf[Int] | ||
if (Int.MaxValue - limit < offset) { | ||
failAnalysis( | ||
s""" | ||
|The sum of the LIMIT clause and the OFFSET clause must not be greater than | ||
|the maximum 32-bit integer value (2,147,483,647), | ||
|but found limit = $limit, offset = $offset. | ||
|""".stripMargin.replace("\n", " ")) | ||
} | ||
case _ => | ||
} | ||
|
||
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) | ||
|
||
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) | ||
|
||
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move this check to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
&& o.children.exists(_.isInstanceOf[Offset]) => | ||
failAnalysis( | ||
s""" | ||
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET | ||
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) | ||
|
||
case _: Union | _: SetOperation if operator.children.length > 1 => | ||
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType) | ||
def ordinalNumber(i: Int): String = i match { | ||
|
@@ -588,6 +612,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
} | ||
} | ||
checkCollectedMetrics(plan) | ||
checkOutermostOffset(plan) | ||
extendedCheckRules.foreach(_(plan)) | ||
plan.foreachUp { | ||
case o if !o.resolved => | ||
|
@@ -830,6 +855,21 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
check(plan) | ||
} | ||
|
||
/** | ||
* Validate that the root node of query or subquery is [[Offset]]. | ||
*/ | ||
private def checkOutermostOffset(plan: LogicalPlan): Unit = { | ||
plan match { | ||
case Offset(offsetExpr, _) => | ||
checkLimitLikeClause("offset", offsetExpr) | ||
failAnalysis( | ||
s""" | ||
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET | ||
|clause is found to be the outermost node.""".stripMargin.replace("\n", " ")) | ||
case _ => | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checking, could line 864 to line 871 be merged with line 855 to line 862? Do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line 864 to line 871 used to find the outermost node. line 855 to line 862 can't help do this. |
||
} | ||
|
||
/** | ||
* Validates to make sure the outer references appearing inside the subquery | ||
* are allowed. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging { | |
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " + | ||
"output mode") | ||
|
||
case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a unit test case to cover this error message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because L859 of CheckAnalysis.scala and Spark SQL not supports streaming yet, streaming cannot go there. |
||
|
||
case Sort(_, _, _) if !containsCompleteData(subPlan) => | ||
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " + | ||
"aggregated DataFrame/Dataset in Complete output mode") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,6 +95,7 @@ abstract class Optimizer(catalogManager: CatalogManager) | |
CollapseWindow, | ||
CombineFilters, | ||
EliminateLimits, | ||
RewriteOffsets, | ||
CombineUnions, | ||
// Constant folding and strength reduction | ||
OptimizeRepartition, | ||
|
@@ -1845,6 +1846,20 @@ object EliminateLimits extends Rule[LogicalPlan] { | |
} | ||
} | ||
|
||
/** | ||
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
where does it happen? |
||
* merging the expressions into one single expression. See [[Limit]] for more information | ||
* about the difference between LocalLimit and GlobalLimit. | ||
*/ | ||
object RewriteOffsets extends Rule[LogicalPlan] { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
case GlobalLimit(le, Offset(oe, grandChild)) => | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
GlobalLimitAndOffset(le, oe, grandChild) | ||
case LocalLimit(le, Offset(oe, grandChild)) => | ||
Offset(oe, LocalLimit(Add(le, oe), grandChild)) | ||
} | ||
} | ||
|
||
/** | ||
* Check if there any cartesian products between joins of any type in the optimized plan tree. | ||
* Throw an error if a cartesian product is found without an explicit cross join specified. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1145,6 +1145,23 @@ case class Expand( | |
copy(child = newChild) | ||
} | ||
|
||
/** | ||
* A logical offset, which may removing a specified number of rows from the beginning of the | ||
* output of child logical plan. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we mention in this comment:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SG, the logical Offset just removes the first N rows. When we combine it with a Limit in the physical plan, then we can think about these semantics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. The comments about these semantics already added to |
||
*/ | ||
case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { | ||
override def output: Seq[Attribute] = child.output | ||
override def maxRows: Option[Long] = { | ||
import scala.math.max | ||
offsetExpr match { | ||
case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) } | ||
case _ => None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We expect the offset expression to always be an integer literal by this point, enforced by the parser? So we can drop this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. But this is good for Scala's pattern match. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea we can turn this into an assertion, but I'm fine with what it is as the code is the same in Limit operator. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
} | ||
} | ||
override protected def withNewChildInternal(newChild: LogicalPlan): Offset = | ||
copy(child = newChild) | ||
} | ||
|
||
/** | ||
* A constructor for creating a pivot, which will later be converted to a [[Project]] | ||
* or an [[Aggregate]] during the query analysis. | ||
|
@@ -1256,6 +1273,26 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr | |
copy(child = newChild) | ||
} | ||
|
||
/** | ||
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and | ||
* emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a | ||
* total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining. | ||
*/ | ||
case class GlobalLimitAndOffset( | ||
limitExpr: Expression, | ||
offsetExpr: Expression, | ||
child: LogicalPlan) extends OrderPreservingUnaryNode { | ||
override def output: Seq[Attribute] = child.output | ||
override def maxRows: Option[Long] = { | ||
limitExpr match { | ||
case IntegerLiteral(limit) => Some(limit) | ||
case _ => None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above, if we enforce that the child is an integer literal by the parser, we can drop this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. But this is good for Scala's pattern match. |
||
} | ||
} | ||
override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset = | ||
copy(child = newChild) | ||
} | ||
|
||
/** | ||
* This is similar with [[Limit]] except: | ||
* | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ | |||||
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} | ||||||
import org.apache.spark.sql.internal.SQLConf | ||||||
import org.apache.spark.sql.types._ | ||||||
import org.apache.spark.unsafe.types.UTF8String | ||||||
|
||||||
private[sql] case class GroupableData(data: Int) { | ||||||
def getData: Int = data | ||||||
|
@@ -531,6 +532,52 @@ class AnalysisErrorSuite extends AnalysisTest { | |||||
"The limit expression must be equal to or greater than 0, but got -1" :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"an evaluated offset class must not be string", | ||||||
testRelation.offset(Literal(UTF8String.fromString("abc"), StringType)), | ||||||
"The offset expression must be integer type, but got string" :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"an evaluated offset class must not be long", | ||||||
testRelation.offset(Literal(10L, LongType)), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way for SQL to specify if a number is long, thus we hit this error message? Like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has nothing to do with it. |
||||||
"The offset expression must be integer type, but got bigint" :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"an evaluated offset class must not be null", | ||||||
testRelation.offset(Literal(null, IntegerType)), | ||||||
"The evaluated offset expression must not be null, but got " :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"num_rows in offset clause must be equal to or greater than 0", | ||||||
listRelation.offset(-1), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also add test cases where:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1 and 2 are OK. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
"The offset expression must be equal to or greater than 0, but got -1" :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"OFFSET clause is outermost node", | ||||||
testRelation.offset(Literal(10, IntegerType)), | ||||||
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + | ||||||
" clause is found to be the outermost node." :: Nil | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the second half of error message is a bit confusing. I would guess it tries to say that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are different. users could call |
||||||
) | ||||||
|
||||||
errorTest( | ||||||
"OFFSET clause in other node", | ||||||
testRelation2.offset(Literal(10, IntegerType)).where('b > 1), | ||||||
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + | ||||||
" clause found in: Filter." :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue", | ||||||
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)), | ||||||
"The sum of the LIMIT clause and the OFFSET clause must not be greater than" + | ||||||
" the maximum 32-bit integer value (2,147,483,647)," + | ||||||
" but found limit = 1000000000, offset = 2000000000." :: Nil | ||||||
) | ||||||
|
||||||
errorTest( | ||||||
"more than one generators in SELECT", | ||||||
listRelation.select(Explode($"list"), Explode($"list")), | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the parser it looks like we accept any expression for the OFFSET, but here we call
asInstanceOf[Int]
. Can we have an explicit check that this expression has integer type with an appropriate error message if not, and an accompanying test case that covers it?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, We check it with
checkLimitLikeClause("offset", offsetExpr)
on line 432.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG. Will the
checkLimitLikeClause
execute before thisasInstanceOf[Int]
call here? If so, we are OK. Otherwise, we would receive an exception here, which might result in an confusing error message?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It's OK