Skip to content

Commit 3308496

Browse files
maryannxueJackey Lee
authored andcommitted
[SPARK-26065][SQL] Change query hint from a LogicalPlan to a field
## What changes were proposed in this pull request? The existing query hint implementation relies on a logical plan node `ResolvedHint` to store query hints in logical plans, and on `Statistics` in physical plans. Since `ResolvedHint` is not really a logical operator and can break the pattern matching for existing and future optimization rules, it is an issue to the Optimizer just as the old `AnalysisBarrier` was to the Analyzer. Given the fact that all our query hints are either 1) a join hint, i.e., broadcast hint; or 2) a re-partition hint, which is indeed an operator, we only need to add a hint field on the Join plan and that will be a good enough solution for the current hint usage. This PR lets the `Join` node have a hint for its left sub-tree and another hint for its right sub-tree, where each hint is the merged result of all the effective hints specified in the corresponding sub-tree. The "effectiveness" of a hint, i.e., whether that hint should be propagated to the `Join` node, is currently consistent with the hint propagation rules originally implemented in the `Statistics` approach. Note that the `ResolvedHint` node still has to live through the analysis stage because of the `Dataset` interface, but it will be removed and its hint information moved to the `Join` node in the "pre-optimization" stage. This PR also introduces a change in how hints work with join reordering. Before this PR, hints would stop join reordering. For example, in "a.join(b).join(c).hint("broadcast").join(d)", the broadcast hint would stop d from participating in the cost-based join reordering while still allowing reordering from under the hint node. After this PR, though, the broadcast hint will not interfere with join reordering at all, and after reordering if a relation associated with a hint stays unchanged or equivalent to the original relation, the hint will be retained; otherwise it will be discarded. 
For example, the original plan is like "a.join(b).hint("broadcast").join(c).hint("broadcast").join(d)", thus the join order is "a JOIN b JOIN c JOIN d". So if after reordering the join order becomes "a JOIN b JOIN (c JOIN d)", the plan will be like "a.join(b).hint("broadcast").join(c.join(d))"; but if after reordering the join order becomes "a JOIN c JOIN b JOIN d", the plan will be like "a.join(c).join(b).hint("broadcast").join(d)". ## How was this patch tested? Added new tests. Closes apache#23036 from maryannxue/query-hint. Authored-by: maryannxue <maryannxue@apache.org> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
1 parent 695db1e commit 3308496

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+680
-283
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ class Analyzer(
943943
failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")
944944

945945
// To resolve duplicate expression IDs for Join and Intersect
946-
case j @ Join(left, right, _, _) if !j.duplicateResolved =>
946+
case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
947947
j.copy(right = dedupRight(left, right))
948948
case i @ Intersect(left, right, _) if !i.duplicateResolved =>
949949
i.copy(right = dedupRight(left, right))
@@ -2249,13 +2249,14 @@ class Analyzer(
22492249
*/
22502250
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
22512251
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
2252-
case j @ Join(left, right, UsingJoin(joinType, usingCols), _)
2252+
case j @ Join(left, right, UsingJoin(joinType, usingCols), _, hint)
22532253
if left.resolved && right.resolved && j.duplicateResolved =>
2254-
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
2255-
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
2254+
commonNaturalJoinProcessing(left, right, joinType, usingCols, None, hint)
2255+
case j @ Join(left, right, NaturalJoin(joinType), condition, hint)
2256+
if j.resolvedExceptNatural =>
22562257
// find common column names from both sides
22572258
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
2258-
commonNaturalJoinProcessing(left, right, joinType, joinNames, condition)
2259+
commonNaturalJoinProcessing(left, right, joinType, joinNames, condition, hint)
22592260
}
22602261
}
22612262

