[SPARK-24983][SQL] limit number of leaf expressions in a single project when collapse project to prevent driver oom #29094
Conversation
Can one of the admins verify this patch?
Can anyone help to review this?
@@ -697,16 +697,21 @@ object ColumnPruning extends Rule[LogicalPlan] {
 * `GlobalLimit(LocalLimit)` pattern is also considered.
 */
object CollapseProject extends Rule[LogicalPlan] {
  // If the number of leaf expressions exceeds MAX_LEAF_SIZE, do not collapse,
  // to prevent a driver OOM due to a single large project.
  private val MAX_LEAF_SIZE = 1000

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p1 @ Project(_, p2: Project) =>
Seems like you can simplify it like this?

private def hasTooManyExprs(exprs: Seq[Expression]): Boolean = {
  var numExprs = 0
  exprs.foreach { _.foreach { _ => numExprs += 1 } }
  numExprs > SQLConf.get.XXXX
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
  case p1 @ Project(_, p2: Project) if hasTooManyExprs(p2.projectList) => // skip
    p1
  case p1 @ Project(_, p2: Project) =>
Sure, I can simplify the condition-check logic. Do you suggest adding a new SQLConf instead of the hard limit? For the case statement there is already a condition check, haveCommonNonDeterministicOutput, so I put them together; the same applies to case p @ Project(_, agg: Aggregate).
Yea, if we add this logic, I think we need a conf for that.
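For reference, a conf along the lines being discussed could be declared with SQLConf's builder API. This is only a sketch; the key name and default below are assumptions, not anything merged in this PR:

// Hypothetical SQLConf entry (key name and default are illustrative).
val MAX_COLLAPSE_PROJECT_LEAF_EXPRESSIONS =
  buildConf("spark.sql.optimizer.maxCollapseProjectLeafExpressions")
    .internal()
    .doc("Skip collapsing two adjacent Projects when the inner project list " +
      "contains more leaf expressions than this threshold, so that duplicated " +
      "expression subtrees cannot exhaust driver memory.")
    .intConf
    .createWithDefault(1000)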
for (_ <- 1 to 10) {
  // after n iterations the number of leaf expressions will be 2^(n+1)
  // => after 10 iterations we would end up with more than 1000 leaf expressions
  query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
}
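To see where the 2^(n+1) in the comment comes from: each select substitutes a and b with a + b and a - b, so every existing leaf reference is duplicated on each iteration. A minimal, Spark-free Scala sketch of the growth (all names here are illustrative):

object LeafGrowth {
  sealed trait Expr { def leaves: Int }
  case class Leaf(name: String) extends Expr { val leaves = 1 }
  case class Add(l: Expr, r: Expr) extends Expr { def leaves: Int = l.leaves + r.leaves }
  case class Sub(l: Expr, r: Expr) extends Expr { def leaves: Int = l.leaves + r.leaves }

  def main(args: Array[String]): Unit = {
    var a: Expr = Leaf("a")
    var b: Expr = Leaf("b")
    for (n <- 1 to 10) {
      // substitute both columns, as the collapsed Project would
      val (na, nb) = (Add(a, b), Sub(a, b))
      a = na
      b = nb
      // total leaves across both output columns: 2^(n+1)
      println(s"iteration $n: ${a.leaves + b.leaves} leaves")
    }
  }
}

After iteration 10 the two columns together reference 2^11 = 2048 leaves, which is why the test expects the collapse to stop.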
Can the same issue happen in ProjectExec even if the CollapseProject issue is fixed?
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala, lines 127 to 135 at 4da93b0:
case Project(fields, child) =>
  collectProjectsAndFilters(child) match {
    case Some((_, filters, other, aliases)) =>
      // Follow CollapseProject and only keep going if the collected Projects
      // do not have common non-deterministic expressions.
      if (!hasCommonNonDeterministic(fields, aliases)) {
        val substitutedFields =
          fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
        Some((Some(substitutedFields), filters, other, collectAliases(substitutedFields)))
scala> Seq((1, 2)).toDF("a", "b").write.saveAsTable("a")
scala> var query = spark.table("a")
scala> for( _ <- 1 to 10) {
| query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
| }
scala> query.explain(true)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Project [(a#49 + b#50) AS a#53, (a#49 - b#50) AS b#54]
+- Project [(a#45 + b#46) AS a#49, (a#45 - b#46) AS b#50]
+- Project [(a#41 + b#42) AS a#45, (a#41 - b#42) AS b#46]
+- Project [(a#37 + b#38) AS a#41, (a#37 - b#38) AS b#42]
+- Project [(a#33 + b#34) AS a#37, (a#33 - b#34) AS b#38]
+- Project [(a#29 + b#30) AS a#33, (a#29 - b#30) AS b#34]
+- Project [(a#25 + b#26) AS a#29, (a#25 - b#26) AS b#30]
+- Project [(a#21 + b#22) AS a#25, (a#21 - b#22) AS b#26]
+- Project [(a#17 + b#18) AS a#21, (a#17 - b#18) AS b#22]
+- Project [(a#13 + b#14) AS a#17, (a#13 - b#14) AS b#18]
+- Relation[a#13,b#14] parquet
== Physical Plan ==
*(1) Project [((((((((((a#13 + b#14) AS a#17 + ...
// too many expressions...
+- *(1) ColumnarToRow
+- FileScan parquet default.a[a#13,b#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/a], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int>
Thanks for the reminder. My test case was:

var query = spark.range(5).withColumn("new_column", 'id + 5 as "plus5").toDF("a", "b")
for (a <- 1 to 10) { query = query.select(('a + 'b).as('a), ('a - 'b).as('b)) }
query.explain(true)

and it works for both the Optimized Logical Plan and the Physical Plan. I notice the difference is that my data type is bigint (org.apache.spark.sql.DataFrame = [a: bigint, b: bigint]); in that case the projects do not collapse.
I tested the case above and the problem exists for the Physical Plan, so should we also add a check for that?
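For illustration, a check shared between CollapseProject and the physical-side PhysicalOperation pattern could be factored into a small helper like the sketch below. The helper name is hypothetical, and the actual PR may count nodes differently (the earlier review suggestion counted all expression nodes rather than only leaves):

import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical shared helper: count leaf nodes across a project list and
// compare against a budget before deciding to merge adjacent Projects.
def exceedsLeafBudget(exprs: Seq[Expression], budget: Int = 1000): Boolean = {
  var count = 0
  exprs.foreach { e =>
    e.foreach { node =>
      if (node.children.isEmpty) count += 1
    }
  }
  count > budget
}

collectProjectsAndFilters in patterns.scala could then add a !exceedsLeafBudget(fields) condition next to the existing hasCommonNonDeterministic(fields, aliases) check.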
cc: @gatorsmile
Code changed. Please feel free to comment. Thanks! @maropu
@maropu could you help to review it again? Thanks!
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
The Spark SQL Catalyst optimizer has a rule, CollapseProject, that tries to collapse multiple adjacent projects into a single one where possible. But the collapsed project may have more leaf expressions than the original projects: in the worst case the number of leaf expressions grows exponentially, which can lead to a driver OOM. We can prevent this by limiting the maximum number of leaf expressions a single project may have. The logic is: if a project already has more than 1000 leaf expressions, collapsing it further is risky, so we treat that project as too complex and stop collapsing it; other, simpler projects can still be collapsed.
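Conceptually, the guard slots into the rule as in the sketch below. This follows the description above and the review thread (helper names are borrowed from the existing rule), not necessarily the exact diff in this PR:

object CollapseProject extends Rule[LogicalPlan] {
  // Threshold above which a project list is considered too complex to merge.
  private val MAX_LEAF_SIZE = 1000

  // Sum the leaf nodes of every expression in the project list.
  private def tooComplex(exprs: Seq[NamedExpression]): Boolean =
    exprs.map(_.collectLeaves().size).sum > MAX_LEAF_SIZE

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p1 @ Project(_, p2: Project)
        if !haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) &&
          !tooComplex(p2.projectList) =>
      p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
  }
}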
Why are the changes needed?
Refer to https://issues.apache.org/jira/browse/SPARK-24983 or the unit test added: the CollapseProject optimizer rule may cause a driver OOM when the number of leaf expressions grows exponentially.
As can be seen in the JIRA, there is a closed PR, #21993, that intended to fix this. One comment there suggested blacklisting "case when" in CollapseProject, but any kind of SQL can lead to the memory problem (like the UT case). There is also a workaround of excluding the CollapseProject rule entirely, but I think this rule is necessary because it can improve performance, and it is hard to decide ahead of time whether it should be excluded.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a unit test checking that the query collapses to two projects. All existing UTs in CollapseProjectSuite pass.
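A sketch of what such a test could look like in CollapseProjectSuite (the test name and assertion are illustrative; testRelation and Optimize are assumed to be the suite's existing fixtures):

test("SPARK-24983: do not collapse projects with too many leaf expressions") {
  var query: LogicalPlan = testRelation
  for (_ <- 1 to 10) {
    // each iteration would double the number of leaf expressions if collapsed
    query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
  }
  val optimized = Optimize.execute(query.analyze)
  // Instead of one exponentially large Project, the optimizer should stop
  // collapsing and leave at least two stacked Projects.
  assert(optimized.collect { case p: Project => p }.size >= 2)
}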