Skip to content

Commit 61a99f6

Browse files
lianchengmarmbrus
authored andcommitted
[SPARK-4937][SQL] Normalizes conjunctions and disjunctions to eliminate common predicates
This PR is a simplified version of several filter optimization rules introduced in #3778 authored by scwf. Newly introduced optimizations include: 1. `a && a` => `a` 2. `a || a` => `a` 3. `(a || b || c || ...) && (a || b || d || ...)` => `a && b && (c || d || ...)` The 3rd rule is particularly useful for optimizing the following query, which is planned into a cartesian product ```sql SELECT * FROM t1, t2 WHERE (t1.key = t2.key AND t1.value > 10) OR (t1.key = t2.key AND t2.value < 20) ``` to the following one, which is planned into an equi-join: ```sql SELECT * FROM t1, t2 WHERE t1.key = t2.key AND (t1.value > 10 OR t2.value < 20) ``` The example above is quite artificial, but common predicates are likely to appear in real life complex queries (like the one mentioned in #3778). A difference between this PR and #3778 is that these optimizations are not limited to `Filter`, but are generalized to all logical plan nodes. Thanks to scwf for bringing up these optimizations, and chenghao-intel for the generalization suggestion. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3784) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #3784 from liancheng/normalize-filters and squashes the following commits: caca560 [Cheng Lian] Moves filter normalization into BooleanSimplification rule 4ab3a58 [Cheng Lian] Fixes test failure, adds more tests 5d54349 [Cheng Lian] Fixes typo in comment 2abbf8e [Cheng Lian] Forgot our sacred Apache licence header... cf95639 [Cheng Lian] Adds an optimization rule for filter normalization
1 parent a75dd83 commit 61a99f6

File tree

4 files changed

+110
-8
lines changed

4 files changed

+110
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20-
import scala.collection.immutable.HashSet
2120
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
2221
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2322
import org.apache.spark.sql.catalyst.types.BooleanType
@@ -48,6 +47,14 @@ trait PredicateHelper {
4847
}
4948
}
5049

