Skip to content

Commit 636119c

Browse files
committed
[SPARK-31607][SQL] Improve the perf of CTESubstitution
### What changes were proposed in this pull request?

In `CTESubstitution`, resolve CTE relations first, then traverse the main plan only once to substitute CTE relations.

### Why are the changes needed?

Currently we will traverse the main query many times (if there are many CTE relations), which can be pretty slow if the main query is large.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

local perf test

```
scala> :pa
// Entering paste mode (ctrl-D to finish)

def test(i: Int): Unit = 1.to(i).foreach { _ =>
  spark.sql("""
    with
    t1 as (select 1),
    t2 as (select 1),
    t3 as (select 1),
    t4 as (select 1),
    t5 as (select 1),
    t6 as (select 1),
    t7 as (select 1),
    t8 as (select 1),
    t9 as (select 1)
    select * from t1, t2, t3, t4, t5, t6, t7, t8, t9""").queryExecution.assertAnalyzed()
}

// Exiting paste mode, now interpreting.

test: (i: Int)Unit

scala> test(10000)

scala> println(org.apache.spark.sql.catalyst.rules.RuleExecutor.dumpTimeSpent)
```

The result before this patch

```
Rule               Effective Time / Total Time    Effective Runs / Total Runs
CTESubstitution    3328796344 / 3924576425        10000 / 20000
```

The result after this patch

```
Rule               Effective Time / Total Time    Effective Runs / Total Runs
CTESubstitution    1503085936 / 2091992092        10000 / 20000
```

About 2 times faster.

Closes #28407 from cloud-fan/cte.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 7195a18 commit 636119c

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 29 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -87,11 +87,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
8787
private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
8888
plan.resolveOperatorsUp {
8989
case With(child, relations) =>
90-
// substitute CTE expressions right-to-left to resolve references to previous CTEs:
91-
// with a as (select * from t), b as (select * from a) select * from b
92-
relations.foldRight(child) {
93-
case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan)
94-
}
90+
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true)
91+
substituteCTE(child, resolvedCTERelations)
9592
}
9693
}
9794

@@ -139,18 +136,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
139136
private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
140137
plan.resolveOperatorsUp {
141138
case With(child: LogicalPlan, relations) =>
142-
// Substitute CTE definitions from last to first as a CTE definition can reference a
143-
// previous one
144-
relations.foldRight(child) {
145-
case ((cteName, ctePlan), currentPlan) =>
146-
// A CTE definition might contain an inner CTE that has priority, so traverse and
147-
// substitute CTE defined in ctePlan.
148-
// A CTE definition might not be used at all or might be used multiple times. To avoid
149-
// computation if it is not used and to avoid multiple recomputation if it is used
150-
// multiple times we use a lazy construct with call-by-name parameter passing.
151-
lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan)
152-
substituteCTE(currentPlan, cteName, substitutedCTEPlan)
153-
}
139+
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false)
140+
substituteCTE(child, resolvedCTERelations)
154141

155142
case other =>
156143
other.transformExpressions {
@@ -159,17 +146,38 @@ object CTESubstitution extends Rule[LogicalPlan] {
159146
}
160147
}
161148

149+
private def resolveCTERelations(
150+
relations: Seq[(String, SubqueryAlias)],
151+
isLegacy: Boolean): Seq[(String, LogicalPlan)] = {
152+
val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size)
153+
for ((name, relation) <- relations) {
154+
val innerCTEResolved = if (isLegacy) {
155+
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
156+
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
157+
// Analyzer will run this rule multiple times until all `With` nodes are resolved.
158+
relation
159+
} else {
160+
// A CTE definition might contain an inner CTE that has a higher priority, so traverse and
161+
// substitute CTE defined in `relation` first.
162+
traverseAndSubstituteCTE(relation)
163+
}
164+
// CTE definition can reference a previous one
165+
resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations))
166+
}
167+
resolvedCTERelations
168+
}
169+
162170
private def substituteCTE(
163171
plan: LogicalPlan,
164-
cteName: String,
165-
ctePlan: => LogicalPlan): LogicalPlan =
172+
cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
166173
plan resolveOperatorsUp {
167-
case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan
174+
case u @ UnresolvedRelation(Seq(table)) =>
175+
cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)
168176

169177
case other =>
170178
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
171179
other transformExpressions {
172-
case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
180+
case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteRelations))
173181
}
174182
}
175183
}

0 commit comments

Comments
 (0)