Skip to content

Commit 5362f08

Browse files
c21cloud-fan
authored and committed
[SPARK-34593][SQL] Preserve broadcast nested loop join partitioning and ordering
### What changes were proposed in this pull request? `BroadcastNestedLoopJoinExec` does not preserve `outputPartitioning` and `outputOrdering` right now. But it can preserve the streamed side partitioning and ordering when possible. This can help avoid shuffle and sort in later stages, if there's join and aggregation in the query. See example queries in added unit test in `JoinSuite.scala`. In addition, fix a bunch of minor places in `BroadcastNestedLoopJoinExec.scala` for better style and readability. ### Why are the changes needed? Avoid shuffle and sort for certain complicated query shapes. Better query performance can be achieved. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `JoinSuite.scala`. Closes #31708 from c21/nested-join. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 4e43819 commit 5362f08

File tree

10 files changed

+699
-632
lines changed

10 files changed

+699
-632
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ case class BroadcastNestedLoopJoinExec(
4141
override lazy val metrics = Map(
4242
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
4343

44-
/** BuildRight means the right relation <=> the broadcast relation. */
44+
/** BuildRight means the right relation is the broadcast relation. */
4545
private val (streamed, broadcast) = buildSide match {
4646
case BuildRight => (left, right)
4747
case BuildLeft => (right, left)
4848
}
4949

5050
override def simpleStringWithNodeId(): String = {
5151
val opId = ExplainUtils.getOpId(this)
52-
s"$nodeName $joinType ${buildSide} ($opId)".trim
52+
s"$nodeName $joinType $buildSide ($opId)".trim
5353
}
5454

5555
override def requiredChildDistribution: Seq[Distribution] = buildSide match {
@@ -59,10 +59,22 @@ case class BroadcastNestedLoopJoinExec(
5959
UnspecifiedDistribution :: BroadcastDistribution(IdentityBroadcastMode) :: Nil
6060
}
6161

62+
override def outputPartitioning: Partitioning = (joinType, buildSide) match {
63+
case (_: InnerLike, _) | (LeftOuter, BuildRight) | (RightOuter, BuildLeft) |
64+
(LeftSemi, BuildRight) | (LeftAnti, BuildRight) => streamed.outputPartitioning
65+
case _ => UnknownPartitioning(left.outputPartitioning.numPartitions)
66+
}
67+
68+
override def outputOrdering: Seq[SortOrder] = (joinType, buildSide) match {
69+
case (_: InnerLike, _) | (LeftOuter, BuildRight) | (RightOuter, BuildLeft) |
70+
(LeftSemi, BuildRight) | (LeftAnti, BuildRight) => streamed.outputOrdering
71+
case _ => Nil
72+
}
73+
6274
private[this] def genResultProjection: UnsafeProjection = joinType match {
63-
case LeftExistence(j) =>
75+
case LeftExistence(_) =>
6476
UnsafeProjection.create(output, output)
65-
case other =>
77+
case _ =>
6678
// Always put the stream side on left to simplify implementation
6779
// both of left and right side could be null
6880
UnsafeProjection.create(
@@ -183,7 +195,7 @@ case class BroadcastNestedLoopJoinExec(
183195
* The implementation for these joins:
184196
*
185197
* LeftSemi with BuildRight
186-
* Anti with BuildRight
198+
* LeftAnti with BuildRight
187199
*/
188200
private def leftExistenceJoin(
189201
relation: Broadcast[Array[InternalRow]],
@@ -238,7 +250,6 @@ case class BroadcastNestedLoopJoinExec(
238250
* ExistenceJoin with BuildLeft
239251
*/
240252
private def defaultJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
241-
/** All rows that either match both-way, or rows from streamed joined with nulls. */
242253
val streamRdd = streamed.execute()
243254

244255
val matchedBuildRows = streamRdd.mapPartitionsInternal { streamedIter =>
@@ -275,7 +286,7 @@ case class BroadcastNestedLoopJoinExec(
275286
i += 1
276287
}
277288
return sparkContext.makeRDD(buf)
278-
case j: ExistenceJoin =>
289+
case _: ExistenceJoin =>
279290
val buf: CompactBuffer[InternalRow] = new CompactBuffer()
280291
var i = 0
281292
val rel = relation.value
@@ -296,7 +307,7 @@ case class BroadcastNestedLoopJoinExec(
296307
i += 1
297308
}
298309
return sparkContext.makeRDD(notMatched)
299-
case o =>
310+
case _ =>
300311
}
301312

302313
val notMatchedBroadcastRows: Seq[InternalRow] = {
@@ -358,7 +369,7 @@ case class BroadcastNestedLoopJoinExec(
358369
leftExistenceJoin(broadcastedRelation, exists = true)
359370
case (LeftAnti, BuildRight) =>
360371
leftExistenceJoin(broadcastedRelation, exists = false)
361-
case (j: ExistenceJoin, BuildRight) =>
372+
case (_: ExistenceJoin, BuildRight) =>
362373
existenceJoin(broadcastedRelation)
363374
case _ =>
364375
/**

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q61.sf100/explain.txt

Lines changed: 74 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,73 @@
11
== Physical Plan ==
2-
* Sort (70)
3-
+- Exchange (69)
4-
+- * Project (68)
5-
+- BroadcastNestedLoopJoin Inner BuildRight (67)
6-
:- * HashAggregate (47)
7-
: +- Exchange (46)
8-
: +- * HashAggregate (45)
9-
: +- * Project (44)
10-
: +- * BroadcastHashJoin Inner BuildRight (43)
11-
: :- * Project (31)
12-
: : +- * BroadcastHashJoin Inner BuildRight (30)
13-
: : :- * Project (24)
14-
: : : +- * BroadcastHashJoin Inner BuildRight (23)
15-
: : : :- * Project (17)
16-
: : : : +- * BroadcastHashJoin Inner BuildRight (16)
17-
: : : : :- * Project (10)
18-
: : : : : +- * BroadcastHashJoin Inner BuildRight (9)
19-
: : : : : :- * Filter (3)
20-
: : : : : : +- * ColumnarToRow (2)
21-
: : : : : : +- Scan parquet default.store_sales (1)
22-
: : : : : +- BroadcastExchange (8)
23-
: : : : : +- * Project (7)
24-
: : : : : +- * Filter (6)
25-
: : : : : +- * ColumnarToRow (5)
26-
: : : : : +- Scan parquet default.date_dim (4)
27-
: : : : +- BroadcastExchange (15)
28-
: : : : +- * Project (14)
29-
: : : : +- * Filter (13)
30-
: : : : +- * ColumnarToRow (12)
31-
: : : : +- Scan parquet default.item (11)
32-
: : : +- BroadcastExchange (22)
33-
: : : +- * Project (21)
34-
: : : +- * Filter (20)
35-
: : : +- * ColumnarToRow (19)
36-
: : : +- Scan parquet default.promotion (18)
37-
: : +- BroadcastExchange (29)
38-
: : +- * Project (28)
39-
: : +- * Filter (27)
40-
: : +- * ColumnarToRow (26)
41-
: : +- Scan parquet default.store (25)
42-
: +- BroadcastExchange (42)
43-
: +- * Project (41)
44-
: +- * BroadcastHashJoin Inner BuildRight (40)
45-
: :- * Filter (34)
46-
: : +- * ColumnarToRow (33)
47-
: : +- Scan parquet default.customer (32)
48-
: +- BroadcastExchange (39)
49-
: +- * Project (38)
50-
: +- * Filter (37)
51-
: +- * ColumnarToRow (36)
52-
: +- Scan parquet default.customer_address (35)
53-
+- BroadcastExchange (66)
54-
+- * HashAggregate (65)
55-
+- Exchange (64)
56-
+- * HashAggregate (63)
57-
+- * Project (62)
58-
+- * BroadcastHashJoin Inner BuildRight (61)
59-
:- * Project (59)
60-
: +- * BroadcastHashJoin Inner BuildRight (58)
61-
: :- * Project (56)
62-
: : +- * BroadcastHashJoin Inner BuildRight (55)
63-
: : :- * Project (53)
64-
: : : +- * BroadcastHashJoin Inner BuildRight (52)
65-
: : : :- * Filter (50)
66-
: : : : +- * ColumnarToRow (49)
67-
: : : : +- Scan parquet default.store_sales (48)
68-
: : : +- ReusedExchange (51)
69-
: : +- ReusedExchange (54)
70-
: +- ReusedExchange (57)
71-
+- ReusedExchange (60)
2+
* Sort (69)
3+
+- * Project (68)
4+
+- BroadcastNestedLoopJoin Inner BuildRight (67)
5+
:- * HashAggregate (47)
6+
: +- Exchange (46)
7+
: +- * HashAggregate (45)
8+
: +- * Project (44)
9+
: +- * BroadcastHashJoin Inner BuildRight (43)
10+
: :- * Project (31)
11+
: : +- * BroadcastHashJoin Inner BuildRight (30)
12+
: : :- * Project (24)
13+
: : : +- * BroadcastHashJoin Inner BuildRight (23)
14+
: : : :- * Project (17)
15+
: : : : +- * BroadcastHashJoin Inner BuildRight (16)
16+
: : : : :- * Project (10)
17+
: : : : : +- * BroadcastHashJoin Inner BuildRight (9)
18+
: : : : : :- * Filter (3)
19+
: : : : : : +- * ColumnarToRow (2)
20+
: : : : : : +- Scan parquet default.store_sales (1)
21+
: : : : : +- BroadcastExchange (8)
22+
: : : : : +- * Project (7)
23+
: : : : : +- * Filter (6)
24+
: : : : : +- * ColumnarToRow (5)
25+
: : : : : +- Scan parquet default.date_dim (4)
26+
: : : : +- BroadcastExchange (15)
27+
: : : : +- * Project (14)
28+
: : : : +- * Filter (13)
29+
: : : : +- * ColumnarToRow (12)
30+
: : : : +- Scan parquet default.item (11)
31+
: : : +- BroadcastExchange (22)
32+
: : : +- * Project (21)
33+
: : : +- * Filter (20)
34+
: : : +- * ColumnarToRow (19)
35+
: : : +- Scan parquet default.promotion (18)
36+
: : +- BroadcastExchange (29)
37+
: : +- * Project (28)
38+
: : +- * Filter (27)
39+
: : +- * ColumnarToRow (26)
40+
: : +- Scan parquet default.store (25)
41+
: +- BroadcastExchange (42)
42+
: +- * Project (41)
43+
: +- * BroadcastHashJoin Inner BuildRight (40)
44+
: :- * Filter (34)
45+
: : +- * ColumnarToRow (33)
46+
: : +- Scan parquet default.customer (32)
47+
: +- BroadcastExchange (39)
48+
: +- * Project (38)
49+
: +- * Filter (37)
50+
: +- * ColumnarToRow (36)
51+
: +- Scan parquet default.customer_address (35)
52+
+- BroadcastExchange (66)
53+
+- * HashAggregate (65)
54+
+- Exchange (64)
55+
+- * HashAggregate (63)
56+
+- * Project (62)
57+
+- * BroadcastHashJoin Inner BuildRight (61)
58+
:- * Project (59)
59+
: +- * BroadcastHashJoin Inner BuildRight (58)
60+
: :- * Project (56)
61+
: : +- * BroadcastHashJoin Inner BuildRight (55)
62+
: : :- * Project (53)
63+
: : : +- * BroadcastHashJoin Inner BuildRight (52)
64+
: : : :- * Filter (50)
65+
: : : : +- * ColumnarToRow (49)
66+
: : : : +- Scan parquet default.store_sales (48)
67+
: : : +- ReusedExchange (51)
68+
: : +- ReusedExchange (54)
69+
: +- ReusedExchange (57)
70+
+- ReusedExchange (60)
7271

7372

7473
(1) Scan parquet default.store_sales
@@ -121,15 +120,15 @@ Input [7]: [ss_item_sk#1, ss_customer_sk#2, ss_store_sk#3, ss_promo_sk#4, ss_ext
121120
Output [2]: [i_item_sk#12, i_category#13]
122121
Batched: true
123122
Location [not included in comparison]/{warehouse_dir}/item]
124-
PushedFilters: [IsNotNull(i_category), EqualTo(i_category,Jewelry), IsNotNull(i_item_sk)]
123+
PushedFilters: [IsNotNull(i_category), EqualTo(i_category,Jewelry ), IsNotNull(i_item_sk)]
125124
ReadSchema: struct<i_item_sk:int,i_category:string>
126125

127126
(12) ColumnarToRow [codegen id : 2]
128127
Input [2]: [i_item_sk#12, i_category#13]
129128

130129
(13) Filter [codegen id : 2]
131130
Input [2]: [i_item_sk#12, i_category#13]
132-
Condition : ((isnotnull(i_category#13) AND (i_category#13 = Jewelry)) AND isnotnull(i_item_sk#12))
131+
Condition : ((isnotnull(i_category#13) AND (i_category#13 = Jewelry )) AND isnotnull(i_item_sk#12))
133132

134133
(14) Project [codegen id : 2]
135134
Output [1]: [i_item_sk#12]
@@ -378,21 +377,17 @@ Join condition: None
378377
Output [3]: [promotions#33, total#38, CheckOverflow((promote_precision(CheckOverflow((promote_precision(cast(promotions#33 as decimal(15,4))) / promote_precision(cast(total#38 as decimal(15,4)))), DecimalType(35,20), true)) * 100.00000000000000000000), DecimalType(38,19), true) AS ((CAST(promotions AS DECIMAL(15,4)) / CAST(total AS DECIMAL(15,4))) * 100)#40]
379378
Input [2]: [promotions#33, total#38]
380379

381-
(69) Exchange
382-
Input [3]: [promotions#33, total#38, ((CAST(promotions AS DECIMAL(15,4)) / CAST(total AS DECIMAL(15,4))) * 100)#40]
383-
Arguments: rangepartitioning(promotions#33 ASC NULLS FIRST, total#38 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#41]
384-
385-
(70) Sort [codegen id : 17]
380+
(69) Sort [codegen id : 16]
386381
Input [3]: [promotions#33, total#38, ((CAST(promotions AS DECIMAL(15,4)) / CAST(total AS DECIMAL(15,4))) * 100)#40]
387382
Arguments: [promotions#33 ASC NULLS FIRST, total#38 ASC NULLS FIRST], true, 0
388383

389384
===== Subqueries =====
390385

391386
Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#6 IN dynamicpruning#7
392-
ReusedExchange (71)
387+
ReusedExchange (70)
393388

394389

395-
(71) ReusedExchange [Reuses operator id: 8]
390+
(70) ReusedExchange [Reuses operator id: 8]
396391
Output [1]: [d_date_sk#8]
397392

398393
Subquery:2 Hosting operator id = 48 Hosting Expression = ss_sold_date_sk#6 IN dynamicpruning#7

0 commit comments

Comments
 (0)