Skip to content

Commit 08f6ea3

Browse files
committed
Do not push down join predicates that are ambiguous to both sides
1 parent f3ad94d commit 08f6ea3

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
6969
val canPushDownPredicate = (predicate: Expression) => {
7070
val replaced = replaceAlias(predicate, aliasMap)
7171
predicate.references.nonEmpty &&
72-
replaced.references.subsetOf(agg.child.outputSet ++ rightOp.outputSet)
72+
replaced.references.subsetOf(agg.child.outputSet ++ rightOp.outputSet) &&
73+
// references must not be ambiguous (i.e., not contained in both sides)
74+
!replaced.references.exists(attr =>
75+
agg.child.outputSet.contains(attr) && rightOp.outputSet.contains(attr))
7376
}
7477
val makeJoinCondition = (predicates: Seq[Expression]) => {
7578
replaceAlias(predicates.reduce(And), aliasMap)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ class DataFrameSuite extends QueryTest
5959
with AdaptiveSparkPlanHelper {
6060
import testImplicits._
6161

62+
test("aggr antijoin") {
63+
val ids = Seq(1, 2, 3).toDF("id").distinct()
64+
val result = ids.withColumn("id", $"id" + 1)
65+
.join(ids, "id", "left_anti").collect()
66+
assert(result.length == 1)
67+
}
68+
6269
test("analysis error should be eagerly reported") {
6370
intercept[Exception] { testData.select("nonExistentName") }
6471
intercept[Exception] {

0 commit comments

Comments
 (0)