@@ -31,59 +31,54 @@ import org.apache.spark.sql.types.StructType
31
31
*/
32
32
case class ReuseExchangeAndSubquery (conf : SQLConf ) extends Rule [SparkPlan ] {
33
33
34
+ private class ReuseCache [T <: SparkPlan ] {
35
+ // To avoid costly canonicalization of an exchange or a subquery:
36
+ // - we use its schema first to check if it can be replaced to a reused one at all
37
+ // - we insert it into the map of canonicalized plans only when at least 2 have the same schema
38
+ private val cache = Map [StructType , (T , Map [SparkPlan , T ])]()
39
+
40
+ def lookup (plan : T ): T = {
41
+ val (firstSameSchemaPlan, sameResultPlans) = cache.getOrElseUpdate(plan.schema, plan -> Map ())
42
+ if (firstSameSchemaPlan.ne(plan)) {
43
+ if (sameResultPlans.isEmpty) {
44
+ sameResultPlans += firstSameSchemaPlan.canonicalized -> firstSameSchemaPlan
45
+ }
46
+ sameResultPlans.getOrElseUpdate(plan.canonicalized, plan)
47
+ } else {
48
+ plan
49
+ }
50
+ }
51
+ }
52
+
34
53
def apply (plan : SparkPlan ): SparkPlan = {
35
54
if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
36
- // To avoid costly canonicalization of an exchange or a subquery:
37
- // - we use its schema first to check if it can be replaced to a reused one at all
38
- // - we insert it into the map of canonicalized plans only when at least 2 have the same
39
- // schema
40
- val exchanges = Map [StructType , (Exchange , Map [SparkPlan , Exchange ])]()
41
- val subqueries = Map [StructType , (BaseSubqueryExec , Map [SparkPlan , BaseSubqueryExec ])]()
55
+ val exchanges = new ReuseCache [Exchange ]()
56
+ val subqueries = new ReuseCache [BaseSubqueryExec ]()
42
57
43
58
def reuse (plan : SparkPlan ): SparkPlan = plan.transformUp {
44
59
case exchange : Exchange if conf.exchangeReuseEnabled =>
45
- val (firstSameSchemaExchange, sameResultExchanges) =
46
- exchanges.getOrElseUpdate(exchange.schema, exchange -> Map ())
47
- if (firstSameSchemaExchange.ne(exchange)) {
48
- if (sameResultExchanges.isEmpty) {
49
- sameResultExchanges +=
50
- firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
51
- }
52
- val sameResultExchange =
53
- sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
54
- if (sameResultExchange.ne(exchange)) {
55
- ReusedExchangeExec (exchange.output, sameResultExchange)
56
- } else {
57
- exchange
58
- }
60
+ val cached = exchanges.lookup(exchange)
61
+ if (cached.ne(exchange)) {
62
+ ReusedExchangeExec (exchange.output, cached)
59
63
} else {
60
64
exchange
61
65
}
62
66
63
67
case other => other.transformExpressionsUp {
64
68
case sub : ExecSubqueryExpression =>
65
69
val subquery = reuse(sub.plan).asInstanceOf [BaseSubqueryExec ]
66
- if (conf.subqueryReuseEnabled) {
67
- val (firstSameSchemaSubquery, sameResultSubqueries) =
68
- subqueries.getOrElseUpdate(subquery.schema, subquery -> Map ())
69
- if (firstSameSchemaSubquery.ne(subquery)) {
70
- if (sameResultSubqueries.isEmpty) {
71
- sameResultSubqueries +=
72
- firstSameSchemaSubquery.canonicalized -> firstSameSchemaSubquery
73
- }
74
- val sameResultSubquery =
75
- sameResultSubqueries.getOrElseUpdate(subquery.canonicalized, subquery)
76
- if (sameResultSubquery.ne(subquery)) {
77
- sub.withNewPlan(ReusedSubqueryExec (sameResultSubquery))
70
+ sub.withNewPlan(
71
+ if (conf.subqueryReuseEnabled) {
72
+ val cached = subqueries.lookup(subquery)
73
+ if (cached.ne(subquery)) {
74
+ ReusedSubqueryExec (cached)
78
75
} else {
79
- sub.withNewPlan( subquery)
76
+ subquery
80
77
}
81
78
} else {
82
- sub.withNewPlan( subquery)
79
+ subquery
83
80
}
84
- } else {
85
- sub.withNewPlan(subquery)
86
- }
81
+ )
87
82
}
88
83
}
89
84
0 commit comments