@@ -2360,7 +2361,8 @@ class Analyzer(
23602361
right: LogicalPlan,
23612362
joinType: JoinType,
23622363
joinNames: Seq[String],
2363-
condition: Option[Expression]) = {
2364+
condition: Option[Expression],
2365+
hint: JoinHint) = {
23642366
val leftKeys = joinNames.map { keyName =>
23652367
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
23662368
throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the left " +
@@ -2401,7 +2403,7 @@ class Analyzer(
24012403
sys.error("Unsupported natural join type " + joinType)
24022404
}
24032405
// use Project to trim unnecessary fields
2404-
Project(projectList, Join(left, right, joinType, newCondition))
2406+
Project(projectList, Join(left, right, joinType, newCondition, hint))
24052407
}
24062408

24072409
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ trait CheckAnalysis extends PredicateHelper {
172172
failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
173173
s"conditions: $condition")
174174

175-
case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType =>
175+
case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType =>
176176
failAnalysis(
177177
s"join condition '${condition.sql}' " +
178178
s"of type ${condition.dataType.catalogString} is not a boolean.")
@@ -609,7 +609,7 @@ trait CheckAnalysis extends PredicateHelper {
609609
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
610610

611611
// Join can host correlated expressions.
612-
case j @ Join(left, right, joinType, _) =>
612+
case j @ Join(left, right, joinType, _, _) =>
613613
joinType match {
614614
// Inner join, like Filter, can be anywhere.
615615
case _: InnerLike =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
4141
*/
4242
def isWatermarkInJoinKeys(plan: LogicalPlan): Boolean = {
4343
plan match {
44-
case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
44+
case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _, _) =>
4545
(leftKeys ++ rightKeys).exists {
4646
case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
4747
case _ => false

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ object UnsupportedOperationChecker {
229229
throwError("dropDuplicates is not supported after aggregation on a " +
230230
"streaming DataFrame/Dataset")
231231

232-
case Join(left, right, joinType, condition) =>
232+
case Join(left, right, joinType, condition, _) =>
233233

234234
joinType match {
235235

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ package object dsl {
325325
otherPlan: LogicalPlan,
326326
joinType: JoinType = Inner,
327327
condition: Option[Expression] = None): LogicalPlan =
328-
Join(logicalPlan, otherPlan, joinType, condition)
328+
Join(logicalPlan, otherPlan, joinType, condition, JoinHint.NONE)
329329

330330
def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
331331
otherPlan: LogicalPlan,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.mutable
2222
import org.apache.spark.internal.Logging
2323
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
2424
import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, JoinType}
25-
import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
25+
import org.apache.spark.sql.catalyst.plans.logical._
2626
import org.apache.spark.sql.catalyst.rules.Rule
2727
import org.apache.spark.sql.internal.SQLConf
2828

@@ -31,6 +31,40 @@ import org.apache.spark.sql.internal.SQLConf
3131
* Cost-based join reorder.
3232
* We may have several join reorder algorithms in the future. This class is the entry of these
3333
* algorithms, and chooses which one to use.
34+
*
35+
* Note that join strategy hints, e.g. the broadcast hint, do not interfere with the reordering.
36+
* Such hints will be applied on the equivalent counterparts (i.e., join between the same relations
37+
* regardless of the join order) of the original nodes after reordering.
38+
* For example, the plan before reordering is like:
39+
*
40+
* Join
41+
* / \
42+
* Hint1 t4
43+
* /
44+
* Join
45+
* / \
46+
* Join t3
47+
* / \
48+
* Hint2 t2
49+
* /
50+
* t1
51+
*
52+
* The original join order as illustrated above is "((t1 JOIN t2) JOIN t3) JOIN t4", and after
53+
* reordering, the new join order is "((t1 JOIN t3) JOIN t2) JOIN t4", so the new plan will be like:
54+
*
55+
* Join
56+
* / \
57+
* Hint1 t4
58+
* /
59+
* Join
60+
* / \
61+
* Join t2
62+
* / \
63+
* t1 t3
64+
*
65+
* "Hint1" is applied on "(t1 JOIN t3) JOIN t2" as it is equivalent to the original hinted node,
66+
* "(t1 JOIN t2) JOIN t3"; while "Hint2" has disappeared from the new plan since there is no
67+
* equivalent node to "t1 JOIN t2".
3468
*/
3569
object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
3670

@@ -40,24 +74,30 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
4074
if (!conf.cboEnabled || !conf.joinReorderEnabled) {
4175
plan
4276
} else {
77+
// Use a map to track the hints on the join items.
78+
val hintMap = new mutable.HashMap[AttributeSet, HintInfo]
4379
val result = plan transformDown {
4480
// Start reordering with a joinable item, which is an InnerLike join with conditions.
45-
case j @ Join(_, _, _: InnerLike, Some(cond)) =>
46-
reorder(j, j.output)
47-
case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond)))
81+
case j @ Join(_, _, _: InnerLike, Some(cond), _) =>
82+
reorder(j, j.output, hintMap)
83+
case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), _))
4884
if projectList.forall(_.isInstanceOf[Attribute]) =>
49-
reorder(p, p.output)
85+
reorder(p, p.output, hintMap)
5086
}
51-
52-
// After reordering is finished, convert OrderedJoin back to Join
53-
result transformDown {
54-
case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond)
87+
// After reordering is finished, convert OrderedJoin back to Join.
88+
result transform {
89+
case OrderedJoin(left, right, jt, cond) =>
90+
val joinHint = JoinHint(hintMap.get(left.outputSet), hintMap.get(right.outputSet))
91+
Join(left, right, jt, cond, joinHint)
5592
}
5693
}
5794
}
5895

59-
private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
60-
val (items, conditions) = extractInnerJoins(plan)
96+
private def reorder(
97+
plan: LogicalPlan,
98+
output: Seq[Attribute],
99+
hintMap: mutable.HashMap[AttributeSet, HintInfo]): LogicalPlan = {
100+
val (items, conditions) = extractInnerJoins(plan, hintMap)
61101
val result =
62102
// Do reordering if the number of items is appropriate and join conditions exist.
63103
// We also need to check if costs of all items can be evaluated.
@@ -75,27 +115,31 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
75115
* Extracts items of consecutive inner joins and join conditions.
76116
* This method works for bushy trees and left/right deep trees.
77117
*/
78-
private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
118+
private def extractInnerJoins(
119+
plan: LogicalPlan,
120+
hintMap: mutable.HashMap[AttributeSet, HintInfo]): (Seq[LogicalPlan], Set[Expression]) = {
79121
plan match {
80-
case Join(left, right, _: InnerLike, Some(cond)) =>
81-
val (leftPlans, leftConditions) = extractInnerJoins(left)
82-
val (rightPlans, rightConditions) = extractInnerJoins(right)
122+
case Join(left, right, _: InnerLike, Some(cond), hint) =>
123+
hint.leftHint.foreach(hintMap.put(left.outputSet, _))
124+
hint.rightHint.foreach(hintMap.put(right.outputSet, _))
125+
val (leftPlans, leftConditions) = extractInnerJoins(left, hintMap)
126+
val (rightPlans, rightConditions) = extractInnerJoins(right, hintMap)
83127
(leftPlans ++ rightPlans, splitConjunctivePredicates(cond).toSet ++
84128
leftConditions ++ rightConditions)
85-
case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond)))
129+
case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _))
86130
if projectList.forall(_.isInstanceOf[Attribute]) =>
87-
extractInnerJoins(j)
131+
extractInnerJoins(j, hintMap)
88132
case _ =>
89133
(Seq(plan), Set())
90134
}
91135
}
92136

93137
private def replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan = plan match {
94-
case j @ Join(left, right, jt: InnerLike, Some(cond)) =>
138+
case j @ Join(left, right, jt: InnerLike, Some(cond), _) =>
95139
val replacedLeft = replaceWithOrderedJoin(left)
96140
val replacedRight = replaceWithOrderedJoin(right)
97141
OrderedJoin(replacedLeft, replacedRight, jt, Some(cond))
98-
case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond))) =>
142+
case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _)) =>
99143
p.copy(child = replaceWithOrderedJoin(j))
100144
case _ =>
101145
plan
@@ -295,7 +339,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
295339
} else {
296340
(otherPlan, onePlan)
297341
}
298-
val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
342+
val newJoin = Join(left, right, Inner, joinConds.reduceOption(And), JoinHint.NONE)
299343
val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
300344
val remainingConds = conditions -- collectedJoinConds
301345
val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.plans.logical._
21+
import org.apache.spark.sql.catalyst.rules.Rule
22+
23+
/**
24+
* Removes [[ResolvedHint]] operators from the plan. Moves the [[HintInfo]] to the associated
25+
* [[Join]] operators, or drops it if no [[Join]] operator is matched.
26+
*/
27+
object EliminateResolvedHint extends Rule[LogicalPlan] {
28+
// This is also called in the beginning of the optimization phase, and as a result
29+
// is using transformUp rather than resolveOperators.
30+
def apply(plan: LogicalPlan): LogicalPlan = {
31+
val pulledUp = plan transformUp {
32+
case j: Join =>
33+
val leftHint = mergeHints(collectHints(j.left))
34+
val rightHint = mergeHints(collectHints(j.right))
35+
j.copy(hint = JoinHint(leftHint, rightHint))
36+
}
37+
pulledUp.transform {
38+
case h: ResolvedHint => h.child
39+
}
40+
}
41+
42+
private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
43+
hints.reduceOption((h1, h2) => HintInfo(
44+
broadcast = h1.broadcast || h2.broadcast))
45+
}
46+
47+
private def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
48+
plan match {
49+
case h: ResolvedHint => collectHints(h.child) :+ h.hints
50+
case u: UnaryNode => collectHints(u.child)
51+
// TODO revisit this logic:
52+
// except and intersect are semi/anti-joins which won't return more data than
53+
// their left argument, so the broadcast hint should be propagated here
54+
case i: Intersect => collectHints(i.left)
55+
case e: Except => collectHints(e.left)
56+
case _ => Seq.empty
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)