Skip to content

[SPARK-38404][SQL][3.3] Improve CTE resolution when a nested CTE references an outer CTE #37760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
Expand Down Expand Up @@ -55,27 +55,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false
}
val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
val cteDefs = ArrayBuffer.empty[CTERelationDef]
val (substituted, lastSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
done = true
WithCTE(p, cteDefs.sortBy(_.id).toSeq)
WithCTE(p, cteDefs.toSeq)
}
}
}
Expand All @@ -98,7 +98,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
val resolver = conf.resolver
plan match {
case UnresolvedWith(child, relations) =>
val newNames = mutable.ArrayBuffer.empty[String]
val newNames = ArrayBuffer.empty[String]
newNames ++= outerCTERelationNames
relations.foreach {
case (name, relation) =>
Expand All @@ -121,11 +121,11 @@ object CTESubstitution extends Rule[LogicalPlan] {

private def legacyTraverseAndSubstituteCTE(
plan: LogicalPlan,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsUp {
case UnresolvedWith(child, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = true, isCommand = false, cteDefs)
resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
}
}
Expand Down Expand Up @@ -170,21 +170,23 @@ object CTESubstitution extends Rule[LogicalPlan] {
* SELECT * FROM t
* )
* @param plan the plan to be traversed
* @return the plan where CTE substitution is applied
* @param isCommand if this is a command
* @param outerCTEDefs already resolved outer CTE definitions with names
* @param cteDefs all accumulated CTE definitions
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
* where CTE definitions will be gathered to
*/
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
isCommand: Boolean,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
var lastSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, isCommand, cteDefs)
if (!isCommand) {
cteDefs ++= resolvedCTERelations.map(_._2)
}
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs)
lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
lastSubstituted.get

Expand All @@ -200,10 +202,14 @@ object CTESubstitution extends Rule[LogicalPlan] {
relations: Seq[(String, SubqueryAlias)],
isLegacy: Boolean,
isCommand: Boolean,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size)
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
var resolvedCTERelations = if (isLegacy || isCommand) {
Seq.empty
} else {
outerCTEDefs
}
for ((name, relation) <- relations) {
val lastCTEDefCount = cteDefs.length
val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
Expand All @@ -221,31 +227,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
}

if (cteDefs.length > lastCTEDefCount) {
// We have added more CTE relations to the `cteDefs` from the inner CTE, and these relations
// should also be substituted with `resolvedCTERelations` as inner CTE relation can refer to
// outer CTE relation. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t3 AS (SELECT * FROM t1)
// )
for (i <- lastCTEDefCount until cteDefs.length) {
val substituted =
substituteCTE(cteDefs(i).child, isLegacy || isCommand, resolvedCTERelations.toSeq)
cteDefs(i) = cteDefs(i).copy(child = substituted)
}
traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1
}

// CTE definition can reference a previous one
val substituted =
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq)
val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations)
val cteRelation = CTERelationDef(substituted)
resolvedCTERelations += (name -> cteRelation)
if (!(isLegacy || isCommand)) {
cteDefs += cteRelation
}
// Prepending new CTEs makes sure that those have higher priority over outer ones.
resolvedCTERelations +:= (name -> cteRelation)
}
resolvedCTERelations.toSeq
resolvedCTERelations
}

private def substituteCTE(
Expand Down
13 changes: 12 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,15 @@ WITH abc AS (SELECT 1)
SELECT (
WITH aBc AS (SELECT 2)
SELECT * FROM aBC
);
);

-- SPARK-38404: CTE in CTE definition references outer
WITH
t1 AS (SELECT 1),
t2 AS (
WITH t3 AS (
SELECT * FROM t1
)
SELECT * FROM t3
)
SELECT * FROM t2;
19 changes: 18 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 16
-- Number of queries: 17


-- !query
Expand Down Expand Up @@ -219,3 +219,20 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
1


-- !query
WITH
t1 AS (SELECT 1),
t2 AS (
WITH t3 AS (
SELECT * FROM t1
)
SELECT * FROM t3
)
SELECT * FROM t2
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: t1; line 5 pos 20
18 changes: 17 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 16
-- Number of queries: 17


-- !query
Expand Down Expand Up @@ -227,3 +227,19 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Name aBc is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.


-- !query
WITH
t1 AS (SELECT 1),
t2 AS (
WITH t3 AS (
SELECT * FROM t1
)
SELECT * FROM t3
)
SELECT * FROM t2
-- !query schema
struct<1:int>
-- !query output
1
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 16
-- Number of queries: 17


-- !query
Expand Down Expand Up @@ -219,3 +219,19 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
2


-- !query
WITH
t1 AS (SELECT 1),
t2 AS (
WITH t3 AS (
SELECT * FROM t1
)
SELECT * FROM t3
)
SELECT * FROM t2
-- !query schema
struct<1:int>
-- !query output
1