Skip to content

Commit ca782e7

Browse files
committed
SPARK-30598: Detect equijoins better
1 parent cfb1706 commit ca782e7

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
185185
// Find equi-join predicates that can be evaluated before the join, and thus can be used
186186
// as join keys.
187187
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
188-
val joinKeys = predicates.flatMap {
188+
val explicitJoinKeys = predicates.flatMap {
189189
case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
190190
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
191191
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
@@ -203,6 +203,27 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
203203
)
204204
case other => None
205205
}
206+
207+
val literalEqualities = predicates.collect {
208+
case EqualTo(l, r: Literal) if canEvaluate(l, left) && l.deterministic =>
209+
r -> (Some(l), None)
210+
case EqualTo(l, r: Literal) if canEvaluate(l, right) && l.deterministic =>
211+
r -> (None, Some(l))
212+
case EqualTo(l: Literal, r) if canEvaluate(r, left) && r.deterministic =>
213+
l -> (Some(r), None)
214+
case EqualTo(l: Literal, r) if canEvaluate(r, right) && r.deterministic =>
215+
l -> (None, Some(r))
216+
}.groupBy(_._1).mapValues { v =>
217+
val (l, r) = v.map(_._2).unzip
218+
(l.flatten, r.flatten)
219+
}
220+
221+
val implicitJoinKeys = literalEqualities.values.flatMap {
222+
case (xs, ys) => for { x <- xs; y <- ys } yield (x, y)
223+
}
224+
225+
val joinKeys = (explicitJoinKeys.toSet ++ implicitJoinKeys).toSeq
226+
206227
val otherPredicates = predicates.filterNot {
207228
case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
208229
case Equality(l, r) =>

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,4 +1082,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
10821082
assert(df2.join(df1, "id").collect().isEmpty)
10831083
}
10841084
}
1085+
1086+
// Checks that a full outer join whose condition only pins columns from both sides to the
// same literal gives the same answer and the same `queryExecution.sparkPlan` as a join that
// additionally spells out the column-to-column equalities.
test("Detect equijoins better") {
1087+
val df1 = Seq((1, 1), (2, 2)).toDF("c1", "c2")
1088+
val df2 = Seq((2, 2), (3, 3)).toDF("c1", "c2")
1089+
1090+
// Case 1: one column per side is compared with the literal 2.
// `explicitConstraints` holds the literal comparisons themselves; `implicitConstraints` is the
// cross-side equality that follows from them (both columns equal 2).
val explicitConstraints = df1("c1") === 2 && df2("c1") === 2
1091+
val implicitConstraints = df1("c1") === df2("c1")
1092+
1093+
// `explicitDF` states the cross-side equality in the join condition; `implicitDF` omits it.
val explicitDF = df1.join(df2, explicitConstraints && implicitConstraints, "FullOuter")
1094+
val implicitDF = df1.join(df2, explicitConstraints, "FullOuter")
1095+
1096+
// The two joins are compared both by result and by `sparkPlan`.
checkAnswer(explicitDF, implicitDF)
1097+
assert(
1098+
explicitDF.queryExecution.sparkPlan === implicitDF.queryExecution.sparkPlan,
1099+
"Explicit and implicit plans should match.")
1100+
1101+
// Case 2: both columns on both sides are compared with the literal 2, so the spelled-out
// variant lists all four df1-column / df2-column pairings.
val explicitConstraints2 =
1102+
df1("c1") === 2 && df1("c2") === 2 && df2("c1") === 2 && df2("c2") === 2
1103+
val implicitConstraints2 = df1("c1") === df2("c1") && df1("c1") === df2("c2") &&
1104+
df1("c2") === df2("c1") && df1("c2") === df2("c2")
1105+
1106+
// As in case 1: the first join spells out the pairings, the second leaves them out.
val explicitDF2 = df1.join(df2, explicitConstraints2 && implicitConstraints2, "FullOuter")
1107+
val implicitDF2 = df1.join(df2, explicitConstraints2, "FullOuter")
1108+
1109+
checkAnswer(explicitDF2, implicitDF2)
1110+
assert(
1111+
explicitDF2.queryExecution.sparkPlan === implicitDF2.queryExecution.sparkPlan,
1112+
"Explicit and implicit plans should match.")
1113+
}
10851114
}

0 commit comments

Comments
 (0)