
Commit dbd8606

[SPARK-29375][SPARK-28940][SQL] Whole plan exchange and subquery reuse
Change-Id: Icb229a5b2c775c8c796420134115e3886c1526fe
1 parent 9b79251 commit dbd8606
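
This commit replaces the separate ReuseExchange and ReuseSubquery rules with a single WholePlanReuse rule that deduplicates exchanges and subqueries across the whole physical plan, including plans nested inside subqueries. As a rough, hypothetical illustration (not part of the commit; the table name `t` and the query are made up), a query that repeats the same scalar subquery at different nesting levels should now plan it once and reference it via ReusedSubqueryExec:

// Hypothetical standalone demo of the behaviour this change targets.
import org.apache.spark.sql.SparkSession

object WholePlanReuseDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("whole-plan-reuse-demo").getOrCreate()
    spark.range(100).createOrReplaceTempView("t")
    // The same scalar subquery appears once at the top level and once nested one level deeper.
    val df = spark.sql(
      """SELECT (SELECT avg(id) FROM t), (SELECT (SELECT avg(id) FROM t))
        |FROM t LIMIT 1""".stripMargin)
    // With whole-plan reuse, the executed plan should contain one SubqueryExec for avg(id)
    // and a ReusedSubqueryExec pointing at it instead of a second independent subquery.
    df.explain()
    spark.stop()
  }
}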

File tree

9 files changed: +235 −96 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 3 additions & 2 deletions
@@ -336,12 +336,13 @@ object QueryPlan extends PredicateHelper {
    * `Attribute`, and replace it with `BoundReference` will cause error.
    */
   def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
+    type T2 = QueryPlan[_]
     e.transformUp {
-      case s: PlanExpression[QueryPlan[_] @unchecked] =>
+      case s: PlanExpression[T2 @unchecked] =>
         // Normalize the outer references in the subquery plan.
         val normalizedPlan = s.plan.transformAllExpressions {
           case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
-        }
+        }.canonicalized.asInstanceOf[T2]
         s.withNewPlan(normalizedPlan)

       case ar: AttributeReference =>
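
The added `.canonicalized` call appears to exist because reuse decisions are based on canonicalized plans: if a subquery embedded in an expression keeps its raw expression IDs, two otherwise identical host plans never canonicalize to the same form. A minimal standalone sketch of that effect (toy types, not Spark's API):

// Toy model: `id` plays the role of an expression ID that differs between plan instances.
object CanonicalizationSketch extends App {
  case class Attr(name: String, id: Int) { def canonical: Attr = copy(id = 0) }
  case class SubPlan(cond: Attr) { def canonical: SubPlan = SubPlan(cond.canonical) }
  case class OuterPlan(sub: SubPlan) {
    def canonicalShallow: OuterPlan = this                    // skips the nested plan
    def canonicalDeep: OuterPlan = OuterPlan(sub.canonical)   // canonicalizes it, like the patch
  }

  val a = OuterPlan(SubPlan(Attr("k", id = 1)))
  val b = OuterPlan(SubPlan(Attr("k", id = 2)))
  assert(a.canonicalShallow != b.canonicalShallow)  // IDs leak in, so equivalent plans look different
  assert(a.canonicalDeep == b.canonicalDeep)        // deep canonicalization makes them comparable
}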

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 12 additions & 8 deletions
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.execution.reuse.WholePlanReuse
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
@@ -127,7 +128,7 @@ class QueryExecution(

   protected def preparations: Seq[Rule[SparkPlan]] = {
     QueryExecution.preparations(sparkSession,
-      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
+      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)
   }

   private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
@@ -326,7 +327,8 @@ object QueryExecution {
    */
   private[execution] def preparations(
       sparkSession: SparkSession,
-      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
+      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
+      subquery: Boolean): Seq[Rule[SparkPlan]] = {
     // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
     // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
     adaptiveExecutionRule.toSeq ++
@@ -336,10 +338,12 @@ object QueryExecution {
       EnsureRequirements(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
-      CollapseCodegenStages(sparkSession.sessionState.conf),
-      ReuseExchange(sparkSession.sessionState.conf),
-      ReuseSubquery(sparkSession.sessionState.conf)
-    )
+      CollapseCodegenStages(sparkSession.sessionState.conf)) ++
+      (if (subquery) {
+        Nil
+      } else {
+        Seq(WholePlanReuse(sparkSession.sessionState.conf))
+      })
   }

   /**
@@ -370,7 +374,7 @@ object QueryExecution {
    * Prepare the [[SparkPlan]] for execution.
    */
   def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {
-    prepareForExecution(preparations(spark), plan)
+    prepareForExecution(preparations(spark, subquery = true), plan)
   }

   /**
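
The new `subquery` flag keeps the reuse rule out of the preparation pipeline used for individual subquery plans (`prepareExecutedPlan`), so reuse is decided once, over the whole plan. A trivial standalone sketch of that pattern (hypothetical rule names, not Spark's classes):

// Minimal sketch: the reuse rule is appended only for the top-level plan,
// because it has to see all subqueries at once to deduplicate them.
object PreparationsSketch extends App {
  sealed trait Rule
  case object EnsureRequirements extends Rule
  case object CollapseCodegenStages extends Rule
  case object WholePlanReuse extends Rule

  def preparations(subquery: Boolean): Seq[Rule] =
    Seq(EnsureRequirements, CollapseCodegenStages) ++
      (if (subquery) Nil else Seq(WholePlanReuse))

  assert(preparations(subquery = true) == Seq(EnsureRequirements, CollapseCodegenStages))
  assert(preparations(subquery = false).last == WholePlanReuse)
}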

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 0 additions & 49 deletions
@@ -17,18 +17,12 @@

 package org.apache.spark.sql.execution.exchange

-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch

 /**
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
        |""".stripMargin
   }
 }
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
- * references.
- */
-case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
-    // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
-      case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
-        } else {
-          sameSchema += exchange
-          exchange
-        }
-    }
-
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
-  }
-}
sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/WholePlanReuse.scala

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.reuse
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
+ * use the same exchange or subquery for all the references.
+ */
+case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
+      // To avoid costly canonicalization of an exchange or a subquery:
+      // - we use its schema first to check if it can be replaced to a reused one at all
+      // - we insert it into the map of canonicalized plans only when at least 2 have the same
+      //   schema
+      val exchanges = Map[StructType, (Exchange, Map[SparkPlan, Exchange])]()
+      val subqueries = Map[StructType, (BaseSubqueryExec, Map[SparkPlan, BaseSubqueryExec])]()
+
+      def reuse(plan: SparkPlan): SparkPlan = plan.transformUp {
+        case exchange: Exchange if conf.exchangeReuseEnabled =>
+          val (firstSameSchemaExchange, sameResultExchanges) =
+            exchanges.getOrElseUpdate(exchange.schema, exchange -> Map())
+          if (firstSameSchemaExchange.ne(exchange)) {
+            if (sameResultExchanges.isEmpty) {
+              sameResultExchanges +=
+                firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
+            }
+            val sameResultExchange =
+              sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
+            if (sameResultExchange.ne(exchange)) {
+              ReusedExchangeExec(exchange.output, sameResultExchange)
+            } else {
+              exchange
+            }
+          } else {
+            exchange
+          }
+
+        case other => other.transformExpressionsUp {
+          case sub: ExecSubqueryExpression =>
+            val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
+            if (conf.subqueryReuseEnabled) {
+              val (firstSameSchemaSubquery, sameResultSubqueries) =
+                subqueries.getOrElseUpdate(subquery.schema, subquery -> Map())
+              if (firstSameSchemaSubquery.ne(subquery)) {
+                if (sameResultSubqueries.isEmpty) {
+                  sameResultSubqueries +=
+                    firstSameSchemaSubquery.canonicalized -> firstSameSchemaSubquery
+                }
+                val sameResultSubquery =
+                  sameResultSubqueries.getOrElseUpdate(subquery.canonicalized, subquery)
+                if (sameResultSubquery.ne(subquery)) {
+                  sub.withNewPlan(ReusedSubqueryExec(sameResultSubquery))
+                } else {
+                  sub.withNewPlan(subquery)
+                }
+              } else {
+                sub.withNewPlan(subquery)
+              }
+            } else {
+              sub.withNewPlan(subquery)
+            }
+        }
+      }
+
+      reuse(plan)
+    } else {
+      plan
+    }
+  }
+}
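
The two-level map is the core of the rule: plans are first grouped by schema (cheap), and only when a second plan with the same schema shows up are canonicalized forms computed and used as keys (expensive). A standalone sketch of that lookup strategy with toy types (not Spark's classes; `Plan` and `canonicalized` are stand-ins):

// Toy model of the lazy-canonicalization lookup used above.
object LazyCanonicalizationSketch extends App {
  import scala.collection.mutable

  case class Plan(schema: String, label: String) {
    def canonicalized: String = s"canon:$schema"   // pretend this is expensive to compute
  }

  // schema -> (first plan seen with that schema, canonicalized form -> representative plan)
  val seen = mutable.Map[String, (Plan, mutable.Map[String, Plan])]()

  // Returns the plan to use: either the argument itself or an earlier equivalent one.
  def reuse(p: Plan): Plan = {
    val (first, byCanonical) = seen.getOrElseUpdate(p.schema, p -> mutable.Map())
    if (first.ne(p)) {
      if (byCanonical.isEmpty) byCanonical += first.canonicalized -> first  // canonicalize lazily
      byCanonical.getOrElseUpdate(p.canonicalized, p)
    } else {
      p
    }
  }

  val p1 = Plan("struct<id:long>", "first")
  val p2 = Plan("struct<id:long>", "duplicate")
  assert(reuse(p1) eq p1)   // first occurrence is kept
  assert(reuse(p2) eq p1)   // duplicate resolves to the earlier, equivalent plan
}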

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 1 addition & 31 deletions
@@ -17,13 +17,10 @@

 package org.apache.spark.sql.execution

-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -197,30 +194,3 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
     }
   }
 }
-
-/**
- * Find out duplicated subqueries in the spark plan, then use the same subquery result for all the
- * references.
- */
-case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.subqueryReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
-    val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]()
-    plan transformAllExpressions {
-      case sub: ExecSubqueryExpression =>
-        val sameSchema =
-          subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
-        val sameResult = sameSchema.find(_.sameResult(sub.plan))
-        if (sameResult.isDefined) {
-          sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
-        } else {
-          sameSchema += sub.plan
-          sub
-        }
-    }
-  }
-}

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 53 additions & 1 deletion
@@ -1235,8 +1235,13 @@ abstract class DynamicPartitionPruningSuiteBase
       val plan = df.queryExecution.executedPlan
       val countSubqueryBroadcasts =
         plan.collectWithSubqueries({ case _: SubqueryBroadcastExec => 1 }).sum
+      val countReusedSubqueryBroadcasts =
+        plan.collectWithSubqueries({
+          case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
+        }).sum

-      assert(countSubqueryBroadcasts == 2)
+      assert(countSubqueryBroadcasts == 1)
+      assert(countReusedSubqueryBroadcasts == 1)
     }
   }

@@ -1280,6 +1285,53 @@
       )
     }
   }
+
+  test("Subquery reuse across the whole plan") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+      withTable("df1", "df2") {
+        spark.range(1000)
+          .select(col("id"), col("id").as("k"))
+          .write
+          .partitionBy("k")
+          .format(tableFormat)
+          .mode("overwrite")
+          .saveAsTable("df1")
+
+        spark.range(100)
+          .select(col("id"), col("id").as("k"))
+          .write
+          .partitionBy("k")
+          .format(tableFormat)
+          .mode("overwrite")
+          .saveAsTable("df2")
+
+        val df = sql(
+          """
+            |SELECT df1.id, df2.k
+            |FROM df1 JOIN df2 ON df1.k = df2.k
+            |WHERE df2.id < (SELECT max(id) FROM df2 WHERE id <= 2)
+            |""".stripMargin)
+
+        checkPartitionPruningPredicate(df, true, false)
+
+        checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
+
+        val plan = df.queryExecution.executedPlan
+
+        val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
+        val reusedSubqueryIds = plan.collectWithSubqueries {
+          case rs: ReusedSubqueryExec => rs.child.id
+        }
+
+        assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.forall(subqueryIds.contains(_)),
+          "ReusedSubqueryExec should reuse an existing subquery")
+      }
+    }
+  }
 }

 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 21 additions & 0 deletions
@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(df, df2)
     checkAnswer(df, Nil)
   }
+
+  test("Subquery reuse across the whole plan") {
+    val df = sql(
+      """
+        |SELECT (SELECT avg(key) FROM testData), (SELECT (SELECT avg(key) FROM testData))
+        |FROM testData
+        |LIMIT 1
+      """.stripMargin)
+
+    val plan = df.queryExecution.executedPlan
+
+    val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
+    val reusedSubqueryIds = plan.collectWithSubqueries {
+      case rs: ReusedSubqueryExec => rs.child.id
+    }
+
+    assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+    assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
+    assert(reusedSubqueryIds.forall(subqueryIds.contains(_)),
+      "ReusedSubqueryExec should reuse an existing subquery")
+  }
 }
