[SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Non-foldable, Zero or Negative #14034
Conversation
Test build #61678 has finished for PR 14034 at commit
cc @cloud-fan @davies Thanks!
s"number_rows:$numRows") | ||
} | ||
|
||
case l: LocalLimit => |
What do you think about merging the two cases into `case l @ (_: LocalLimit | _: GlobalLimit) =>` to remove the duplication (or at least introducing a local method)?
I do not think we can merge them, but, yeah, we can create a local function for it. Thank you!
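For illustration, a minimal standalone sketch of the local-helper idea, using simplified stand-ins for Spark's plan and expression types (not the PR's exact code; the message strings follow the discussion later in this thread):

```Scala
// Toy stand-ins for the catalyst plan nodes; only the shape of the check matters here.
sealed trait Plan
case class GlobalLimit(limitExpr: Any, child: Plan) extends Plan
case class LocalLimit(limitExpr: Any, child: Plan) extends Plan
case object Leaf extends Plan

def failAnalysis(msg: String): Nothing = throw new IllegalArgumentException(msg)

// The shared helper both limit cases can delegate to.
def checkLimitClause(limitExpr: Any): Unit = limitExpr match {
  case n: Int if n >= 0 => // OK
  case n: Int =>
    failAnalysis(s"number_rows in limit clause must be equal to or greater than 0. number_rows:$n")
  case other =>
    failAnalysis(s"number_rows in limit clause cannot be cast to integer: $other")
}

def check(plan: Plan): Unit = plan match {
  case GlobalLimit(e, _) => checkLimitClause(e)
  case LocalLimit(e, _)  => checkLimitClause(e)
  case _                 => // nothing to check for other nodes
}
```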
Test build #61736 has finished for PR 14034 at commit
@@ -46,6 +46,15 @@ trait CheckAnalysis extends PredicateHelper {
      }).length > 1
  }

  private def checkLimitClause(limitExpr: Expression): Unit = {
    val numRows = limitExpr.eval().asInstanceOf[Int]
Is the limit expression guaranteed to be a literal?
Nope. Users can input an expression here. For example,
spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
Line 234 in e5d703b
assertEqual(s"$sql limit cast(9 / 4 as int)", plan.limit(Cast(Literal(9) / 4, IntegerType))) |
ah, but it's still foldable. Is it possible it's non-foldable?
- Oracle: http://docs.oracle.com/javadb/10.5.3.0/ref/rrefsqljoffsetfetch.html
- DB2 z/OS: https://www.ibm.com/support/knowledgecenter/SSEPEK_10.0.0/com.ibm.db2z10.doc.sqlref/src/tpc/db2z_sql_fetchfirstclause.html
- MySQL: http://dev.mysql.com/doc/refman/5.7/en/select.html
- PostgreSQL: https://www.postgresql.org/docs/8.1/static/queries-limit.html
It sounds like nobody supports it. All of the mainstream DB vendors only support integer constants here.
We do not support non-foldable limit clauses.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
Lines 67 to 89 in d063898
object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.ReturnAnswer(rootPlan) => rootPlan match {
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit),
          logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, Some(projectList), planLater(child)) :: Nil
      case logical.Limit(IntegerLiteral(limit), child) =>
        execution.CollectLimitExec(limit, planLater(child)) :: Nil
      case other => planLater(other) :: Nil
    }
    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
      execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
    case logical.Limit(
        IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
      execution.TakeOrderedAndProjectExec(
        limit, order, Some(projectList), planLater(child)) :: Nil
    case _ => Nil
  }
}
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
Lines 398 to 401 in d063898
case logical.LocalLimit(IntegerLiteral(limit), child) =>
  execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
  execution.GlobalLimitExec(limit, planLater(child)) :: Nil
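To make the mechanism concrete, here is a standalone sketch (toy types, not Spark's real catalyst classes) of how an `IntegerLiteral`-style pattern only matches literal limits, so anything else falls through and the planner ends up with no plan:

```Scala
// Toy expression tree; IntegerLiteral mirrors the extractor used in the strategies above.
sealed trait Expr
case class Literal(value: Any) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class Rand(seed: Long) extends Expr

object IntegerLiteral {
  def unapply(e: Expr): Option[Int] = e match {
    case Literal(i: Int) => Some(i)
    case _               => None
  }
}

// A limit that is not an integer literal matches no case, i.e. no physical plan.
def planLimit(limitExpr: Expr): Option[String] = limitExpr match {
  case IntegerLiteral(n) => Some(s"CollectLimitExec($n)")
  case _                 => None
}

planLimit(Literal(1))                          // Some(CollectLimitExec(1))
planLimit(GreaterThan(Rand(0L), Literal(0.2))) // None -> "No plan for GlobalLimit ..."
```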
But... we do not issue an exception if users do it. Thus, the error we get is strange:
assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]
java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]
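(For reference, a hypothetical user-level reproduction of the error above, based on the `LIMIT rand() > 0.2` example in the PR description; it assumes a running `SparkSession` named `spark` and an ad-hoc `testData` view:)

```Scala
// Register a stand-in for the test suite's testData view: integer key, string value.
spark.range(10).selectExpr("id AS key", "cast(id AS string) AS value")
  .createOrReplaceTempView("testData")

// Before this PR, the non-foldable limit passed analysis and only failed at planning time.
spark.sql("SELECT * FROM testData LIMIT rand() > 0.2").collect()
```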
Let me do it in this PR. Thank you for your review! : )
Test build #61746 has finished for PR 14034 at commit
    }

    assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    assert(sizes(0).equals(BigInt(96)),
Why do you use `equals`? Would `===` not work here?
This is just following what the other test cases did. Sure, we can change all of them. Let me do it. Thanks!
Test build #61761 has finished for PR 14034 at commit
ping @cloud-fan : )
@@ -660,7 +660,12 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
  }
  override lazy val statistics: Statistics = {
    val limit = limitExpr.eval().asInstanceOf[Int]
    val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
    var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
    if (sizeInBytes == 0) {
I think it's a special case for limit. How about we make it more clear? e.g.
val sizeInBytes = if (limit == 0) {
1
} else {
...
}
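A self-contained sketch of that suggested shape (the per-row size argument stands in for `output.map(_.dataType.defaultSize).sum`; this is illustrative, not the PR's exact code):

```Scala
// Never let the estimated size be zero: a BinaryNode multiplies its children's
// sizeInBytes, so a zero child estimate would zero out the whole join estimate.
def limitSizeInBytes(limit: Int, defaultRowSize: Long): BigInt = {
  if (limit == 0) {
    1 // LIMIT 0: keep a minimal non-zero estimate
  } else {
    BigInt(limit) * defaultRowSize
  }
}

limitSizeInBytes(0, 24)  // 1
limitSizeInBytes(10, 24) // 240
```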
Thanks! Let me fix it.
      arrayData.collect().take(1).map(Row.fromTuple).toSeq)

    checkAnswer(
      sql("SELECT * FROM mapData LIMIT 1"),
      mapData.collect().take(1).map(Row.fromTuple).toSeq)
  }

  test("non-foldable expressions in LIMIT") {
    val e = intercept[AnalysisException] {
      sql("SELECT * FROM testData LIMIT key > 3")
What will happen if the type is wrong? e.g. `LIMIT true`
A good question! : ) Now, the exception we issued is not good:
java.lang.Boolean cannot be cast to java.lang.Integer
java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
Let me fix it and throw a more reasonable exception:
number_rows in limit clause cannot be cast to integer:true;
        case o: Int if o >= 0 => // OK
        case o: Int => failAnalysis(
          s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o")
        case o => failAnalysis(
Should we allow other integral types? e.g. byte, long, etc.
You know, we can do it, but we need to fix the other parts. Let me try it. Thanks!
Tried to do it in different ways, but the best way I found is to introduce an Analyzer rule. We have one rule in the Optimizer and one rule in the Planner for `Limit`. Both assume the `limitExpr` is an `Integer` type. It looks ugly to convert the type everywhere. To support different numeric types, we need to convert these foldable numeric values to int in the `Analyzer`, I think. Let me upload a fix first. You can judge whether this is a good way or not. Thanks!
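Conceptually, the conversion such an `Analyzer` rule performs could look like this toy sketch (plain Scala values instead of catalyst expressions; the real rule is quoted further below):

```Scala
// Fold any constant integral limit value down to an Int so downstream rules can
// keep assuming IntegerType; None signals an analysis error for unsupported types.
def normalizeLimit(value: Any): Option[Int] = value match {
  case n: Byte                 => Some(n.toInt)
  case n: Short                => Some(n.toInt)
  case n: Int                  => Some(n)
  case n: Long if n.isValidInt => Some(n.toInt)
  case _                       => None
}

normalizeLimit(5L)       // Some(5)
normalizeLimit(3.toByte) // Some(3)
normalizeLimit(true)     // None -> reject at analysis time
```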
Test build #61963 has finished for PR 14034 at commit
Test build #61974 has finished for PR 14034 at commit
Test build #61973 has finished for PR 14034 at commit
Test build #61997 has finished for PR 14034 at commit
retest this please
object ResolveLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case g @ GlobalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) =>
      g.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType))
Another way is like
g.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType))
Not sure which one is better here.
Test build #62002 has finished for PR 14034 at commit
Test build #62006 has finished for PR 14034 at commit
@@ -2045,6 +2047,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
}

/**
 * Converts foldable numeric expressions to integers in [[GlobalLimit]] and [[LocalLimit]] operators
 */
object ResolveLimits extends Rule[LogicalPlan] {
Thanks for trying this out! Yea looks like it's doable. Let's remove it first and do it in follow-ups, to make this PR surgical.
We can create a JIRA first and discuss with others to decide if it's useful to support all integral types in limit.
Sure, let me revert it back. Thanks!
Test build #62027 has finished for PR 14034 at commit
        case o: Int => failAnalysis(
          s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o")
        case o => failAnalysis(
          s"""number_rows in limit clause cannot be cast to integer:\"$o\".""")
`cannot be cast to integer` -> `must be integer`? e.g. byte is castable to int.
Thanks!
"Limit:(testdata.`key` > 3)")) | ||
} | ||
|
||
test("Limit: unable to evaluate and cast expressions in limit clauses to Int") { |
We should also update the test name, we don't try to cast the limit expression to int type.
Yeah, will change it. Thanks!
LGTM except some style comments, thanks for working on it!
Test build #62058 has finished for PR 14034 at commit
you missed one comment: https://github.com/apache/spark/pull/14034/files#r70183958 :)
uh... Thanks! Let me do it now.
LGTM, pending jenkins
Thank you!
Test build #62069 has finished for PR 14034 at commit
Test build #62073 has finished for PR 14034 at commit
retest this please
Test build #62078 has finished for PR 14034 at commit
Test build #62080 has finished for PR 14034 at commit
thanks, merging to master and 2.0!
What changes were proposed in this pull request?

**Issue 1:** When a query contains LIMIT/TABLESAMPLE 0, the statistics could be zero. Results are correct, but it could cause a huge performance regression. For example,

```Scala
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
  .createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")
```

The statistics of both `df` and `df2` are zero. The statistics values should never be zero; otherwise `sizeInBytes` of `BinaryNode` will also be zero (product of children). This PR increases it to `1` when the number of rows is equal to 0.

**Issue 2:** When a query contains a negative LIMIT/TABLESAMPLE, we should issue an exception. Negative values could break the implementation assumptions of multiple parts, for example, statistics calculation. Below are example queries.

```SQL
SELECT * FROM testData TABLESAMPLE (-1 rows)
SELECT * FROM testData LIMIT -1
```

This PR issues an appropriate exception in this case.

**Issue 3:** Spark SQL follows the restriction of the LIMIT clause in Hive: the argument to the LIMIT clause must evaluate to a constant value. It can be a numeric literal, or another kind of numeric expression involving operators, casts, and function return values. You cannot refer to a column or use a subquery. Currently, we do not detect whether the expression in the LIMIT clause is foldable or not. If it is non-foldable, we might issue a strange error message. For example,

```SQL
SELECT * FROM testData LIMIT rand() > 0.2
```

Then, a misleading error message is issued, like

```
assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]
```

This PR detects it and then issues a meaningful error message.

How was this patch tested?

Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14034 from gatorsmile/limit.

(cherry picked from commit e226278)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
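As a closing illustration, a hedged sketch of how the new checks could be exercised end to end. It assumes a running `SparkSession` named `spark` and an ad-hoc `testData` view; the printed messages follow the discussion above but are not quoted verbatim:

```Scala
import scala.util.Try

// Stand-in for the suite's testData view: integer key, string value.
spark.range(10).selectExpr("id AS key", "cast(id AS string) AS value")
  .createOrReplaceTempView("testData")

Seq(
  "SELECT * FROM testData LIMIT -1",      // negative limit
  "SELECT * FROM testData LIMIT key > 3", // non-foldable limit
  "SELECT * FROM testData LIMIT true"     // non-integer limit
).foreach { q =>
  // Analysis runs eagerly in spark.sql, so each query should fail with an AnalysisException.
  println(Try(spark.sql(q)).failed.map(_.getMessage).getOrElse("analyzed OK"))
}
```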