Skip to content

Commit ad5b7cf

Browse files
viiryarxin
authored andcommitted
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown for JDBCRDD and add few filters
This patch refactors the filter pushdown for JDBCRDD and also adds few filters. Added filters are basically from #10468 with some refactoring. Test cases are from #10468. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #10470 from viirya/refactor-jdbc-filter.
1 parent a59a357 commit ad5b7cf

File tree

2 files changed

+45
-31
lines changed

2 files changed

+45
-31
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ private[sql] object JDBCRDD extends Logging {
179179
case stringValue: String => s"'${escapeSql(stringValue)}'"
180180
case timestampValue: Timestamp => "'" + timestampValue + "'"
181181
case dateValue: Date => "'" + dateValue + "'"
182-
case arrayValue: Array[Object] => arrayValue.map(compileValue).mkString(", ")
182+
case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
183183
case _ => value
184184
}
185185

@@ -188,24 +188,41 @@ private[sql] object JDBCRDD extends Logging {
188188

189189
/**
190190
* Turns a single Filter into a String representing a SQL expression.
191-
* Returns null for an unhandled filter.
191+
* Returns None for an unhandled filter.
192192
*/
193-
private def compileFilter(f: Filter): String = f match {
194-
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
195-
case Not(f) => s"(NOT (${compileFilter(f)}))"
196-
case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
197-
case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
198-
case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
199-
case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
200-
case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'"
201-
case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'"
202-
case StringContains(attr, value) => s"${attr} LIKE '%${value}%'"
203-
case IsNull(attr) => s"$attr IS NULL"
204-
case IsNotNull(attr) => s"$attr IS NOT NULL"
205-
case In(attr, value) => s"$attr IN (${compileValue(value)})"
206-
case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})"
207-
case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})"
208-
case _ => null
193+
private def compileFilter(f: Filter): Option[String] = {
194+
Option(f match {
195+
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
196+
case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
197+
case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
198+
case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
199+
case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
200+
case IsNull(attr) => s"$attr IS NULL"
201+
case IsNotNull(attr) => s"$attr IS NOT NULL"
202+
case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'"
203+
case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'"
204+
case StringContains(attr, value) => s"${attr} LIKE '%${value}%'"
205+
case In(attr, value) => s"$attr IN (${compileValue(value)})"
206+
case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null)
207+
case Or(f1, f2) =>
208+
// We can't compile Or filter unless both sub-filters are compiled successfully.
209+
// It applies too for the following And filter.
210+
// If we can make sure compileFilter supports all filters, we can remove this check.
211+
val or = Seq(f1, f2).map(compileFilter(_)).flatten
212+
if (or.size == 2) {
213+
or.map(p => s"($p)").mkString(" OR ")
214+
} else {
215+
null
216+
}
217+
case And(f1, f2) =>
218+
val and = Seq(f1, f2).map(compileFilter(_)).flatten
219+
if (and.size == 2) {
220+
and.map(p => s"($p)").mkString(" AND ")
221+
} else {
222+
null
223+
}
224+
case _ => null
225+
})
209226
}
210227

211228
/**
@@ -307,25 +324,21 @@ private[sql] class JDBCRDD(
307324
/**
308325
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
309326
*/
310-
private val filterWhereClause: String = {
311-
val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null)
312-
if (filterStrings.size > 0) {
313-
val sb = new StringBuilder("WHERE ")
314-
filterStrings.foreach(x => sb.append(x).append(" AND "))
315-
sb.substring(0, sb.length - 5)
316-
} else ""
317-
}
327+
private val filterWhereClause: String =
328+
filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ")
318329

319330
/**
320331
* A WHERE clause representing both `filters`, if any, and the current partition.
321332
*/
322333
private def getWhereClause(part: JDBCPartition): String = {
323334
if (part.whereClause != null && filterWhereClause.length > 0) {
324-
filterWhereClause + " AND " + part.whereClause
335+
"WHERE " + filterWhereClause + " AND " + part.whereClause
325336
} else if (part.whereClause != null) {
326337
"WHERE " + part.whereClause
338+
} else if (filterWhereClause.length > 0) {
339+
"WHERE " + filterWhereClause
327340
} else {
328-
filterWhereClause
341+
""
329342
}
330343
}
331344

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class JDBCSuite extends SparkFunSuite
190190
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
191191
.collect().size == 2)
192192
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
193-
.collect().size === 2)
193+
.collect().size == 2)
194194
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
195195
.collect().size == 2)
196196
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
@@ -453,8 +453,8 @@ class JDBCSuite extends SparkFunSuite
453453
}
454454

455455
test("compile filters") {
456-
val compileFilter = PrivateMethod[String]('compileFilter)
457-
def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f)
456+
val compileFilter = PrivateMethod[Option[String]]('compileFilter)
457+
def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("")
458458
assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
459459
assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))")
460460
assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def")))
@@ -473,6 +473,7 @@ class JDBCSuite extends SparkFunSuite
473473
=== "(NOT (col1 IN ('mno', 'pqr')))")
474474
assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL")
475475
assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL")
476+
assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) === "")
476477
}
477478

478479
test("Dialect unregister") {

0 commit comments

Comments
 (0)