
[SPARK-24983][SQL] limit number of leaf expressions in a single project when collapse project to prevent driver oom #29094


Closed
constzhou wants to merge 3 commits

Conversation

constzhou

@constzhou commented Jul 14, 2020

What changes were proposed in this pull request?

The Spark SQL Catalyst optimizer has a rule, CollapseProject, that tries to collapse multiple Projects into a single one when possible. However, the collapsed Project may contain more leaf expressions than the original Projects; in the worst case the number of leaf expressions grows exponentially and leads to a driver OOM. We can prevent this by limiting the maximum number of leaf expressions a single Project may contain. The logic is: if a Project already has more than 1000 leaf expressions, continuing to collapse it is risky, so we treat that Project as too complex and stop collapsing it. Other, simpler Projects can still be collapsed.
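
For illustration only, a minimal sketch of the kind of guard described above (the helper name and the use of collectLeaves here are illustrative shorthand, not the exact code in the patch):

    // Sketch: count every leaf expression in a project list and stop collapsing
    // once a single project would grow past the limit.
    private val MAX_LEAF_SIZE = 1000

    private def tooManyLeaves(projectList: Seq[NamedExpression]): Boolean =
      projectList.map(_.collectLeaves().size).sum > MAX_LEAF_SIZE

    // In CollapseProject#apply, adjacent Projects would then only be merged while
    // the lower Project stays under the limit, e.g.
    //   case p1 @ Project(_, p2: Project) if !tooManyLeaves(p2.projectList) => ...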

Why are the changes needed?

See https://issues.apache.org/jira/browse/SPARK-24983 or the unit test added here: the CollapseProject optimizer rule may cause a driver OOM if leaf expressions grow exponentially.
As noted in the JIRA, there is a closed PR #21993 that intended to fix this. A comment there suggests blacklisting "case when" in CollapseProject, but any kind of SQL can trigger the memory problem (like the unit-test case). There is also a workaround of excluding the CollapseProject rule altogether, but I think the rule is worth keeping because it improves performance, and it is hard to decide ahead of time whether it should be excluded.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test that checks the query collapses into two Projects rather than one. All existing unit tests in CollapseProjectSuite still pass.
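
For context, a hypothetical version of such a test could look roughly like this, assuming the usual CollapseProjectSuite fixtures (testRelation, Optimize); it is a sketch, not the exact test in this PR:

    test("SPARK-24983: do not collapse projects with too many leaf expressions") {
      // Start from two columns and keep deriving new columns from them.
      var query: LogicalPlan = testRelation.select('a, 'b)
      // Once collapsed, each round doubles the number of leaves per output column.
      (1 to 10).foreach { _ =>
        query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
      }
      val optimized = Optimize.execute(query.analyze)
      // The optimized plan should keep at least two Projects instead of one huge one.
      assert(optimized.collect { case p: Project => p }.size >= 2)
    }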

@AmplabJenkins

Can one of the admins verify this patch?

@constzhou
Author

Can anyone help review this?

@@ -697,16 +697,21 @@ object ColumnPruning extends Rule[LogicalPlan] {
* `GlobalLimit(LocalLimit)` pattern is also considered.
*/
object CollapseProject extends Rule[LogicalPlan] {
// If the number of leaf expressions exceeds MAX_LEAF_SIZE, do not collapse, to prevent a driver OOM
// caused by a single large project.
private val MAX_LEAF_SIZE = 1000

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p1 @ Project(_, p2: Project) =>
Member

Seems like you can simplify it like this?

  private def hasTooManyExprs(exprs: Seq[Expression]): Boolean = {
    var numExprs = 0
    exprs.foreach { _.foreach { _ => numExprs += 1 } }
    numExprs > SQLConf.get.XXXX
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p1 @ Project(_, p2: Project) if hasTooManyExprs(p2.projectList) => // skip
      p1

    case p1 @ Project(_, p2: Project) =>

Author

Sure, I can simplify the condition-check logic. Do you suggest adding a new SQLConf instead of the hard limit? For that case statement there is already a condition check called 'haveCommonNonDeterministicOutput', so I put them together, and did the same for 'case p @ Project(_, agg: Aggregate)'.

Member

Yea, if we add this logic, I think we need a conf for that.
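
If we go with a conf, a minimal sketch of what the entry in SQLConf.scala could look like is below; the config key, doc text, and default value are illustrative, not necessarily what this PR ends up with:

    // Hypothetical config entry; key and default are placeholders.
    val MAX_COLLAPSE_PROJECT_LEAF_EXPRESSIONS =
      buildConf("spark.sql.optimizer.maxCollapseProjectLeafExpressions")
        .doc("Maximum number of leaf expressions a single Project may contain before " +
          "CollapseProject stops merging it with its child, to avoid a driver OOM.")
        .intConf
        .checkValue(_ > 0, "The limit must be positive.")
        .createWithDefault(1000)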

for( _ <- 1 to 10) {
// after n iterations the number of leaf expressions will be 2^{n+1}
// => after 10 iterations we would end up with more than 1000 leaf expressions
query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
@maropu commented Jul 21, 2020


Can the same issue happen in ProjectExec even if the CollapseProject issue is fixed?

case Project(fields, child) =>
  collectProjectsAndFilters(child) match {
    case Some((_, filters, other, aliases)) =>
      // Follow CollapseProject and only keep going if the collected Projects
      // do not have common non-deterministic expressions.
      if (!hasCommonNonDeterministic(fields, aliases)) {
        val substitutedFields =
          fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
        Some((Some(substitutedFields), filters, other, collectAliases(substitutedFields)))

scala> Seq((1, 2)).toDF("a", "b").write.saveAsTable("a")
scala> var query = spark.table("a")
scala> for( _ <- 1 to 10) {
     |   query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
     | }

scala> query.explain(true)
== Parsed Logical Plan ==
...

== Analyzed Logical Plan ==
...

== Optimized Logical Plan ==
Project [(a#49 + b#50) AS a#53, (a#49 - b#50) AS b#54]
+- Project [(a#45 + b#46) AS a#49, (a#45 - b#46) AS b#50]
   +- Project [(a#41 + b#42) AS a#45, (a#41 - b#42) AS b#46]
      +- Project [(a#37 + b#38) AS a#41, (a#37 - b#38) AS b#42]
         +- Project [(a#33 + b#34) AS a#37, (a#33 - b#34) AS b#38]
            +- Project [(a#29 + b#30) AS a#33, (a#29 - b#30) AS b#34]
               +- Project [(a#25 + b#26) AS a#29, (a#25 - b#26) AS b#30]
                  +- Project [(a#21 + b#22) AS a#25, (a#21 - b#22) AS b#26]
                     +- Project [(a#17 + b#18) AS a#21, (a#17 - b#18) AS b#22]
                        +- Project [(a#13 + b#14) AS a#17, (a#13 - b#14) AS b#18]
                           +- Relation[a#13,b#14] parquet

== Physical Plan ==
*(1) Project [((((((((((a#13 + b#14) AS a#17 + ...
  // too many expressions...

+- *(1) ColumnarToRow
   +- FileScan parquet default.a[a#13,b#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/a], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int>

Author

Thanks for the reminder. My test case is:

var query = spark.range(5).withColumn("new_column", 'id + 5 as "plus5").toDF("a","b")
for (a <- 1 to 10) { query = query.select(('a + 'b).as('a), ('a - 'b).as('b)) }
query.explain(true)

It works for both the optimized logical plan and the physical plan.

I notice the difference is that my data types are bigint (org.apache.spark.sql.DataFrame = [a: bigint, b: bigint]); it seems the projects will not collapse in that case.

I tested the case above and the problem does exist in the physical plan. Should we also add a check for that?
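
For a quick shell-side sanity check of how many Projects survive in each plan, something like the following can be used (just for inspection; the imports are standard Spark classes and this is not part of the patch):

scala> import org.apache.spark.sql.catalyst.plans.logical.Project
scala> import org.apache.spark.sql.execution.ProjectExec
scala> query.queryExecution.optimizedPlan.collect { case p: Project => p }.size
scala> query.queryExecution.executedPlan.collect { case p: ProjectExec => p }.size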

@maropu
Member

maropu commented Jul 22, 2020

cc: @gatorsmile

@constzhou
Author

Code changed. Please feel free to comment. Thanks! @maropu

@constzhou
Author

@maropu could you help review it again? Thanks!

@github-actions

github-actions bot commented Nov 8, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Nov 8, 2020
github-actions bot closed this Nov 9, 2020