Commit d28d573

rxin authored and gatorsmile committed
[SPARK-21619][SQL] Fail the execution of canonicalized plans explicitly
## What changes were proposed in this pull request?

Canonicalized plans are not supposed to be executed. I ran into a case in which there's some code that accidentally calls execute on a canonicalized plan. This patch throws a more explicit exception when that happens.

## How was this patch tested?

Added a test case in SparkPlanSuite.

Author: Reynold Xin <rxin@databricks.com>

Closes #18828 from rxin/SPARK-21619.
1 parent c42d208 commit d28d573

File tree: 14 files changed (+86, -18 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 3 additions & 2 deletions
@@ -438,7 +438,7 @@ case class HiveTableRelation(

   def isPartitioned: Boolean = partitionCols.nonEmpty

-  override lazy val canonicalized: HiveTableRelation = copy(
+  override def doCanonicalize(): HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -448,7 +448,8 @@ case class HiveTableRelation(
     },
     partitionCols = partitionCols.zipWithIndex.map {
       case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
-    })
+    }
+  )

   override def computeStats(): Statistics = {
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 26 additions & 4 deletions
@@ -180,6 +180,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT

   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries

+  /**
+   * A private mutable variable to indicate whether this plan is the result of canonicalization.
+   * This is used solely for making sure we wouldn't execute a canonicalized plan.
+   * See [[canonicalized]] on how this is set.
+   */
+  @transient private var _isCanonicalizedPlan: Boolean = false
+
+  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
+
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way
    * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
@@ -188,10 +197,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
-   * expressions cosmetic variations themselves.
+   * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
+   * They should remove expressions cosmetic variations themselves.
+   */
+  @transient final lazy val canonicalized: PlanType = {
+    var plan = doCanonicalize()
+    // If the plan has not been changed due to canonicalization, make a copy of it so we don't
+    // mutate the original plan's _isCanonicalizedPlan flag.
+    if (plan eq this) {
+      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
+    }
+    plan._isCanonicalizedPlan = true
+    plan
+  }
+
+  /**
+   * Defines how the canonicalization should work for the current plan.
    */
-  lazy val canonicalized: PlanType = {
+  protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
     mapExpressions {
@@ -213,7 +236,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }.withNewChildren(canonicalizedChildren)
   }

-
   /**
    * Returns true when the given query plan will return the same results as this query plan.
    *
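
This hunk is the heart of the patch: canonicalization moves from an overridable `lazy val` to a template method, and the now-final `canonicalized` tags whatever `doCanonicalize()` returns. A minimal self-contained sketch of the pattern (toy names `Plan`, `LeafPlan`, and `copySelf` are ours, not Spark's):

```scala
// Sketch of the template-method pattern introduced above (toy code, not Spark).
abstract class Plan {
  @transient private var _isCanonicalizedPlan: Boolean = false
  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

  // Subclasses customize canonicalization here instead of overriding a lazy val.
  protected def doCanonicalize(): Plan = this

  @transient final lazy val canonicalized: Plan = {
    var plan = doCanonicalize()
    // If canonicalization returned `this` unchanged, copy it first so the
    // original plan's flag is never mutated (stands in for TreeNode.makeCopy).
    if (plan eq this) plan = copySelf()
    plan._isCanonicalizedPlan = true
    plan
  }

  protected def copySelf(): Plan

  // Mirrors SparkPlan.execute(): canonicalized plans must never run.
  final def execute(): Unit = {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
    }
    doExecute()
  }
  protected def doExecute(): Unit
}

class LeafPlan extends Plan {
  protected def copySelf(): Plan = new LeafPlan
  protected def doExecute(): Unit = println("running")
}

// new LeafPlan().execute()               // prints "running"
// new LeafPlan().canonicalized.execute() // throws IllegalStateException
```

Because `canonicalized` is now `final`, every node that previously overrode the `lazy val` is rewritten below to override `doCanonicalize()` instead.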

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 1 deletion
@@ -760,7 +760,7 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {

-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized

   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())

   override def output: Seq[Attribute] = child.output

-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 2 additions & 2 deletions
@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
   }

   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
-  override lazy val canonicalized: SparkPlan =
+  override def doCanonicalize(): SparkPlan =
     copy(
       fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
@@ -522,7 +522,7 @@ case class FileSourceScanExec(
     }
   }

-  override lazy val canonicalized: FileSourceScanExec = {
+  override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 6 additions & 0 deletions
@@ -111,6 +111,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecute()
   }

@@ -121,6 +124,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecuteBroadcast()
   }
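
Both entry points now refuse to run a canonicalized plan; the other execute variants exercised by the new test (`executeCollect`, `executeTake`, etc.) reach the same guards through these methods. A hypothetical spark-shell session illustrating the new behavior:

```scala
// A hypothetical spark-shell session (matching the test added below):
val plan = spark.range(10).queryExecution.executedPlan
plan.execute()                // fine: a normal physical plan returns an RDD
plan.canonicalized.execute()
// java.lang.IllegalStateException: A canonicalized plan is not supposed to be executed.
```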

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 1 addition & 1 deletion
@@ -286,7 +286,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[ClearCacheCommand]] logical plan.
    */
   override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = withOrigin(ctx) {
-    ClearCacheCommand
+    ClearCacheCommand()
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 1 addition & 1 deletion
@@ -350,7 +350,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala

Lines changed: 4 additions & 1 deletion
@@ -66,10 +66,13 @@ case class UncacheTableCommand(
 /**
  * Clear all cached data from the in-memory cache.
  */
-case object ClearCacheCommand extends RunnableCommand {
+case class ClearCacheCommand() extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
+
+  /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not support 0-arg ctor. */
+  override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = ClearCacheCommand()
 }
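
This change is a consequence of the new `canonicalized` implementation in QueryPlan.scala: a plan that canonicalizes to itself is now passed through `makeCopy`, so every plan node must be copyable. A singleton `case object` has no public constructor for `makeCopy` to invoke, and, per the comment in the diff, `TreeNode.makeCopy` does not support a 0-arg constructor either. A rough sketch of the constraint, with a toy `reflectiveCopy` standing in for Spark's real reflection logic:

```scala
// Toy version of the reflective copy that canonicalization now performs:
def reflectiveCopy[T <: Product](node: T, newArgs: Array[AnyRef]): T =
  node.getClass.getConstructors.head.newInstance(newArgs: _*).asInstanceOf[T]

case object SingletonCommand   // no public constructor: `.head` above would throw
case class ZeroArgCommand() {
  // The real ClearCacheCommand sidesteps constructor matching entirely:
  def makeCopy(newArgs: Array[AnyRef]): ZeroArgCommand = ZeroArgCommand()
}
```

Overriding `makeCopy` to return a fresh instance directly avoids the constructor lookup altogether.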

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ case class LogicalRelation(
   extends LeafNode with MultiInstanceRelation {

   // Only care about relation when canonicalizing.
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(

   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   extends LeafExecNode {

   // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
+  override def doCanonicalize(): SparkPlan = child.canonicalized

   def doExecute(): RDD[InternalRow] = {
     child.execute()
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SparkPlanSuite extends QueryTest with SharedSQLContext {
+
+  test("SPARK-21619 execution of a canonicalized plan should fail") {
+    val plan = spark.range(10).queryExecution.executedPlan.canonicalized
+
+    intercept[IllegalStateException] { plan.execute() }
+    intercept[IllegalStateException] { plan.executeCollect() }
+    intercept[IllegalStateException] { plan.executeCollectPublic() }
+    intercept[IllegalStateException] { plan.executeToIterator() }
+    intercept[IllegalStateException] { plan.executeBroadcast() }
+    intercept[IllegalStateException] { plan.executeTake(1) }
+  }
+
+}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 2 additions & 2 deletions
@@ -203,11 +203,11 @@ case class HiveTableScanExec(
     }
   }

-  override lazy val canonicalized: HiveTableScanExec = {
+  override def doCanonicalize(): HiveTableScanExec = {
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized,
+      relation.canonicalized.asInstanceOf[HiveTableRelation],
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
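
The added cast follows from the API change in QueryPlan.scala above: `canonicalized` is now a `final lazy val` declared once with the generic type `PlanType` (here `LogicalPlan`), so the narrower `HiveTableRelation` that the `HiveTableScanExec` constructor expects must be recovered explicitly:

```scala
// Before: `override lazy val canonicalized: HiveTableRelation` preserved the
// precise type. After: `canonicalized` is final in QueryPlan and statically
// typed LogicalPlan, hence the downcast at the call site.
relation.canonicalized.asInstanceOf[HiveTableRelation]
```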
