Commit 8e23d8d

liancheng authored and yhuai committed
[SPARK-12218] Fixes ORC conjunction predicate push down
This PR is a follow-up of PR apache#10362. Two major changes:

1. The fix introduced in apache#10362 is OK for Parquet, but may disable ORC PPD in many cases.

   PR apache#10362 stops converting an `AND` predicate if any branch is inconvertible. On the other hand, `OrcFilters` combines all filters into a single big conjunction first, and then tries to convert it into an ORC `SearchArgument`. This means that if any filter is inconvertible, no filters can be pushed down at all. This PR fixes the issue by finding all convertible filters first, before doing the actual conversion. The reason behind the original implementation is mostly the limitation of the ORC `SearchArgument` builder, which is documented in this PR in detail.

2. Copied the `AND` predicate fix for ORC from apache#10362 to avoid a merge conflict.

Same as apache#10362, this PR targets master (2.0.0-SNAPSHOT), branch-1.6, and branch-1.5.

Author: Cheng Lian <lian@databricks.com>

Closes apache#10377 from liancheng/spark-12218.fix-orc-conjunction-ppd.
1 parent 8d49400 commit 8e23d8d
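
At its core, the fix pre-checks every candidate filter against a throwaway `SearchArgument` builder, and only the filters that survive this dry run are folded into the final conjunction. A minimal sketch of that idea, assuming the private `buildSearchArgument` helper shown in the `OrcFilters` diff below (not a verbatim copy of the patched source):

```scala
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
import org.apache.spark.sql.sources.{And, Filter}

def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
  // Dry run: keep only the filters that convert cleanly on their own, so one
  // inconvertible filter no longer poisons the whole conjunction.
  val convertibleFilters = filters.filter { f =>
    buildSearchArgument(f, SearchArgumentFactory.newBuilder()).isDefined
  }
  for {
    // Combine the survivors into a single conjunction...
    conjunction <- convertibleFilters.reduceOption(And)
    // ...and convert it in one pass, as the builder API requires.
    builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
  } yield builder.build()
}
```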

File tree: 3 files changed (+112, −30 lines)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 40 additions & 2 deletions
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.parquet.filter2.predicate.Operators._
+import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 
-import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -382,6 +384,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false),
+      StructField("b", StringType, nullable = true),
+      StructField("c", DoubleType, nullable = true)
+    ))
+
+    assertResult(Some(and(
+      lt(intColumn("a"), 10: Integer),
+      gt(doubleColumn("c"), 1.5: java.lang.Double)))
+    ) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.And(
+          sources.LessThan("a", 10),
+          sources.GreaterThan("c", 1.5D)))
+    }
+
+    assertResult(None) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.And(
+          sources.LessThan("a", 10),
+          sources.StringContains("b", "prefix")))
+    }
+
+    assertResult(None) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix"))))
+    }
+  }
+
   test("SPARK-11164: test the parquet filter in") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
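
The third `assertResult(None)` case above is the subtle one: inside a `Not`, dropping the inconvertible `StringContains` branch and pushing down only the `GreaterThan` would change the meaning of the negated predicate and discard matching rows. A standalone boolean sketch of the pitfall (hypothetical sample values, no Spark required):

```scala
object NotAndPitfall extends App {
  // Rows as (a, "b contains the prefix"); the predicate under test is
  // NOT(a > 1 AND contains(b, "prefix")).
  val rows = Seq((0, false), (2, false), (2, true))

  def full(a: Int, b: Boolean): Boolean    = !(a > 1 && b) // original predicate
  def dropped(a: Int, b: Boolean): Boolean = !(a > 1)      // b-branch dropped under Not

  println(rows.filter((full _).tupled))    // List((0,false), (2,false))
  println(rows.filter((dropped _).tupled)) // List((0,false)) -- (2,false) wrongly dropped
}
```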

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

Lines changed: 41 additions & 27 deletions
@@ -26,15 +26,47 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.sources._
 
 /**
- * It may be optimized by push down partial filters. But we are conservative here.
- * Because if some filters fail to be parsed, the tree may be corrupted,
- * and cannot be used anymore.
+ * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
+ *
+ * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double-
+ * checking pattern when converting `And`/`Or`/`Not` filters.
+ *
+ * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't
+ * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite
+ * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using
+ * existing simpler ones.
+ *
+ * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
+ * `startNot()` mutate internal state of the builder instance. This forces us to translate all
+ * convertible filters with a single builder instance. However, before actually converting a filter,
+ * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is
+ * found, we may already end up with a builder whose internal state is inconsistent.
+ *
+ * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then
+ * try to convert its children. Say we convert `left` child successfully, but find that `right`
+ * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent
+ * now.
+ *
+ * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
+ * children with brand new builders, and only do the actual conversion with the right builder
+ * instance when the children are proven to be convertible.
+ *
+ * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of
+ * builder methods mentioned above can only be found in test code, where all tested filters are
+ * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
   def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+    // First, tries to convert each filter individually to see whether it's convertible, and then
+    // collect all convertible ones to build the final `SearchArgument`.
+    val convertibleFilters = for {
+      filter <- filters
+      _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
+    } yield filter
+
     for {
-      // Combines all filters with `And`s to produce a single conjunction predicate
-      conjunction <- filters.reduceOption(And)
+      // Combines all convertible filters using `And` to produce a single conjunction
+      conjunction <- convertibleFilters.reduceOption(And)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
       builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
     } yield builder.build()
@@ -50,28 +82,6 @@ private[orc] object OrcFilters extends Logging {
       case _ => false
     }
 
-    // lian: I probably missed something here, and had to end up with a pretty weird double-checking
-    // pattern when converting `And`/`Or`/`Not` filters.
-    //
-    // The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`,
-    // and `startNot()` mutate internal state of the builder instance. This forces us to translate
-    // all convertible filters with a single builder instance. However, before actually converting a
-    // filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible
-    // filter is found, we may already end up with a builder whose internal state is inconsistent.
-    //
-    // For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and
-    // then try to convert its children. Say we convert `left` child successfully, but find that
-    // `right` child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is
-    // inconsistent now.
-    //
-    // The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
-    // children with brand new builders, and only do the actual conversion with the right builder
-    // instance when the children are proven to be convertible.
-    //
-    // P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only.
-    // Usage of builder methods mentioned above can only be found in test code, where all tested
-    // filters are known to be convertible.
-
     expression match {
       case And(left, right) =>
         // At here, it is not safe to just convert one side if we do not understand the
@@ -102,6 +112,10 @@ private[orc] object OrcFilters extends Logging {
           negate <- buildSearchArgument(child, builder.startNot())
         } yield negate.end()
 
+      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+
       case EqualTo(attribute, value) if isSearchableLiteral(value) =>
         Some(builder.startAnd().equals(attribute, value).end())
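
The `And` branch referenced in the truncated context line above ("it is not safe to just convert one side...") is where the double-checking pattern from the new scaladoc shows up. A sketch of its shape (an excerpt-style fragment from inside `buildSearchArgument`, simplified rather than quoted verbatim):

```scala
case And(left, right) =>
  for {
    // Dry runs against throwaway builders: prove BOTH children convertible
    // before mutating the shared builder, since startAnd() cannot be undone.
    _ <- buildSearchArgument(left, SearchArgumentFactory.newBuilder())
    _ <- buildSearchArgument(right, SearchArgumentFactory.newBuilder())
    // Only now is it safe to touch the real builder.
    lhs <- buildSearchArgument(left, builder.startAnd())
    rhs <- buildSearchArgument(right, lhs)
  } yield rhs.end()
```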

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 31 additions & 1 deletion
@@ -21,8 +21,9 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{QueryTest, Row}
 
 case class OrcData(intField: Int, stringField: String)
 
@@ -174,4 +175,33 @@ class OrcSourceSuite extends OrcSuite {
         |)
       """.stripMargin)
   }
+
+  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+    // The `LessThan` should be converted while the `StringContains` shouldn't
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(Array(
+        LessThan("a", 10),
+        StringContains("b", "prefix")
+      )).get.toString
+    }
+
+    // The `LessThan` should be converted while the whole inner `And` shouldn't
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(Array(
+        LessThan("a", 10),
+        Not(And(
+          GreaterThan("a", 1),
+          StringContains("b", "prefix")
+        ))
+      )).get.toString
+    }
+  }
 }
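
For reference, the expected strings in these tests are just the `toString` rendering of a one-leaf `SearchArgument`. A hedged sketch of producing the same value by hand, assuming the Hive `SearchArgumentFactory` builder API used elsewhere in this patch:

```scala
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory

// Build `a < 10` directly. The startAnd()/end() wrapper around the leaf is
// mandatory -- see the NOTE added to OrcFilters above.
val sarg = SearchArgumentFactory.newBuilder()
  .startAnd()
  .lessThan("a", 10: Integer)
  .end()
  .build()

// Expected to print:
//   leaf-0 = (LESS_THAN a 10)
//   expr = leaf-0
println(sarg.toString)
```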
