Skip to content

[SPARK-28220][SQL] Improve PropagateEmptyRelation to support join with false condition #31857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand All @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
* - Join with one or two empty children (including Intersect/Except).
* 2. Unary-node Logical Plans
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
* - Join with false condition.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*/
Expand Down Expand Up @@ -71,24 +73,32 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
// Joins on empty LocalRelations generated from streaming sources are not eliminated
// as stateful streaming joins need to perform other state management operations other than
// just processing the input data.
case p @ Join(_, _, joinType, _, _)
case p @ Join(_, _, joinType, conditionOpt, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmptyLocalRelation(p.left)
val isRightEmpty = isEmptyLocalRelation(p.right)
if (isLeftEmpty || isRightEmpty) {
val isFalseCondition = conditionOpt match {
case Some(FalseLiteral) => true
case _ => false
}
if (isLeftEmpty || isRightEmpty || isFalseCondition) {
joinType match {
case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
case LeftSemi if isRightEmpty => empty(p)
case LeftAnti if isRightEmpty => p.left
case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
case LeftAnti if isRightEmpty | isFalseCondition => p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
case LeftOuter | FullOuter if isRightEmpty =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isRightEmpty => empty(p)
case RightOuter | FullOuter if isLeftEmpty =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case LeftOuter if isFalseCondition =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isFalseCondition =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case _ => p
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -155,6 +156,28 @@ class PropagateEmptyRelationSuite extends PlanTest {
}
}

test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") {
val testcases = Seq(
(Inner, Some(LocalRelation('a.int, 'b.int))),
(Cross, Some(LocalRelation('a.int, 'b.int))),
(LeftOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(RightOuter,
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(FullOuter, None),
(LeftAnti, Some(testRelation1)),
(LeftSemi, Some(LocalRelation('a.int)))
)

testcases.foreach { case (jt, answer) =>
val query = testRelation1.join(testRelation2, joinType = jt, condition = Some(FalseLiteral))
val optimized = Optimize.execute(query.analyze)
val correctAnswer =
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
comparePlans(optimized, correctAnswer)
}
}

test("propagate empty relation through UnaryNode") {
val query = testRelation1
.where(false)
Expand Down