50+
protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
51+
condition match {
52+
case Or(cond1, cond2) =>
53+
splitDisjunctivePredicates(cond1) ++ splitDisjunctivePredicates(cond2)
54+
case other => other :: Nil
55+
}
56+
}
57+
5158
/**
5259
* Returns true if `expr` can be evaluated using only the output of `plan`. This method
5360
* can be used to determine when is is acceptable to move expression evaluation within a query

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,16 @@ object OptimizeIn extends Rule[LogicalPlan] {
294294
}
295295

296296
/**
297-
* Simplifies boolean expressions where the answer can be determined without evaluating both sides.
297+
* Simplifies boolean expressions:
298+
*
299+
* 1. Simplifies expressions whose answer can be determined without evaluating both sides.
300+
* 2. Eliminates / extracts common factors.
301+
* 3. Removes `Not` operator.
302+
*
298303
* Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
299304
* is only safe when evaluations of expressions does not result in side effects.
300305
*/
301-
object BooleanSimplification extends Rule[LogicalPlan] {
306+
object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
302307
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
303308
case q: LogicalPlan => q transformExpressionsUp {
304309
case and @ And(left, right) =>
@@ -307,7 +312,9 @@ object BooleanSimplification extends Rule[LogicalPlan] {
307312
case (l, Literal(true, BooleanType)) => l
308313
case (Literal(false, BooleanType), _) => Literal(false)
309314
case (_, Literal(false, BooleanType)) => Literal(false)
310-
case (_, _) => and
315+
// a && a && a ... => a
316+
case _ if splitConjunctivePredicates(and).distinct.size == 1 => left
317+
case _ => and
311318
}
312319

313320
case or @ Or(left, right) =>
@@ -316,7 +323,19 @@ object BooleanSimplification extends Rule[LogicalPlan] {
316323
case (_, Literal(true, BooleanType)) => Literal(true)
317324
case (Literal(false, BooleanType), r) => r
318325
case (l, Literal(false, BooleanType)) => l
319-
case (_, _) => or
326+
// a || a || a ... => a
327+
case _ if splitDisjunctivePredicates(or).distinct.size == 1 => left
328+
// (a && b && c && ...) || (a && b && d && ...) => a && b && (c || d || ...)
329+
case _ =>
330+
val lhsSet = splitConjunctivePredicates(left).toSet
331+
val rhsSet = splitConjunctivePredicates(right).toSet
332+
val common = lhsSet.intersect(rhsSet)
333+
334+
(lhsSet.diff(common).reduceOption(And) ++ rhsSet.diff(common).reduceOption(And))
335+
.reduceOption(Or)
336+
.map(_ :: common.toList)
337+
.getOrElse(common.toList)
338+
.reduce(And)
320339
}
321340

322341
case not @ Not(exp) =>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
21+
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
22+
import org.apache.spark.sql.catalyst.plans.PlanTest
23+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
24+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
25+
26+
// For implicit conversions
27+
import org.apache.spark.sql.catalyst.dsl.expressions._
28+
import org.apache.spark.sql.catalyst.dsl.plans._
29+
30+
class NormalizeFiltersSuite extends PlanTest {
31+
object Optimize extends RuleExecutor[LogicalPlan] {
32+
val batches = Seq(
33+
Batch("AnalysisNodes", Once,
34+
EliminateAnalysisOperators),
35+
Batch("NormalizeFilters", FixedPoint(100),
36+
BooleanSimplification,
37+
SimplifyFilters))
38+
}
39+
40+
val relation = LocalRelation('a.int, 'b.int, 'c.string)
41+
42+
def checkExpression(original: Expression, expected: Expression): Unit = {
43+
val actual = Optimize(relation.where(original)).collect { case f: Filter => f.condition }.head
44+
val result = (actual, expected) match {
45+
case (And(l1, r1), And(l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
46+
case (Or (l1, r1), Or (l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
47+
case (lhs, rhs) => lhs fastEquals rhs
48+
}
49+
50+
assert(result, s"$actual isn't equivalent to $expected")
51+
}
52+
53+
test("a && a => a") {
54+
checkExpression('a === 1 && 'a === 1, 'a === 1)
55+
checkExpression('a === 1 && 'a === 1 && 'a === 1, 'a === 1)
56+
}
57+
58+
test("a || a => a") {
59+
checkExpression('a === 1 || 'a === 1, 'a === 1)
60+
checkExpression('a === 1 || 'a === 1 || 'a === 1, 'a === 1)
61+
}
62+
63+
test("(a && b) || (a && c) => a && (b || c)") {
64+
checkExpression(
65+
('a === 1 && 'a < 10) || ('a > 2 && 'a === 1),
66+
('a === 1) && ('a < 10 || 'a > 2))
67+
68+
checkExpression(
69+
('a < 1 && 'b > 2 && 'c.isNull) || ('a < 1 && 'c === "hello" && 'b > 2),
70+
('c.isNull || 'c === "hello") && 'a < 1 && 'b > 2)
71+
}
72+
}

sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,20 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
105105

106106
test(query) {
107107
val schemaRdd = sql(query)
108-
assertResult(expectedQueryResult.toArray, "Wrong query result") {
108+
val queryExecution = schemaRdd.queryExecution
109+
110+
assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
109111
schemaRdd.collect().map(_.head).toArray
110112
}
111113

112114
val (readPartitions, readBatches) = schemaRdd.queryExecution.executedPlan.collect {
113115
case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
114116
}.head
115117

116-
assert(readBatches === expectedReadBatches, "Wrong number of read batches")
117-
assert(readPartitions === expectedReadPartitions, "Wrong number of read partitions")
118+
assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
119+
assert(
120+
readPartitions === expectedReadPartitions,
121+
s"Wrong number of read partitions: $queryExecution")
118122
}
119123
}
120124
}

0 commit comments

Comments
 (0)