-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-38404][SQL] Improve CTE resolution when a nested CTE references an outer CTE #36146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
|
||
package org.apache.spark.sql.catalyst.analysis | ||
|
||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression | ||
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} | ||
|
@@ -55,27 +55,27 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
case _: Command | _: ParsedStatement | _: InsertIntoDir => true | ||
case _ => false | ||
} | ||
val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef] | ||
val cteDefs = ArrayBuffer.empty[CTERelationDef] | ||
val (substituted, lastSubstituted) = | ||
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { | ||
case LegacyBehaviorPolicy.EXCEPTION => | ||
assertNoNameConflictsInCTE(plan) | ||
traverseAndSubstituteCTE(plan, isCommand, cteDefs) | ||
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) | ||
case LegacyBehaviorPolicy.LEGACY => | ||
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None) | ||
case LegacyBehaviorPolicy.CORRECTED => | ||
traverseAndSubstituteCTE(plan, isCommand, cteDefs) | ||
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) | ||
} | ||
if (cteDefs.isEmpty) { | ||
substituted | ||
} else if (substituted eq lastSubstituted.get) { | ||
WithCTE(substituted, cteDefs.sortBy(_.id).toSeq) | ||
WithCTE(substituted, cteDefs.toSeq) | ||
} else { | ||
var done = false | ||
substituted.resolveOperatorsWithPruning(_ => !done) { | ||
case p if p eq lastSubstituted.get => | ||
done = true | ||
WithCTE(p, cteDefs.sortBy(_.id).toSeq) | ||
WithCTE(p, cteDefs.toSeq) | ||
} | ||
} | ||
} | ||
|
@@ -98,7 +98,7 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
val resolver = conf.resolver | ||
plan match { | ||
case UnresolvedWith(child, relations) => | ||
val newNames = mutable.ArrayBuffer.empty[String] | ||
val newNames = ArrayBuffer.empty[String] | ||
newNames ++= outerCTERelationNames | ||
relations.foreach { | ||
case (name, relation) => | ||
|
@@ -121,11 +121,11 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
|
||
private def legacyTraverseAndSubstituteCTE( | ||
plan: LogicalPlan, | ||
cteDefs: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = { | ||
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = { | ||
plan.resolveOperatorsUp { | ||
case UnresolvedWith(child, relations) => | ||
val resolvedCTERelations = | ||
resolveCTERelations(relations, isLegacy = true, isCommand = false, cteDefs) | ||
resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs) | ||
substituteCTE(child, alwaysInline = true, resolvedCTERelations) | ||
} | ||
} | ||
|
@@ -170,21 +170,23 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
* SELECT * FROM t | ||
* ) | ||
* @param plan the plan to be traversed | ||
* @return the plan where CTE substitution is applied | ||
* @param isCommand if this is a command | ||
* @param outerCTEDefs already resolved outer CTE definitions with names | ||
* @param cteDefs all accumulated CTE definitions | ||
* @return the plan where CTE substitution is applied and optionally the last substituted `With` | ||
* where CTE definitions will be gathered to | ||
*/ | ||
private def traverseAndSubstituteCTE( | ||
plan: LogicalPlan, | ||
isCommand: Boolean, | ||
cteDefs: mutable.ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { | ||
outerCTEDefs: Seq[(String, CTERelationDef)], | ||
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { | ||
var lastSubstituted: Option[LogicalPlan] = None | ||
val newPlan = plan.resolveOperatorsUpWithPruning( | ||
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { | ||
case UnresolvedWith(child: LogicalPlan, relations) => | ||
val resolvedCTERelations = | ||
resolveCTERelations(relations, isLegacy = false, isCommand, cteDefs) | ||
if (!isCommand) { | ||
cteDefs ++= resolvedCTERelations.map(_._2) | ||
} | ||
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) | ||
lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations)) | ||
lastSubstituted.get | ||
|
||
|
@@ -200,10 +202,14 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
relations: Seq[(String, SubqueryAlias)], | ||
isLegacy: Boolean, | ||
isCommand: Boolean, | ||
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { | ||
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size) | ||
outerCTEDefs: Seq[(String, CTERelationDef)], | ||
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { | ||
var resolvedCTERelations = if (isLegacy || isCommand) { | ||
Seq.empty | ||
} else { | ||
outerCTEDefs | ||
} | ||
for ((name, relation) <- relations) { | ||
val lastCTEDefCount = cteDefs.length | ||
val innerCTEResolved = if (isLegacy) { | ||
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner | ||
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations. | ||
|
@@ -221,31 +227,18 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
// WITH t3 AS (SELECT * FROM t1) | ||
// ) | ||
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`. | ||
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1 | ||
} | ||
|
||
if (cteDefs.length > lastCTEDefCount) { | ||
// We have added more CTE relations to the `cteDefs` from the inner CTE, and these relations | ||
// should also be substituted with `resolvedCTERelations` as inner CTE relation can refer to | ||
// outer CTE relation. For example: | ||
// WITH t1 AS (SELECT 1) | ||
// t2 AS ( | ||
// WITH t3 AS (SELECT * FROM t1) | ||
// ) | ||
for (i <- lastCTEDefCount until cteDefs.length) { | ||
val substituted = | ||
substituteCTE(cteDefs(i).child, isLegacy || isCommand, resolvedCTERelations.toSeq) | ||
cteDefs(i) = cteDefs(i).copy(child = substituted) | ||
} | ||
traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1 | ||
} | ||
|
||
// CTE definition can reference a previous one | ||
val substituted = | ||
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq) | ||
val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations) | ||
val cteRelation = CTERelationDef(substituted) | ||
resolvedCTERelations += (name -> cteRelation) | ||
if (!(isLegacy || isCommand)) { | ||
cteDefs += cteRelation | ||
} | ||
// Prepending new CTEs makes sure that those have higher priority over outer ones. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about priority between CTE relations at the same level? Previously we append new CTE relations to
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems safer to keep
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we currently allow duplicate names at a given level:
But if we allowed this construct I would expect
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. makes sense |
||
resolvedCTERelations +:= (name -> cteRelation) | ||
} | ||
resolvedCTERelations.toSeq | ||
resolvedCTERelations | ||
} | ||
|
||
private def substituteCTE( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume the order is guaranteed by other changes in this PR, so we are safe to remove this `sortBy` here?
Uh oh! There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because we add new CTE defs to `cteDefs` immediately after creation: https://github.com/apache/spark/pull/36146/files#diff-4d16a733f8741de9a4b839ee7c356c3e9b439b4facc70018f5741da1e930c6a8R234-R236