Skip to content

[SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Non-foldable, Zero or Negative #14034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from

Conversation

gatorsmile
Copy link
Member

@gatorsmile gatorsmile commented Jul 3, 2016

What changes were proposed in this pull request?

Issue 1: When a query containing LIMIT/TABLESAMPLE 0, the statistics could be zero. Results are correct but it could cause a huge performance regression. For example,

Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
  .createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")

The statistics of both df and df2 are zero. The statistics values should never be zero; otherwise sizeInBytes of BinaryNode will also be zero (product of children). This PR is to increase it to 1 when the num of rows is equal to 0.

Issue 2: When a query containing negative LIMIT/TABLESAMPLE, we should issue exceptions. Negative values could break the implementation assumption of multiple parts. For example, statistics calculation. Below is the example query.

SELECT * FROM testData TABLESAMPLE (-1 rows)
SELECT * FROM testData LIMIT -1

This PR is to issue an appropriate exception in this case.

Issue 3: Spark SQL follows the restriction of LIMIT clause in Hive. The argument to the LIMIT clause must evaluate to a constant value. It can be a numeric literal, or another kind of numeric expression involving operators, casts, and function return values. You cannot refer to a column or use a subquery. Currently, we do not detect whether the expression in LIMIT clause is foldable or not. If non-foldable, we might issue a strange error message. For example,

SELECT * FROM testData LIMIT rand() > 0.2

Then, a misleading error message is issued, like

assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

This PR detects it and then issues a meaningful error message.

How was this patch tested?

Added test cases.

@SparkQA
Copy link

SparkQA commented Jul 3, 2016

Test build #61678 has finished for PR 14034 at commit bdf4e56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

cc @cloud-fan @davies Thanks!

@gatorsmile gatorsmile changed the title [16355] [16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Zero or Negative [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Zero or Negative Jul 4, 2016
s"number_rows:$numRows")
}

case l: LocalLimit =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about merging the two cases to case l @ (_: LocalLimit | _: GlobalLimit) => to remove the duplication (or at least introduce a local method).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we can merge them, but, yeah, we can create a local function for it. Thank you!

@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61736 has finished for PR 14034 at commit 3c402d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -46,6 +46,15 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}

private def checkLimitClause(limitExpr: Expression): Unit = {
val numRows = limitExpr.eval().asInstanceOf[Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the limit expression guaranteed to be literal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Users can input an expression here. For example,

assertEqual(s"$sql limit cast(9 / 4 as int)", plan.limit(Cast(Literal(9) / 4, IntegerType)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, but it's still foldable. Is it possible it's non-foldable?

Copy link
Member Author

@gatorsmile gatorsmile Jul 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not support non-foldable limit clauses.

object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ReturnAnswer(rootPlan) => rootPlan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
case _ => Nil
}
}

case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil

But,,, we do not issue an exception if users do it. Thus, the error we got is strange:

assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do it in this PR. Thank you for your review! : )

@gatorsmile gatorsmile changed the title [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Zero or Negative [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When LIMIT/TABLESAMPLE is Non-foldable, Zero or Negative Jul 5, 2016
@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61746 has finished for PR 14034 at commit 5b36fbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(96)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you equals? Would === not work here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just following what the other test cases did. Sure, we can change all of them. Let me do it. Thanks!

@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61761 has finished for PR 14034 at commit a2a828f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

ping @cloud-fan : )

@@ -660,7 +660,12 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
}
override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
if (sizeInBytes == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a special case for limit. How about we make it more clear? e.g.

val sizeInBytes = if (limit == 0) {
  1
} else {
  ...
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Let me fix it.

arrayData.collect().take(1).map(Row.fromTuple).toSeq)

checkAnswer(
sql("SELECT * FROM mapData LIMIT 1"),
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}

test("non-foldable expressions in LIMIT") {
val e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT key > 3")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if the type is wrong? e.g. LIMIT true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good question! : ) Now, the exception we issued is not good:

java.lang.Boolean cannot be cast to java.lang.Integer
java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)

Let me fix it and throw a more reasonable exception:

number_rows in limit clause cannot be cast to integer:true;

case o: Int if o >= 0 => // OK
case o: Int => failAnalysis(
s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o")
case o => failAnalysis(
Copy link
Contributor

@cloud-fan cloud-fan Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we allow other integral type? e.g. byte, long, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, we can do it, but we need to fix the other parts. Let me try it. Thanks!

Copy link
Member Author

@gatorsmile gatorsmile Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to do it in different ways, but the best way I found is to introduce an Analyzer rule.

We have one rule in Optimizer and one rule in Planner for Limit. Both assume the limitExpr is an Integer type. It looks ugly to convert the type everywhere. To support different numeric types, we need to convert these foldable numeric values to int in Analyzer, I think. Let me upload a fix at first. You can judge whether this is a good way or not. Thanks!

@SparkQA
Copy link

SparkQA commented Jul 8, 2016

Test build #61963 has finished for PR 14034 at commit f600ba4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2016

Test build #61974 has finished for PR 14034 at commit 1abdbb9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2016

Test build #61973 has finished for PR 14034 at commit 8fd72f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2016

Test build #61997 has finished for PR 14034 at commit 3036847.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

retest this please

object ResolveLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case g @ GlobalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) =>
g.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way is like

g.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType))

Not sure which one is better here.

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62002 has finished for PR 14034 at commit 3036847.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62006 has finished for PR 14034 at commit d135b77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -2045,6 +2047,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
}

