Skip to content

Commit 17b7cce

Browse files
committed
address review comments
1 parent f51e31d commit 17b7cce

File tree

7 files changed

+64
-42
lines changed

7 files changed

+64
-42
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.plans.logical._
21+
import org.apache.spark.sql.catalyst.rules.Rule
22+
23+
/**
24+
* Replaces [[ResolvedHint]] operators from the plan. Move the [[HintInfo]] to associated [[Join]]
25+
* operators, otherwise remove it if no [[Join]] operator is matched.
26+
*/
27+
object EliminateResolvedHint extends Rule[LogicalPlan] {
28+
// This is also called in the beginning of the optimization phase, and as a result
29+
// is using transformUp rather than resolveOperators.
30+
def apply(plan: LogicalPlan): LogicalPlan = {
31+
val pulledUp = plan transformUp {
32+
case j: Join =>
33+
val leftHint = mergeHints(collectHints(j.left))
34+
val rightHint = mergeHints(collectHints(j.right))
35+
j.copy(hint = JoinHint(leftHint, rightHint))
36+
}
37+
pulledUp.transform {
38+
case h: ResolvedHint => h.child
39+
}
40+
}
41+
42+
private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
43+
hints.reduceOption((h1, h2) => HintInfo(
44+
broadcast = h1.broadcast || h2.broadcast))
45+
}
46+
47+
private def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
48+
plan match {
49+
case h: ResolvedHint => collectHints(h.child) :+ h.hints
50+
case u: UnaryNode => collectHints(u.child)
51+
// TODO revisit this logic:
52+
// except and intersect are semi/anti-joins which won't return more data then
53+
// their left argument, so the broadcast hint should be propagated here
54+
case i: Intersect => collectHints(i.left)
55+
case e: Except => collectHints(e.left)
56+
case _ => Seq.empty
57+
}
58+
}
59+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
193193
*/
194194
def nonExcludableRules: Seq[String] =
195195
EliminateDistinct.ruleName ::
196+
EliminateResolvedHint.ruleName ::
196197
EliminateSubqueryAliases.ruleName ::
197198
EliminateView.ruleName ::
198199
ReplaceExpressions.ruleName ::

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ object PhysicalOperation extends PredicateHelper {
100100
* value).
101101
*/
102102
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
103-
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
103+
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild, joinHint) */
104104
type ReturnType =
105105
(JoinType, Seq[Expression], Seq[Expression],
106106
Option[Expression], LogicalPlan, LogicalPlan, JoinHint)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.sql.catalyst.expressions.Attribute
21-
import org.apache.spark.sql.catalyst.rules.Rule
2221

2322
/**
2423
* A general hint for the child that is not yet resolved. This node is generated by the parser and
@@ -36,6 +35,7 @@ case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan
3635

3736
/**
3837
* A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]].
38+
* This node will be eliminated before optimization starts.
3939
*/
4040
case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
4141
extends UnaryNode {
@@ -80,41 +80,3 @@ case class HintInfo(broadcast: Boolean = false) {
8080
if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")")
8181
}
8282
}
83-
84-
/**
85-
* Replaces [[ResolvedHint]] operators from the plan. Move the [[HintInfo]] to associated [[Join]]
86-
* operators, otherwise remove it if no [[Join]] operator is matched.
87-
*/
88-
object EliminateResolvedHint extends Rule[LogicalPlan] {
89-
// This is also called in the beginning of the optimization phase, and as a result
90-
// is using transformUp rather than resolveOperators.
91-
def apply(plan: LogicalPlan): LogicalPlan = {
92-
val pulledUp = plan transformUp {
93-
case j: Join =>
94-
val leftHint = mergeHints(collectHints(j.left))
95-
val rightHint = mergeHints(collectHints(j.right))
96-
j.copy(hint = JoinHint(leftHint, rightHint))
97-
}
98-
pulledUp.transform {
99-
case h: ResolvedHint => h.child
100-
}
101-
}
102-
103-
def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
104-
hints.reduceOption((h1, h2) => HintInfo(
105-
broadcast = h1.broadcast || h2.broadcast))
106-
}
107-
108-
def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
109-
plan match {
110-
case h: ResolvedHint => collectHints(h.child) :+ h.hints
111-
case u: UnaryNode => collectHints(u.child)
112-
// TODO revisit this logic:
113-
// except and intersect are semi/anti-joins which won't return more data then
114-
// their left argument, so the broadcast hint should be propagated here
115-
case i: Intersect => collectHints(i.left)
116-
case e: Except => collectHints(e.left)
117-
case _ => Seq.empty
118-
}
119-
}
120-
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
3434
import org.apache.spark.sql.catalyst.errors._
3535
import org.apache.spark.sql.catalyst.expressions._
3636
import org.apache.spark.sql.catalyst.plans.JoinType
37-
import org.apache.spark.sql.catalyst.plans.logical.JoinHint
3837
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
3938
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
4039
import org.apache.spark.sql.catalyst.util.truncatedString

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
2121
import org.apache.spark.sql.catalyst.dsl.plans._
2222
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
2323
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, PlanTest}
24-
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, EliminateResolvedHint, LocalRelation, LogicalPlan}
24+
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
2525
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2626
import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
2727
import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, JOIN_REORDER_ENABLED}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans
2020
import org.apache.spark.SparkFunSuite
2121
import org.apache.spark.sql.catalyst.dsl.expressions._
2222
import org.apache.spark.sql.catalyst.dsl.plans._
23+
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
2324
import org.apache.spark.sql.catalyst.plans.logical._
2425
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2526
import org.apache.spark.sql.catalyst.util._

0 commit comments

Comments
 (0)