Skip to content

Commit fedc9a6

Browse files
committed
fix
1 parent 58bc261 commit fedc9a6

File tree

17 files changed

+422
-80
lines changed

17 files changed

+422
-80
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.analysis
2020
import scala.collection.mutable.ArrayBuffer
2121

2222
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
23-
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE}
23+
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
2424
import org.apache.spark.sql.catalyst.rules.Rule
2525
import org.apache.spark.sql.catalyst.trees.TreePattern._
2626
import org.apache.spark.sql.catalyst.util.TypeUtils._
2727
import org.apache.spark.sql.errors.QueryCompilationErrors
28+
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}
2930

3031
/**
@@ -54,16 +55,40 @@ object CTESubstitution extends Rule[LogicalPlan] {
5455
if (!plan.containsPattern(UNRESOLVED_WITH)) {
5556
return plan
5657
}
58+
59+
val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
60+
// The legacy behavior always inlines the CTE relations for queries in commands.
61+
plan.exists {
62+
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
63+
case _ => false
64+
}
65+
} else {
66+
val commands = plan.collect {
67+
case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
68+
}
69+
if (commands.length == 1) {
70+
// If there is only one command and it is a `CTEInChildren`, we can resolve
71+
// CTE normally and don't need to force inline.
72+
!commands.head.isInstanceOf[CTEInChildren]
73+
} else if (commands.length > 1) {
74+
// This can happen with the multi-insert statement. We should fall back to
75+
// the legacy behavior.
76+
true
77+
} else {
78+
false
79+
}
80+
}
81+
5782
val cteDefs = ArrayBuffer.empty[CTERelationDef]
5883
val (substituted, firstSubstituted) =
5984
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
6085
case LegacyBehaviorPolicy.EXCEPTION =>
6186
assertNoNameConflictsInCTE(plan)
62-
traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
87+
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
6388
case LegacyBehaviorPolicy.LEGACY =>
6489
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
6590
case LegacyBehaviorPolicy.CORRECTED =>
66-
traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
91+
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
6792
}
6893
if (cteDefs.isEmpty) {
6994
substituted
@@ -129,7 +154,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
129154
plan.resolveOperatorsUp {
130155
case UnresolvedWith(child, relations) =>
131156
val resolvedCTERelations =
132-
resolveCTERelations(relations, isLegacy = true, Seq.empty, cteDefs)
157+
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
133158
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
134159
}
135160
}
@@ -166,25 +191,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
166191
* SELECT * FROM t
167192
* )
168193
* @param plan the plan to be traversed
194+
* @param forceInline always inline the CTE relations if this is true
169195
* @param outerCTEDefs already resolved outer CTE definitions with names
170196
* @param cteDefs all accumulated CTE definitions
171197
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
172198
* where CTE definitions will be gathered to
173199
*/
174200
private def traverseAndSubstituteCTE(
175201
plan: LogicalPlan,
202+
forceInline: Boolean,
176203
outerCTEDefs: Seq[(String, CTERelationDef)],
177204
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
178205
var firstSubstituted: Option[LogicalPlan] = None
179206
val newPlan = plan.resolveOperatorsDownWithPruning(
180207
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
181208
case UnresolvedWith(child: LogicalPlan, relations) =>
182209
val resolvedCTERelations =
183-
resolveCTERelations(relations, isLegacy = false, outerCTEDefs, cteDefs) ++
210+
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
184211
outerCTEDefs
185212
val substituted = substituteCTE(
186-
traverseAndSubstituteCTE(child, resolvedCTERelations, cteDefs)._1,
187-
false,
213+
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
214+
forceInline,
188215
resolvedCTERelations)
189216
if (firstSubstituted.isEmpty) {
190217
firstSubstituted = Some(substituted)
@@ -202,9 +229,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
202229
private def resolveCTERelations(
203230
relations: Seq[(String, SubqueryAlias)],
204231
isLegacy: Boolean,
232+
forceInline: Boolean,
205233
outerCTEDefs: Seq[(String, CTERelationDef)],
206234
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
207-
var resolvedCTERelations = if (isLegacy) {
235+
val alwaysInline = isLegacy || forceInline
236+
var resolvedCTERelations = if (alwaysInline) {
208237
Seq.empty
209238
} else {
210239
outerCTEDefs
@@ -227,12 +256,12 @@ object CTESubstitution extends Rule[LogicalPlan] {
227256
// WITH t3 AS (SELECT * FROM t1)
228257
// )
229258
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
230-
traverseAndSubstituteCTE(relation, resolvedCTERelations, cteDefs)._1
259+
traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
231260
}
232261
// CTE definition can reference a previous one
233-
val substituted = substituteCTE(innerCTEResolved, isLegacy, resolvedCTERelations)
262+
val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
234263
val cteRelation = CTERelationDef(substituted)
235-
if (!(isLegacy)) {
264+
if (!alwaysInline) {
236265
cteDefs += cteRelation
237266
}
238267
// Prepending new CTEs makes sure that those have higher priority over outer ones.
@@ -271,18 +300,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
271300
}
272301

273302
/**
274-
* Finds all logical nodes that should have `WithCTE` in their children like
275-
* `InsertIntoStatement`, put `WithCTE` on top of the children and don't place `WithCTE`
276-
* on top of the plan. If there are no such nodes, put `WithCTE` on the top.
303+
* For commands which extend `CTEInChildren`, we should place the `WithCTE` node on their
304+
* children. There are two reasons:
305+
* 1. Some rules will pattern match the root command nodes, and we should keep the command
306+
* as the root node to not break them.
307+
* 2. `Dataset` eagerly executes the commands inside a query plan. However, the CTE
308+
* references inside commands will be invalid if we execute the command alone, as
309+
* the CTE definitions are outside of the command.
277310
*/
278311
private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
279-
val withCTE = WithCTE(p, cteDefs)
280-
var onTop = true
281-
val newPlan = p.resolveOperatorsDown {
282-
case cteInChildren: CTEInChildren =>
283-
onTop = false
284-
cteInChildren.withCTE(withCTE)
312+
p match {
313+
case c: CTEInChildren => c.withCTEDefs(cteDefs)
314+
case _ => WithCTE(p, cteDefs)
285315
}
286-
if (onTop) withCTE else WithCTE(newPlan, cteDefs)
287316
}
288317
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
887887
}
888888
}
889889

890+
/**
891+
* A logical node that is able to place the `WithCTE` node on its children.
892+
*/
893+
trait CTEInChildren extends LogicalPlan {
894+
def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
895+
withNewChildren(children.map(WithCTE(_, cteDefs)))
896+
}
897+
}
898+
899+
890900
case class WithWindowDefinition(
891901
windowDefinitions: Map[String, WindowSpecDefinition],
892902
child: LogicalPlan) extends UnaryNode {
@@ -896,15 +906,6 @@ case class WithWindowDefinition(
896906
copy(child = newChild)
897907
}
898908

899-
/**
900-
* The logical node is able to insert the given `WithCTE` into its children.
901-
*/
902-
trait CTEInChildren extends LogicalPlan {
903-
def withCTE(withCTE: WithCTE): LogicalPlan = {
904-
withNewChildren(children.map(withCTE.withNewPlan))
905-
}
906-
}
907-
908909
/**
909910
* @param order The ordering expressions
910911
* @param global True means global sorting apply for entire data set,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -393,15 +393,13 @@ case class WriteDelta(
393393
}
394394

395395
trait V2CreateTableAsSelectPlan
396-
extends V2CreateTablePlan
396+
extends V2CreateTablePlan
397397
with AnalysisOnlyCommand
398398
with CTEInChildren {
399399
def query: LogicalPlan
400400

401-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
402-
withNameAndQuery(
403-
newName = this.name,
404-
newQuery = withCTE.copy(plan = this.query))
401+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
402+
withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
405403
}
406404

407405
override lazy val resolved: Boolean = childrenResolved && {
@@ -1250,10 +1248,8 @@ case class AlterViewAs(
12501248
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
12511249
copy(child = newLeft, query = newRight)
12521250

1253-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
1254-
withNewChildrenInternal(
1255-
newLeft = this.left,
1256-
newRight = withCTE.copy(plan = this.right))
1251+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
1252+
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
12571253
}
12581254
}
12591255

@@ -1275,10 +1271,8 @@ case class CreateView(
12751271
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
12761272
copy(child = newLeft, query = newRight)
12771273

1278-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
1279-
withNewChildrenInternal(
1280-
newLeft = this.left,
1281-
newRight = withCTE.copy(plan = this.right))
1274+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
1275+
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
12821276
}
12831277
}
12841278

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3759,6 +3759,14 @@ object SQLConf {
37593759
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
37603760
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
37613761

3762+
val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
3763+
.internal()
3764+
.doc("If true, always inline the CTE relations for the queries in commands. This is the " +
3765+
"legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
3766+
"relation more than once, even if it's nondeterministic.")
3767+
.booleanConf
3768+
.createWithDefault(false)
3769+
37623770
val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
37633771
.internal()
37643772
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
1919

2020
import org.apache.spark.sql._
2121
import org.apache.spark.sql.catalyst.catalog._
22-
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
22+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
2323
import org.apache.spark.sql.errors.QueryExecutionErrors
2424
import org.apache.spark.sql.execution.datasources._
2525

@@ -77,7 +77,7 @@ case class InsertIntoDataSourceDirCommand(
7777
Seq.empty[Row]
7878
}
7979

80-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
81-
copy(query = withCTE.copy(plan = this.query))
80+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
81+
copy(query = WithCTE(query, cteDefs))
8282
}
8383
}

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.net.URI
2121

2222
import org.apache.spark.sql._
2323
import org.apache.spark.sql.catalyst.catalog._
24-
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
24+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
2525
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
2626
import org.apache.spark.sql.errors.QueryCompilationErrors
2727
import org.apache.spark.sql.execution.CommandExecutionMode
@@ -234,7 +234,7 @@ case class CreateDataSourceTableAsSelectCommand(
234234
}
235235
}
236236

237-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
238-
copy(query = withCTE.copy(plan = this.query))
237+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
238+
copy(query = WithCTE(query, cteDefs))
239239
}
240240
}

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,8 +748,8 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
748748
result.toSeq
749749
}
750750

751-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
752-
copy(plan = withCTE.copy(plan = this.plan))
751+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
752+
copy(plan = WithCTE(plan, cteDefs))
753753
}
754754
}
755755

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
2828
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType}
2929
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
3030
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
31-
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, LogicalPlan, Project, View, WithCTE}
31+
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
3232
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
3333
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
3434
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -216,8 +216,8 @@ case class CreateViewCommand(
216216
)
217217
}
218218

219-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
220-
copy(plan = withCTE.copy(plan = this.plan))
219+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
220+
copy(plan = WithCTE(plan, cteDefs))
221221
}
222222
}
223223

@@ -312,8 +312,8 @@ case class AlterViewAsCommand(
312312
session.sessionState.catalog.alterTable(updatedViewMeta)
313313
}
314314

315-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
316-
copy(query = withCTE.copy(plan = this.query))
315+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
316+
copy(query = WithCTE(query, cteDefs))
317317
}
318318
}
319319

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
1919

2020
import org.apache.spark.sql._
2121
import org.apache.spark.sql.catalyst.plans.QueryPlan
22-
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
22+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
2323
import org.apache.spark.sql.execution.command.LeafRunnableCommand
2424
import org.apache.spark.sql.sources.InsertableRelation
2525

@@ -48,7 +48,7 @@ case class InsertIntoDataSourceCommand(
4848
Seq.empty[Row]
4949
}
5050

51-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
52-
copy(query = withCTE.copy(plan = this.query))
51+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
52+
copy(query = WithCTE(query, cteDefs))
5353
}
5454
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
2525
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2626
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
2727
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
28-
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
28+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2929
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
3030
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
3131
import org.apache.spark.sql.execution.SparkPlan
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
5757
catalogTable: Option[CatalogTable],
5858
fileIndex: Option[FileIndex],
5959
outputColumnNames: Seq[String])
60-
extends V1WriteCommand with CTEInChildren {
60+
extends V1WriteCommand {
6161

6262
private lazy val parameters = CaseInsensitiveMap(options)
6363

@@ -277,8 +277,4 @@ case class InsertIntoHadoopFsRelationCommand(
277277

278278
override protected def withNewChildInternal(
279279
newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = newChild)
280-
281-
override def withCTE(withCTE: WithCTE): LogicalPlan = {
282-
withNewChildInternal(withCTE.copy(plan = this.query))
283-
}
284280
}

0 commit comments

Comments
 (0)