Skip to content

Commit 9f6be88

Browse files
peter-toth authored and LuciferYang committed
[SPARK-43199][SQL] Make InlineCTE idempotent
### What changes were proposed in this pull request? This PR fixes `InlineCTE`'s idempotence. E.g. the following query: ``` WITH x(r) AS (SELECT random()), y(r) AS (SELECT * FROM x), z(r) AS (SELECT * FROM x) SELECT * FROM z ``` currently breaks it because we take into account the reference to `x` from `y` when deciding not to inline `x` in the first round: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.InlineCTE === WithCTE WithCTE :- CTERelationDef 0, false :- CTERelationDef 0, false : +- Project [rand()#218 AS r#219] : +- Project [rand()#218 AS r#219] : +- Project [random(2957388522017368375) AS rand()#218] : +- Project [random(2957388522017368375) AS rand()#218] : +- OneRowRelation : +- OneRowRelation !:- CTERelationDef 1, false +- Project [r#222] !: +- Project [r#219 AS r#221] +- Project [r#220 AS r#222] !: +- Project [r#219] +- Project [r#220] !: +- CTERelationRef 0, true, [r#219] +- CTERelationRef 0, true, [r#220] !:- CTERelationDef 2, false !: +- Project [r#220 AS r#222] !: +- Project [r#220] !: +- CTERelationRef 0, true, [r#220] !+- Project [r#222] ! +- CTERelationRef 2, true, [r#222] ``` But in the next round we inline `x` because `y` was removed due to lack of references: ``` Once strategy's idempotence is broken for batch Inline CTE !WithCTE Project [r#222] !:- CTERelationDef 0, false +- Project [r#220 AS r#222] !: +- Project [rand()#218 AS r#219] +- Project [r#220] !: +- Project [random(2957388522017368375) AS rand()#218] +- Project [r#225 AS r#220] !: +- OneRowRelation +- Project [rand()#218 AS r#225] !+- Project [r#222] +- Project [random(2957388522017368375) AS rand()#218] ! +- Project [r#220 AS r#222] +- OneRowRelation ! +- Project [r#220] ! +- CTERelationRef 0, true, [r#220] ``` ### Why are the changes needed? We use `InlineCTE` as an idempotent rule in the `Optimizer`, `CheckAnalysis` and `ProgressReporter`. ### Does this PR introduce _any_ user-facing change? No. 
### How was this patch tested? Added new UT. Closes apache#40856 from peter-toth/SPARK-43199-make-inlinecte-idempotent. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 9489513 commit 9f6be88

File tree

3 files changed

+77
-24
lines changed

3 files changed

+77
-24
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
145145

146146
def checkAnalysis(plan: LogicalPlan): Unit = {
147147
val inlineCTE = InlineCTE(alwaysInline = true)
148-
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
148+
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])]
149149
inlineCTE.buildCTEMap(plan, cteMap)
150-
cteMap.values.foreach { case (relation, refCount) =>
150+
cteMap.values.foreach { case (relation, refCount, _) =>
151151
// If a CTE relation is never used, it will disappear after inline. Here we explicitly check
152152
// analysis for it, to make sure the entire query plan is valid.
153153
if (refCount == 0) checkAnalysis0(relation.child)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
4242

4343
override def apply(plan: LogicalPlan): LogicalPlan = {
4444
if (!plan.isInstanceOf[Subquery] && plan.containsPattern(CTE)) {
45-
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
45+
val cteMap = mutable.SortedMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])]
4646
buildCTEMap(plan, cteMap)
47+
cleanCTEMap(cteMap)
4748
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
4849
val inlined = inlineCTE(plan, cteMap, notInlined)
4950
// CTEs in SQL Commands have been inlined by `CTESubstitution` already, so it is safe to add
@@ -68,50 +69,91 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
6869
cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
6970
}
7071