/**
* Converts foldable numeric expressions to integers in [[GlobalLimit]] and [[LocalLimit]] operators
*/
object ResolveLimits extends Rule[LogicalPlan] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying this out! Yea looks like it's doable. Let's remove it first and do it in follow-ups, to make this PR surgical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can create a JIRA first and discuss with others to decide if it's useful to support all integral types in limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me revert it back. Thanks!

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62027 has finished for PR 14034 at commit 028aa79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case o: Int => failAnalysis(
s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o")
case o => failAnalysis(
s"""number_rows in limit clause cannot be cast to integer:\"$o\".""")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cannot be cast to integer -> must be integer? e.g. byte is castable to int.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

"Limit:(testdata.`key` > 3)"))
}

test("Limit: unable to evaluate and cast expressions in limit clauses to Int") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update the test name, we don't try to cast the limit expression to int type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will change it. Thanks!

@cloud-fan
Copy link
Contributor

LGTM except some style comments, thanks for working on it!

@SparkQA
Copy link

SparkQA commented Jul 10, 2016

Test build #62058 has finished for PR 14034 at commit 01137dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

you missed one comment : https://github.com/apache/spark/pull/14034/files#r70183958 :)

@gatorsmile
Copy link
Member Author

uh... Thanks! Let me do it now.

@cloud-fan
Copy link
Contributor

LGTM, pending jenkins

@gatorsmile
Copy link
Member Author

Thank you!

@SparkQA
Copy link

SparkQA commented Jul 11, 2016

Test build #62069 has finished for PR 14034 at commit dec5ad9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2016

Test build #62073 has finished for PR 14034 at commit 2e6f8d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Jul 11, 2016

Test build #62078 has finished for PR 14034 at commit 2e6f8d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2016

Test build #62080 has finished for PR 14034 at commit d66870b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in e226278 Jul 11, 2016
@cloud-fan
Copy link
Contributor

thanks, merging to master and 2.0!

asfgit pushed a commit that referenced this pull request Jul 11, 2016
…n-foldable, Zero or Negative

#### What changes were proposed in this pull request?
**Issue 1:** When a query containing LIMIT/TABLESAMPLE 0, the statistics could be zero. Results are correct but it could cause a huge performance regression. For example,
```Scala
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
  .createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")
```
The statistics of both `df` and `df2` are zero. The statistics values should never be zero; otherwise `sizeInBytes` of `BinaryNode` will also be zero (product of children). This PR is to increase it to `1` when the num of rows is equal to 0.

**Issue 2:** When a query containing negative LIMIT/TABLESAMPLE, we should issue exceptions. Negative values could break the implementation assumption of multiple parts. For example, statistics calculation.  Below is the example query.
```SQL
SELECT * FROM testData TABLESAMPLE (-1 rows)
SELECT * FROM testData LIMIT -1
```
This PR is to issue an appropriate exception in this case.

**Issue 3:** Spark SQL follows the restriction of LIMIT clause in Hive. The argument to the LIMIT clause must evaluate to a constant value. It can be a numeric literal, or another kind of numeric expression involving operators, casts, and function return values. You cannot refer to a column or use a subquery. Currently, we do not detect whether the expression in LIMIT clause is foldable or not. If non-foldable, we might issue a strange error message. For example,
```SQL
SELECT * FROM testData LIMIT rand() > 0.2
```
Then, a misleading error message is issued, like
```
assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]
```
This PR detects it and then issues a meaningful error message.

#### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14034 from gatorsmile/limit.

(cherry picked from commit e226278)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants