Skip to content

Commit 6ceb169

Browse files
rxinmarmbrus
authored andcommitted
[SPARK-8300] DataFrame hint for broadcast join.
Users can now do ```scala left.join(broadcast(right), "joinKey") ``` to give the query planner a hint that "right" DataFrame is small and should be broadcasted. Author: Reynold Xin <rxin@databricks.com> Closes #6751 from rxin/broadcastjoin-hint and squashes the following commits: 953eec2 [Reynold Xin] Code review feedback. 88752d8 [Reynold Xin] Fixed import. 8187b88 [Reynold Xin] [SPARK-8300] DataFrame hint for broadcast join.
1 parent f0dcbe8 commit 6ceb169

File tree

4 files changed

+59
-8
lines changed

4 files changed

+59
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ case class Join(
130130
}
131131
}
132132

133+
/**
134+
* A hint for the optimizer that we should broadcast the `child` if used in a join operator.
135+
*/
136+
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
137+
override def output: Seq[Attribute] = child.output
138+
}
139+
140+
133141
case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
134142
override def output: Seq[Attribute] = left.output
135143
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
2020
import org.apache.spark.sql.catalyst.expressions._
2121
import org.apache.spark.sql.catalyst.planning._
2222
import org.apache.spark.sql.catalyst.plans._
23-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
2424
import org.apache.spark.sql.catalyst.plans.physical._
2525
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
2626
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
@@ -52,6 +52,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
5252
}
5353
}
5454

55+
/**
56+
* Matches a plan whose output should be small enough to be used in broadcast join.
57+
*/
58+
object CanBroadcast {
59+
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
60+
case BroadcastHint(p) => Some(p)
61+
case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
62+
p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
63+
case _ => None
64+
}
65+
}
66+
5567
/**
5668
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
5769
* evaluated by matching hash keys.
@@ -80,15 +92,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
8092
}
8193

8294
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
83-
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
84-
if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
85-
right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
95+
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
8696
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
8797

88-
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
89-
if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
90-
left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
91-
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
98+
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
99+
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
92100

93101
// If the sort merge join option is set, we want to use sort merge join prior to hashjoin
94102
// for now let's support inner join first, then add outer join
@@ -329,6 +337,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
329337
case e @ EvaluatePython(udf, child, _) =>
330338
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
331339
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
340+
case BroadcastHint(child) => apply(child)
332341
case _ => Nil
333342
}
334343
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.Experimental
2424
import org.apache.spark.sql.catalyst.ScalaReflection
2525
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, Star}
2626
import org.apache.spark.sql.catalyst.expressions._
27+
import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
2728
import org.apache.spark.sql.types._
2829
import org.apache.spark.util.Utils
2930

@@ -565,6 +566,22 @@ object functions {
565566
array((colName +: colNames).map(col) : _*)
566567
}
567568

569+
/**
570+
* Marks a DataFrame as small enough for use in broadcast joins.
571+
*
572+
* The following example marks the right DataFrame for broadcast hash join using `joinKey`.
573+
* {{{
574+
* // left and right are DataFrames
575+
* left.join(broadcast(right), "joinKey")
576+
* }}}
577+
*
578+
* @group normal_funcs
579+
* @since 1.5.0
580+
*/
581+
def broadcast(df: DataFrame): DataFrame = {
582+
DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
583+
}
584+
568585
/**
569586
* Returns the first column that is not null.
570587
* {{{

sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql
1919

2020
import org.apache.spark.sql.TestData._
21+
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
2122
import org.apache.spark.sql.functions._
2223

2324
class DataFrameJoinSuite extends QueryTest {
@@ -93,4 +94,20 @@ class DataFrameJoinSuite extends QueryTest {
9394
left.join(right, left("key") === right("key")),
9495
Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
9596
}
97+
98+
test("broadcast join hint") {
99+
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
100+
val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
101+
102+
// equijoin - should be converted into broadcast join
103+
val plan1 = df1.join(broadcast(df2), "key").queryExecution.executedPlan
104+
assert(plan1.collect { case p: BroadcastHashJoin => p }.size === 1)
105+
106+
// no join key -- should not be a broadcast join
107+
val plan2 = df1.join(broadcast(df2)).queryExecution.executedPlan
108+
assert(plan2.collect { case p: BroadcastHashJoin => p }.size === 0)
109+
110+
// planner should not crash without a join
111+
broadcast(df1).queryExecution.executedPlan
112+
}
96113
}

0 commit comments

Comments
 (0)