72+
/**
73+
* Accumulates all the CTEs from a plan into a special map.
74+
*
75+
* @param plan The plan to collect the CTEs from
76+
* @param cteMap A mutable map that accumulates the CTEs and their reference information by CTE
77+
* ids. The value of the map is a tuple whose elements are:
78+
* - The CTE definition
79+
* - The number of incoming references to the CTE. This includes references from
80+
* other CTEs and regular places.
81+
* - A mutable inner map that tracks outgoing references (counts) to other CTEs.
82+
* @param outerCTEId While collecting the map we use this optional CTE id to identify the
83+
* current outer CTE.
84+
*/
7185
def buildCTEMap(
7286
plan: LogicalPlan,
73-
cteMap: mutable.HashMap[Long, (CTERelationDef, Int)]): Unit = {
87+
cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])],
88+
outerCTEId: Option[Long] = None): Unit = {
7489
plan match {
75-
case WithCTE(_, cteDefs) =>
90+
case WithCTE(child, cteDefs) =>
91+
cteDefs.foreach { cteDef =>
92+
cteMap(cteDef.id) = (cteDef, 0, mutable.Map.empty.withDefaultValue(0))
93+
}
7694
cteDefs.foreach { cteDef =>
77-
cteMap.put(cteDef.id, (cteDef, 0))
95+
buildCTEMap(cteDef, cteMap, Some(cteDef.id))
7896
}
97+
buildCTEMap(child, cteMap, outerCTEId)
7998

8099
case ref: CTERelationRef =>
81-
val (cteDef, refCount) = cteMap(ref.cteId)
82-
cteMap.update(ref.cteId, (cteDef, refCount + 1))
100+
val (cteDef, refCount, refMap) = cteMap(ref.cteId)
101+
cteMap(ref.cteId) = (cteDef, refCount + 1, refMap)
102+
outerCTEId.foreach { cteId =>
103+
val (_, _, outerRefMap) = cteMap(cteId)
104+
outerRefMap(ref.cteId) += 1
105+
}
83106

84107
case _ =>
85-
}
86-
87-
if (plan.containsPattern(CTE)) {
88-
plan.children.foreach { child =>
89-
buildCTEMap(child, cteMap)
90-
}
108+
if (plan.containsPattern(CTE)) {
109+
plan.children.foreach { child =>
110+
buildCTEMap(child, cteMap, outerCTEId)
111+
}
91112

92-
plan.expressions.foreach { expr =>
93-
if (expr.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
94-
expr.foreach {
95-
case e: SubqueryExpression =>
96-
buildCTEMap(e.plan, cteMap)
97-
case _ =>
113+
plan.expressions.foreach { expr =>
114+
if (expr.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
115+
expr.foreach {
116+
case e: SubqueryExpression => buildCTEMap(e.plan, cteMap, outerCTEId)
117+
case _ =>
118+
}
119+
}
98120
}
99121
}
122+
}
123+
}
124+
125+
/**
126+
* Cleans the CTE map by removing those CTEs that are not referenced at all and correcting the
127+
* reference counts of the CTEs that the removed CTEs referred to.
128+
*
129+
* @param cteMap A mutable map that accumulates the CTEs and their reference information by CTE
130+
* ids. Needs to be sorted to speed up cleaning.
131+
*/
132+
private def cleanCTEMap(
133+
cteMap: mutable.SortedMap[Long, (CTERelationDef, Int, mutable.Map[Long, Int])]
134+
) = {
135+
cteMap.keys.toSeq.reverse.foreach { currentCTEId =>
136+
val (_, currentRefCount, refMap) = cteMap(currentCTEId)
137+
if (currentRefCount == 0) {
138+
refMap.foreach { case (referencedCTEId, uselessRefCount) =>
139+
val (cteDef, refCount, refMap) = cteMap(referencedCTEId)
140+
cteMap(referencedCTEId) = (cteDef, refCount - uselessRefCount, refMap)
141+
}
100142
}
101143
}
102144
}
103145

104146
private def inlineCTE(
105147
plan: LogicalPlan,
106-
cteMap: mutable.HashMap[Long, (CTERelationDef, Int)],
148+
cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])],
107149
notInlined: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
108150
plan match {
109151
case WithCTE(child, cteDefs) =>
110152
cteDefs.foreach { cteDef =>
111-
val (cte, refCount) = cteMap(cteDef.id)
153+
val (cte, refCount, refMap) = cteMap(cteDef.id)
112154
if (refCount > 0) {
113155
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, notInlined))
114-
cteMap.update(cteDef.id, (inlined, refCount))
156+
cteMap(cteDef.id) = (inlined, refCount, refMap)
115157
if (!shouldInline(inlined, refCount)) {
116158
notInlined.append(inlined)
117159
}
@@ -120,7 +162,7 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
120162
inlineCTE(child, cteMap, notInlined)
121163

122164
case ref: CTERelationRef =>
123-
val (cteDef, refCount) = cteMap(ref.cteId)
165+
val (cteDef, refCount, _) = cteMap(ref.cteId)
124166
if (shouldInline(cteDef, refCount)) {
125167
if (ref.outputSet == cteDef.outputSet) {
126168
cteDef.child

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4648,6 +4648,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
46484648
sql("SELECT /*+ hash(t2) */ * FROM t1 join t2 on c1 = c2")
46494649
}
46504650
}
4651+
4652+
test("SPARK-43199: InlineCTE is idempotent") {
4653+
sql(
4654+
"""
4655+
|WITH
4656+
| x(r) AS (SELECT random()),
4657+
| y(r) AS (SELECT * FROM x),
4658+
| z(r) AS (SELECT * FROM x)
4659+
|SELECT * FROM z
4660+
|""".stripMargin).collect()
4661+
}
46514662
}
46524663

46534664
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)