Skip to content

Commit 35a69cd

Browse files
committed
Removed IsNotNull constraints on compound expressions from constraint propagation.
Added support for the Not operator over all binary comparison operators.
1 parent 9cba8a3 commit 35a69cd

File tree

3 files changed

+33
-30
lines changed

3 files changed

+33
-30
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -608,12 +608,12 @@ object NullPropagation extends Rule[LogicalPlan] {
608608
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
609609
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
610610
case filter @ Filter(condition, child) =>
611-
// We generate a list of additional isNotNull filters from the operator's existing
612-
// non-compound constraints but remove those that are either already part of the filter
613-
// condition or are part of the operator's child constraints.
614-
val newIsNotNullConstraints =
615-
filter.constraints.filter(isNullFilteringForNonCompoundExpr) --
616-
(child.constraints ++ splitConjunctivePredicates(condition))
611+
// We generate a list of additional isNotNull filters from the operator's existing constraints
612+
// but remove those that are either already part of the filter condition or are part of the
613+
// operator's child constraints.
614+
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
615+
(child.constraints ++ splitConjunctivePredicates(condition))
616+
617617
if (newIsNotNullConstraints.nonEmpty) {
618618
Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
619619
} else {
@@ -622,11 +622,11 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
622622

623623
case join @ Join(left, right, joinType, condition) =>
624624
val leftIsNotNullConstraints = join.constraints
625-
.filter(isNullFilteringForNonCompoundExpr)
625+
.filter(_.isInstanceOf[IsNotNull])
626626
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
627627
val rightIsNotNullConstraints =
628628
join.constraints
629-
.filter(isNullFilteringForNonCompoundExpr)
629+
.filter(_.isInstanceOf[IsNotNull])
630630
.filter(_.references.subsetOf(right.outputSet)) -- right.constraints
631631
val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
632632
Filter(leftIsNotNullConstraints.reduce(And), left)
@@ -644,12 +644,6 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
644644
join
645645
}
646646
}
647-
private def isNullFilteringForNonCompoundExpr(exp: Expression): Boolean = {
648-
exp match {
649-
case c: IsNotNull if c.child.isInstanceOf[AttributeReference] => true
650-
case _ => false
651-
}
652-
}
653647
}
654648

655649
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
4646
private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
4747
// Currently we only propagate constraints if the condition consists of equality
4848
// and ranges. For all other cases, we return an empty set of constraints
49-
constraints.map {
50-
case EqualTo(l, r) =>
51-
Set(IsNotNull(l), IsNotNull(r))
52-
case GreaterThan(l, r) =>
53-
Set(IsNotNull(l), IsNotNull(r))
54-
case GreaterThanOrEqual(l, r) =>
55-
Set(IsNotNull(l), IsNotNull(r))
56-
case LessThan(l, r) =>
57-
Set(IsNotNull(l), IsNotNull(r))
58-
case LessThanOrEqual(l, r) =>
59-
Set(IsNotNull(l), IsNotNull(r))
60-
case Not(EqualTo(l, r)) =>
61-
Set(IsNotNull(l), IsNotNull(r))
62-
case _ =>
63-
Set.empty[Expression]
64-
}.foldLeft(Set.empty[Expression])(_ union _.toSet)
49+
var isNotNullConstraints = Set.empty[Expression]
50+
constraints.collect {
51+
case b @ BinaryComparison(l, r) if !b.isInstanceOf[EqualNullSafe] =>
52+
if (l.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(l)
53+
if (r.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(r)
54+
case Not(b @ BinaryComparison(l, r)) if !b.isInstanceOf[EqualNullSafe] =>
55+
if (l.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(l)
56+
if (r.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(r)
57+
}
58+
isNotNullConstraints
6559
}
6660

6761
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,4 +217,19 @@ class ConstraintPropagationSuite extends SparkFunSuite {
217217
IsNotNull(resolveColumn(tr, "a")),
218218
IsNotNull(resolveColumn(tr, "b")))))
219219
}
220+
221+
test("IsNotNull constraints of compound expressions in filters") {
222+
val tr = LocalRelation('a.int, 'b.string, 'c.int)
223+
verifyConstraints(tr
224+
.where('a.attr + 'c.attr > 10).analyze.constraints,
225+
ExpressionSet(Seq(resolveColumn(tr, "a") + resolveColumn(tr, "c") > 10)))
226+
}
227+
228+
test("IsNotNull constraints of BinaryComparison in Not in filters") {
229+
val tr = LocalRelation('a.int, 'b.string, 'c.int)
230+
verifyConstraints(tr
231+
.where(!('a.attr < 10)).analyze.constraints,
232+
ExpressionSet(Seq(IsNotNull(resolveColumn(tr, "a")),
233+
Not(resolveColumn(tr, "a") < 10))))
234+
}
220235
}

0 commit comments

Comments (0)