Skip to content

Commit 560fe1f

Browse files
AngersZhuuuudongjoon-hyun
authored andcommitted
[SPARK-32220][SQL] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result
### What changes were proposed in this pull request? In current Join Hint strategies, if we use SHUFFLE_REPLICATE_NL hint, it will directly convert join to Cartesian Product Join and loss join condition making result not correct. For Example: ``` spark-sql> select * from test4 order by a asc; 1 2 Time taken: 1.063 seconds, Fetched 4 row(s)20/07/08 14:11:25 INFO SparkSQLCLIDriver: Time taken: 1.063 seconds, Fetched 4 row(s) spark-sql>select * from test5 order by a asc 1 2 2 2 Time taken: 1.18 seconds, Fetched 24 row(s)20/07/08 14:13:59 INFO SparkSQLCLIDriver: Time taken: 1.18 seconds, Fetched 24 row(s)spar spark-sql>select /*+ shuffle_replicate_nl(test4) */ * from test4 join test5 where test4.a = test5.a order by test4.a asc ; 1 2 1 2 1 2 2 2 Time taken: 0.351 seconds, Fetched 2 row(s) 20/07/08 14:18:16 INFO SparkSQLCLIDriver: Time taken: 0.351 seconds, Fetched 2 row(s) ``` ### Why are the changes needed? Fix wrong data result ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Added UT Closes #29035 from AngersZhuuuu/SPARK-32220. Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 578b90c commit 560fe1f

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
159159
// 4. Pick cartesian product if join type is inner like.
160160
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
161161
// other choice.
162-
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
162+
case p @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
163163
def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
164164
getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
165165
buildSide =>
@@ -199,7 +199,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
199199

200200
def createCartesianProduct() = {
201201
if (joinType.isInstanceOf[InnerLike]) {
202-
Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
202+
Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), p.condition)))
203203
} else {
204204
None
205205
}

sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,4 +570,31 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
570570
assert(joinHints == expectedHints)
571571
}
572572
}
573+
574+
test("SPARK-32220: Non Cartesian Product Join Result Correct with SHUFFLE_REPLICATE_NL hint") {
575+
withTempView("t1", "t2") {
576+
Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1")
577+
Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2")
578+
val df1 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key = t2.key")
579+
val df2 = sql("SELECT * from t1 join t2 ON t1.key = t2.key")
580+
assert(df1.collect().size == df2.collect().size)
581+
582+
val df3 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2")
583+
val df4 = sql("SELECT * from t1 join t2")
584+
assert(df3.collect().size == df4.collect().size)
585+
586+
val df5 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < t2.key")
587+
val df6 = sql("SELECT * from t1 join t2 ON t1.key < t2.key")
588+
assert(df5.collect().size == df6.collect().size)
589+
590+
val df7 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < 2")
591+
val df8 = sql("SELECT * from t1 join t2 ON t1.key < 2")
592+
assert(df7.collect().size == df8.collect().size)
593+
594+
595+
val df9 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t2.key < 2")
596+
val df10 = sql("SELECT * from t1 join t2 ON t2.key < 2")
597+
assert(df9.collect().size == df10.collect().size)
598+
}
599+
}
573600
}

0 commit comments

Comments
 (0)