Filter push down #1641

Closed · Tracked by #3

penghuo opened this issue May 18, 2023 · 1 comment
Comments

@penghuo
Collaborator

penghuo commented May 18, 2023

Predicate Push Down

Spark provides predicates: Array[Predicate] to the DataSource, and the DataSource decides which predicates can be pushed down. The examples below describe what predicates Spark provides; a minimal sketch of how a DataSource receives them follows the examples.

  • value = 1
[
  (value IS NOT NULL),
  (value = 1)
]
  • value = 1 and id = 'a'
[
  (value IS NOT NULL),
  (id IS NOT NULL),
  (value = 1),
  (id = 'a')
]
  • value = 1 or value = 2
OR(
  value = 1,
  value = 2
)
  • NOT (value = 1 or value = 2).
💡 Spark does not optimize the expression to (value ≠ 1 AND value ≠ 2)
[
  (value IS NOT NULL),
  (NOT (value = 1)),
  (NOT (value = 2))
]
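
For reference, a minimal sketch (not the Flint implementation) of how a ScanBuilder receives this Array[Predicate] through Spark's SupportsPushDownV2Filters interface; the class name and the supported-predicate check are illustrative only:

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType

class ExampleScanBuilder extends SupportsPushDownV2Filters {
  private var pushed: Array[Predicate] = Array.empty

  // Spark passes all candidate predicates; the source keeps the ones it can
  // evaluate and returns the rest so Spark re-applies them after the scan.
  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, unsupported) =
      predicates.partition(p => p.name() == "=" || p.name() == "IS_NOT_NULL")
    pushed = supported
    unsupported
  }

  override def pushedPredicates(): Array[Predicate] = pushed

  // A real implementation would build a Scan that applies `pushed` at the source.
  override def build(): Scan = new Scan {
    override def readSchema(): StructType = StructType(Nil)
  }
}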

Spark SQL in ANSI mode

Spark SQL has an ANSI mode configuration, spark.sql.ansi.enabled, which is false by default.

spark.conf.set("spark.sql.ansi.enabled", "true")

ANSI mode can change the filter push down logic. The following examples explain it.

  • value + 1 = 2, ANSI mode = false
💡 Spark does not normalize the expression to value = 1, nor does it push down the expression
[
  (value IS NOT NULL)
]
  • value + 1 = 2, ANSI mode = true
[
  (value IS NOT NULL),
  (value + 1 = 2)
]

The V2ExpressionBuilder class decides which expressions can be pushed down. V2ExpressionBuilder is not stable yet; support for pushing down more expressions is still being added to it.
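
To see which predicates actually reached the source, the physical plan can be inspected. A quick sketch, assuming a DataFrame df over a V2 source with an integer column value (the exact label of the pushed predicates in the scan node varies by connector):

// Sketch only: toggle ANSI mode and compare the scan node in the plan.
spark.conf.set("spark.sql.ansi.enabled", "false")
df.filter("value + 1 = 2").explain(true)   // scan shows only: value IS NOT NULL

spark.conf.set("spark.sql.ansi.enabled", "true")
df.filter("value + 1 = 2").explain(true)   // scan also shows: (value + 1) = 2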

@penghuo
Collaborator Author

penghuo commented May 18, 2023

How to support pushdown.

For instance, push down array_contains(aIntArray, 500).

Solutions

There are two possible solutions: (1) create a UDF and register it through FunctionCatalog, or (2) add a query optimization rule to rewrite the Filter. The PR below implements solution (1); a sketch of what such a UDF could look like follows this list.

  • PR penghuo@c64440f
    • The PR requires Spark 3.4.0, which is not yet supported by AWS EMR.
    • The flint.array_contains(aInt, 1) = 1 grammar is not ideal; we actually expected to use flint.array_contains(aInt, 1) directly, but Spark's V2ExpressionBuilder cannot rewrite it as a Predicate.
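
A minimal sketch of solution (1), assuming the Spark 3.4 FunctionCatalog API; FlintArrayContains is a hypothetical name, and the actual PR may differ. The function returns an integer so that the = 1 comparison can be rewritten as a Predicate:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._

// Hypothetical UDF: returns 1 if the array contains the value, 0 otherwise.
object FlintArrayContains extends UnboundFunction {
  override def name(): String = "array_contains"
  override def description(): String = "array_contains(array, value): 1 if value is in array, else 0"

  override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {
    override def inputTypes(): Array[DataType] = Array(ArrayType(IntegerType), IntegerType)
    override def resultType(): DataType = IntegerType
    override def name(): String = "array_contains"
    override def canonicalName(): String = "flint.array_contains"
    override def produceResult(input: InternalRow): Int = {
      val array = input.getArray(0)
      val value = input.getInt(1)
      val found = (0 until array.numElements())
        .exists(i => !array.isNullAt(i) && array.getInt(i) == value)
      if (found) 1 else 0
    }
  }
}

The FlintCatalog would then return this object from FunctionCatalog.loadFunction so that flint.array_contains(...) resolves to it.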

Demo

###
PUT {{baseUrl}}/t001
Content-Type: application/json

{
  "mappings": {
    "dynamic": false,
    "properties": {
      "aInt": {
        "type": "integer"
      },
      "aString": {
        "type": "keyword"
      },
      "aText": {
        "type": "text"
      }
    }
  }
}
###
POST {{baseUrl}}/t001/_bulk
Content-Type: application/x-ndjson

{ "create" : { "_id" : "1" } }
{"aInt": [1,2,3],"aString": "a","aText": "i am first"}
{ "create" : { "_id" : "2" } }
{"aInt": [4,5,6],"aString": "b","aText": "i am second"}


import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Register the Flint catalog so flint.* functions resolve through FunctionCatalog.
spark.conf.set("spark.sql.catalog.flint", classOf[org.apache.spark.sql.flint.FlintCatalog].getName)

// Read the t001 index created above through the Flint DataSource.
val schema = StructType(Seq(StructField("aInt", ArrayType(IntegerType), nullable = true)))
val openSearchOptions = Map("host" -> "localhost", "port" -> "9200")

val sql = new SQLContext(sc)
val df = sql.read.format("flint").options(openSearchOptions).schema(schema).load("t001")

// Filter with the UDF-based predicate; only documents whose aInt array contains 1 are returned.
df.filter("flint.array_contains(aInt, 1) = 1").show

+---------+
|     aInt|
+---------+
|[1, 2, 3]|
+---------+
