[SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join #41398
Changes from all commits: ea32dff, 532964b, 6089beb, 505e234, 2940070, d325d97
@@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
     assertShuffleHashJoin(
       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
     assertShuffleHashJoin(
-      sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
+      sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)
[Review comment] Why change this?
[Reply] In this situation, t1 is smaller than t2, so it now picks t1. Before, it was not possible to pick t1, and so t2 was picked.
[Reply] Yes, I meant that the original test coverage (
     assertShuffleHashJoin(
       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft)
+
+    // Determine build side based on hint
+    assertShuffleHashJoin(
+      sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft)
+    assertShuffleHashJoin(
+      sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight)
+
     // Shuffle-hash hint prioritized over shuffle-replicate-nl hint
     assertShuffleHashJoin(
       sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)),
@@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
       BuildLeft)

-    // Shuffle-hash hint specified but not doable
-    assertBroadcastHashJoin(
-      sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight)
[szehon-ho marked this conversation as resolved.]

     assertBroadcastNLJoin(
       sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft)
   }
@@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
     withLogAppender(hintAppender, level = Some(Level.WARN)) {
       assertShuffleMergeJoin(
         df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType))
     }
+
+    val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+      .filter(_.contains("is not supported in the query:"))
+    assert(logs.size === 1)
+    logs.foreach(log =>
+      assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
+  }
+
+  Seq("left_semi", "left_anti").foreach { joinType =>
+    val hintAppender = new LogAppender(s"join hint build side check for $joinType")
+    withLogAppender(hintAppender, level = Some(Level.WARN)) {
       assertShuffleMergeJoin(
         df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType))
     }

     val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
       .filter(_.contains("is not supported in the query:"))
-    assert(logs.size === 2)
+    assert(logs.size === 1)
     logs.foreach(log =>
       assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
   }
@@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
     withLogAppender(hintAppender, level = Some(Level.WARN)) {
       assertBroadcastHashJoin(
         df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight)
-      assertShuffleHashJoin(
-        df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight)
     }

     val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
@@ -636,12 +650,10 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
     withLogAppender(hintAppender, level = Some(Level.WARN)) {
       assertShuffleMergeJoin(
         df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType))
-      assertShuffleMergeJoin(
-        df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType))
[huaxingao marked this conversation as resolved.]
     }
     val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
       .filter(_.contains("is not supported in the query:"))
-    assert(logs.size === 2)
+    assert(logs.size === 1)
     logs.foreach(log =>
       assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join.")))
   }
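To make the review exchange above concrete ("t1 is smaller than t2, so it now picks t1"): here is a minimal sketch, not code from this PR, assuming a local SparkSession and illustrative column names. Before this patch, a shuffled hash join for a left outer join could only build the right side; with the patch, hinting both sides lets the planner pick the smaller left side as the build side.

```scala
import org.apache.spark.sql.SparkSession

object ShuffledHashHintSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val t1 = spark.range(10).selectExpr("id as k1")   // smaller side
  val t2 = spark.range(100).selectExpr("id as k2")  // larger side

  // With SHUFFLE_HASH hinted on both sides, the planner may now choose
  // the smaller left relation as the build side of a LEFT OUTER join,
  // where before this patch it was forced to build right.
  val joined = t1.hint("SHUFFLE_HASH")
    .join(t2.hint("SHUFFLE_HASH"), $"k1" === $"k2", "left_outer")
  joined.explain() // expect: ShuffledHashJoin ... BuildLeft
}
```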
@@ -1249,6 +1249,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }

+  test("SPARK-36612: Support left outer join build left or right outer join build right in " +
+    "shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test empty build side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test empty stream side
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test empty build and stream side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+        spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr(
+        "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+
+    // test left outer with left side build
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+
+    // test right outer with right side build
+    inputDFs.foreach { case (df2, df1, joinExprs) =>
+      val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
[Review comment on lines +1320 to +1323] Do we need to verify build side of
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+  }
+
   test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
     val inputDFs = Seq(
       // Test empty build side for inner join
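On the review question above about verifying the build side of ShuffledHashJoinExec: one possible way, sketched here rather than taken from this PR, is to collect the join node and inspect its buildSide field. This reuses the suite's AdaptiveSparkPlanHelper#collect helper and the shjDF DataFrame from the test above; the BuildRight import path assumes a Spark 3.x source tree.

```scala
import org.apache.spark.sql.catalyst.optimizer.BuildRight
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

// Collect the shuffled hash join node from the executed plan via the
// suite's AdaptiveSparkPlanHelper#collect, then check which side it builds.
val shjNodes = collect(shjDF.queryExecution.executedPlan) {
  case j: ShuffledHashJoinExec => j
}
assert(shjNodes.size === 1)
// For the right-outer case, the build side should be the right side.
assert(shjNodes.head.buildSide === BuildRight)
```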
[Review comment] Can we add some comments to explain why the ordering can't be preserved?
[Author reply] Sure, added a comment as per my understanding (please let me know if I misunderstand something). My thought was: because the second iteration on the build side (for the outer-join semantics) is over a HashedRelation, the result cannot be in order.
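To illustrate that point, here is a deliberately simplified, non-Spark sketch of a left outer join that builds on the left side; all names are hypothetical, and it assumes unique keys per side, which real HashedRelations do not require. Matched rows come out in the stream side's order, but the second pass emits never-matched build rows in hash-table iteration order, which is arbitrary; hence the overall output ordering cannot be preserved.

```scala
import scala.collection.mutable

object OuterBuildSideOrderSketch {
  // Simplified left outer join, building a hash table on the LEFT side.
  // Purely illustrative; not Spark's actual implementation.
  def leftOuterBuildLeft[K, L, R](
      left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(L, Option[R])] = {
    val table = mutable.HashMap.empty[K, L]
    val matchedKeys = mutable.Set.empty[K]
    left.foreach { case (k, l) => table(k) = l }

    // Phase 1: probe with the stream (right) side; these output rows
    // follow the stream side's order.
    val matched = right.flatMap { case (k, r) =>
      table.get(k).map { l => matchedKeys += k; (l, Option(r)) }
    }
    // Phase 2: a second iteration, now over the hash table, emits left
    // rows that never matched. Hash-table iteration order is arbitrary,
    // which is why the join output's ordering cannot be preserved.
    val unmatched = table.iterator.collect {
      case (k, l) if !matchedKeys(k) => (l, Option.empty[R])
    }
    matched ++ unmatched
  }
}
```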