Skip to content

Commit fe78a31

Browse files
committed
move ReuseMap to catalyst util, add docs
1 parent 6f6aab0 commit fe78a31

File tree

2 files changed

+73
-34
lines changed

2 files changed

+73
-34
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.util
19+
20+
import scala.collection.mutable.Map
21+
22+
import org.apache.spark.sql.catalyst.plans.QueryPlan
23+
import org.apache.spark.sql.types.StructType
24+
25+
/**
26+
* Map of canonicalized plans that can be used to find reuse possibilities.
27+
*
28+
* To avoid costly canonicalization of a plan:
29+
* - we use its schema first to check if it can be replaced to a reused one at all
30+
* - we insert it into the map of canonicalized plans only when at least 2 have the same schema
31+
*/
32+
class ReuseMap[T <: QueryPlan[_]] {
33+
// scalastyle:off structural.type
34+
private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
35+
// scalastyle:on structural.type
36+
37+
/**
38+
* Find a matching plan with the same canonicalized form in the map or add the new plan to the
39+
* map otherwise.
40+
*
41+
* @param plan the input plan
42+
* @return the matching plan or the input plan
43+
*/
44+
def lookup(plan: T): T = {
45+
val (firstSameSchemaPlan, sameResultPlans) = map.getOrElseUpdate(plan.schema, plan -> Map())
46+
if (firstSameSchemaPlan ne plan) {
47+
if (sameResultPlans.isEmpty) {
48+
sameResultPlans += firstSameSchemaPlan.canonicalized -> firstSameSchemaPlan
49+
}
50+
sameResultPlans.getOrElseUpdate(plan.canonicalized, plan)
51+
} else {
52+
plan
53+
}
54+
}
55+
56+
/**
57+
* Find a matching plan with the same canonicalized form in the map and apply `f` on it or add
58+
* the new plan to the map otherwise.
59+
*
60+
* @param plan the input plan
61+
* @param f the function to apply
62+
* @return the matching plan with `f` applied or the input plan
63+
*/
64+
def addOrElse[T2 >: T](plan: T, f: T => T2): T2 = {
65+
val found = lookup(plan)
66+
if (found eq plan) {
67+
plan
68+
} else {
69+
f(found)
70+
}
71+
}
72+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/Reuse.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,51 +17,18 @@
1717

1818
package org.apache.spark.sql.execution.reuse
1919

20-
import scala.collection.mutable.Map
21-
import scala.language.existentials
22-
23-
import org.apache.spark.sql.catalyst.plans.QueryPlan
2420
import org.apache.spark.sql.catalyst.rules.Rule
2521
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
2622
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
2723
import org.apache.spark.sql.internal.SQLConf
28-
import org.apache.spark.sql.types.StructType
24+
import org.apache.spark.sql.util.ReuseMap
2925

3026
/**
3127
* Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
3228
* use the same exhange or subquery for all the references.
3329
*/
3430
case class ReuseExchangeAndSubquery(conf: SQLConf) extends Rule[SparkPlan] {
3531

36-
private class ReuseMap[T <: QueryPlan[_]] {
37-
// To avoid costly canonicalization of an exchange or a subquery:
38-
// - we use its schema first to check if it can be replaced to a reused one at all
39-
// - we insert it into the map of canonicalized plans only when at least 2 have the same schema
40-
private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
41-
42-
def lookup(plan: T): T = {
43-
val (firstSameSchemaPlan, sameResultPlans) = map.getOrElseUpdate(plan.schema, plan -> Map())
44-
if (firstSameSchemaPlan.ne(plan)) {
45-
if (sameResultPlans.isEmpty) {
46-
sameResultPlans +=
47-
firstSameSchemaPlan.canonicalized -> firstSameSchemaPlan
48-
}
49-
sameResultPlans.getOrElseUpdate(plan.canonicalized, plan)
50-
} else {
51-
plan
52-
}
53-
}
54-
55-
def addOrElse[T2 >: T](plan: T, f: T => T2): T2 = {
56-
val found = lookup(plan)
57-
if (found eq plan) {
58-
plan
59-
} else {
60-
f(found)
61-
}
62-
}
63-
}
64-
6532
def apply(plan: SparkPlan): SparkPlan = {
6633
if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
6734
val exchanges = new ReuseMap[Exchange]()

0 commit comments

Comments
 (0)