Skip to content

Commit 2f91b01

Browse files
sureshthalamati authored and gatorsmile committed
[SPARK-18141][SQL] Fix to quote column names in the predicate clause of the JDBC RDD generated sql statement
## What changes were proposed in this pull request?

The SQL query generated for the JDBC data source does not quote column names in the predicate clause. When the source table has quoted column names, a Spark JDBC read incorrectly fails with a "column not found" error.

Error:
org.h2.jdbc.JdbcSQLException: Column "ID" not found;

Source SQL statement:
SELECT "Name","Id" FROM TEST."mixedCaseCols" WHERE (Id < 1)

This PR fixes the problem by quoting column names in the predicate clause of the generated SQL when filters are pushed down to the data source.

Source SQL statement after the fix:
SELECT "Name","Id" FROM TEST."mixedCaseCols" WHERE ("Id" < 1)

## How was this patch tested?

Added a new test case to JDBCSuite.

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #15662 from sureshthalamati/filter_quoted_cols-SPARK-18141.

(cherry picked from commit 70c5549)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
1 parent 2d2e801 commit 2f91b01

File tree

3 files changed

+82
-39
lines changed

3 files changed

+82
-39
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.rdd.RDD
2929
import org.apache.spark.sql.catalyst.InternalRow
30-
import org.apache.spark.sql.jdbc.JdbcDialects
30+
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
3131
import org.apache.spark.sql.sources._
3232
import org.apache.spark.sql.types._
3333
import org.apache.spark.util.CompletionIterator
@@ -105,37 +105,40 @@ object JDBCRDD extends Logging {
105105
* Turns a single Filter into a String representing a SQL expression.
106106
* Returns None for an unhandled filter.
107107
*/
108-
def compileFilter(f: Filter): Option[String] = {
108+
def compileFilter(f: Filter, dialect: JdbcDialect): Option[String] = {
109+
def quote(colName: String): String = dialect.quoteIdentifier(colName)
110+
109111
Option(f match {
110-
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
112+
case EqualTo(attr, value) => s"${quote(attr)} = ${compileValue(value)}"
111113
case EqualNullSafe(attr, value) =>
112-
s"(NOT ($attr != ${compileValue(value)} OR $attr IS NULL OR " +
113-
s"${compileValue(value)} IS NULL) OR ($attr IS NULL AND ${compileValue(value)} IS NULL))"
114-
case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
115-
case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
116-
case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
117-
case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
118-
case IsNull(attr) => s"$attr IS NULL"
119-
case IsNotNull(attr) => s"$attr IS NOT NULL"
120-
case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'"
121-
case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'"
122-
case StringContains(attr, value) => s"${attr} LIKE '%${value}%'"
114+
val col = quote(attr)
115+
s"(NOT ($col != ${compileValue(value)} OR $col IS NULL OR " +
116+
s"${compileValue(value)} IS NULL) OR ($col IS NULL AND ${compileValue(value)} IS NULL))"
117+
case LessThan(attr, value) => s"${quote(attr)} < ${compileValue(value)}"
118+
case GreaterThan(attr, value) => s"${quote(attr)} > ${compileValue(value)}"
119+
case LessThanOrEqual(attr, value) => s"${quote(attr)} <= ${compileValue(value)}"
120+
case GreaterThanOrEqual(attr, value) => s"${quote(attr)} >= ${compileValue(value)}"
121+
case IsNull(attr) => s"${quote(attr)} IS NULL"
122+
case IsNotNull(attr) => s"${quote(attr)} IS NOT NULL"
123+
case StringStartsWith(attr, value) => s"${quote(attr)} LIKE '${value}%'"
124+
case StringEndsWith(attr, value) => s"${quote(attr)} LIKE '%${value}'"
125+
case StringContains(attr, value) => s"${quote(attr)} LIKE '%${value}%'"
123126
case In(attr, value) if value.isEmpty =>
124-
s"CASE WHEN ${attr} IS NULL THEN NULL ELSE FALSE END"
125-
case In(attr, value) => s"$attr IN (${compileValue(value)})"
126-
case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null)
127+
s"CASE WHEN ${quote(attr)} IS NULL THEN NULL ELSE FALSE END"
128+
case In(attr, value) => s"${quote(attr)} IN (${compileValue(value)})"
129+
case Not(f) => compileFilter(f, dialect).map(p => s"(NOT ($p))").getOrElse(null)
127130
case Or(f1, f2) =>
128131
// We can't compile Or filter unless both sub-filters are compiled successfully.
129132
// It applies too for the following And filter.
130133
// If we can make sure compileFilter supports all filters, we can remove this check.
131-
val or = Seq(f1, f2).flatMap(compileFilter(_))
134+
val or = Seq(f1, f2).flatMap(compileFilter(_, dialect))
132135
if (or.size == 2) {
133136
or.map(p => s"($p)").mkString(" OR ")
134137
} else {
135138
null
136139
}
137140
case And(f1, f2) =>
138-
val and = Seq(f1, f2).flatMap(compileFilter(_))
141+
val and = Seq(f1, f2).flatMap(compileFilter(_, dialect))
139142
if (and.size == 2) {
140143
and.map(p => s"($p)").mkString(" AND ")
141144
} else {
@@ -214,7 +217,9 @@ private[jdbc] class JDBCRDD(
214217
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
215218
*/
216219
private val filterWhereClause: String =
217-
filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
220+
filters
221+
.flatMap(JDBCRDD.compileFilter(_, JdbcDialects.get(url)))
222+
.map(p => s"($p)").mkString(" AND ")
218223

219224
/**
220225
* A WHERE clause representing both `filters`, if any, and the current partition.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
2323
import org.apache.spark.Partition
2424
import org.apache.spark.rdd.RDD
2525
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
26+
import org.apache.spark.sql.jdbc.JdbcDialects
2627
import org.apache.spark.sql.sources._
2728
import org.apache.spark.sql.types.StructType
2829

@@ -113,7 +114,7 @@ private[sql] case class JDBCRelation(
113114

114115
// Check if JDBCRDD.compileFilter can accept input filters
115116
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
116-
filters.filter(JDBCRDD.compileFilter(_).isEmpty)
117+
filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
117118
}
118119

119120
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@ class JDBCSuite extends SparkFunSuite
202202
|partitionColumn '"Dept"', lowerBound '1', upperBound '4', numPartitions '4')
203203
""".stripMargin.replaceAll("\n", " "))
204204

205+
conn.prepareStatement(
206+
"""create table test."mixedCaseCols" ("Name" TEXT(32), "Id" INTEGER NOT NULL)""")
207+
.executeUpdate()
208+
conn.prepareStatement("""insert into test."mixedCaseCols" values ('fred', 1)""").executeUpdate()
209+
conn.prepareStatement("""insert into test."mixedCaseCols" values ('mary', 2)""").executeUpdate()
210+
conn.prepareStatement("""insert into test."mixedCaseCols" values (null, 3)""").executeUpdate()
211+
conn.commit()
212+
213+
sql(
214+
s"""
215+
|CREATE TEMPORARY TABLE mixedCaseCols
216+
|USING org.apache.spark.sql.jdbc
217+
|OPTIONS (url '$url', dbtable 'TEST."mixedCaseCols"', user 'testUser', password 'testPass')
218+
""".stripMargin.replaceAll("\n", " "))
219+
205220
// Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
206221
}
207222

@@ -604,30 +619,32 @@ class JDBCSuite extends SparkFunSuite
604619

605620
test("compile filters") {
606621
val compileFilter = PrivateMethod[Option[String]]('compileFilter)
607-
def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("")
608-
assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
609-
assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))")
622+
def doCompileFilter(f: Filter): String =
623+
JDBCRDD invokePrivate compileFilter(f, JdbcDialects.get("jdbc:")) getOrElse("")
624+
assert(doCompileFilter(EqualTo("col0", 3)) === """"col0" = 3""")
625+
assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === """(NOT ("col1" = 'abc'))""")
610626
assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def")))
611-
=== "(col0 = 0) AND (col1 = 'def')")
627+
=== """("col0" = 0) AND ("col1" = 'def')""")
612628
assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi")))
613-
=== "(col0 = 2) OR (col1 = 'ghi')")
614-
assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5")
629+
=== """("col0" = 2) OR ("col1" = 'ghi')""")
630+
assert(doCompileFilter(LessThan("col0", 5)) === """"col0" < 5""")
615631
assert(doCompileFilter(LessThan("col3",
616-
Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'")
617-
assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'")
618-
assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5")
619-
assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3")
620-
assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3")
621-
assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')")
632+
Timestamp.valueOf("1995-11-21 00:00:00.0"))) === """"col3" < '1995-11-21 00:00:00.0'""")
633+
assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04")))
634+
=== """"col4" < '1983-08-04'""")
635+
assert(doCompileFilter(LessThanOrEqual("col0", 5)) === """"col0" <= 5""")
636+
assert(doCompileFilter(GreaterThan("col0", 3)) === """"col0" > 3""")
637+
assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === """"col0" >= 3""")
638+
assert(doCompileFilter(In("col1", Array("jkl"))) === """"col1" IN ('jkl')""")
622639
assert(doCompileFilter(In("col1", Array.empty)) ===
623-
"CASE WHEN col1 IS NULL THEN NULL ELSE FALSE END")
640+
"""CASE WHEN "col1" IS NULL THEN NULL ELSE FALSE END""")
624641
assert(doCompileFilter(Not(In("col1", Array("mno", "pqr"))))
625-
=== "(NOT (col1 IN ('mno', 'pqr')))")
626-
assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL")
627-
assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL")
642+
=== """(NOT ("col1" IN ('mno', 'pqr')))""")
643+
assert(doCompileFilter(IsNull("col1")) === """"col1" IS NULL""")
644+
assert(doCompileFilter(IsNotNull("col1")) === """"col1" IS NOT NULL""")
628645
assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def")))
629-
=== "((NOT (col0 != 'abc' OR col0 IS NULL OR 'abc' IS NULL) "
630-
+ "OR (col0 IS NULL AND 'abc' IS NULL))) AND (col1 = 'def')")
646+
=== """((NOT ("col0" != 'abc' OR "col0" IS NULL OR 'abc' IS NULL) """
647+
+ """OR ("col0" IS NULL AND 'abc' IS NULL))) AND ("col1" = 'def')""")
631648
}
632649

633650
test("Dialect unregister") {
@@ -824,4 +841,24 @@ class JDBCSuite extends SparkFunSuite
824841
val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
825842
assert(schema.contains("`order` TEXT"))
826843
}
844+
845+
test("SPARK-18141: Predicates on quoted column names in the jdbc data source") {
846+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id < 1").collect().size == 0)
847+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id <= 1").collect().size == 1)
848+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id > 1").collect().size == 2)
849+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id >= 1").collect().size == 3)
850+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1").collect().size == 1)
851+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id != 2").collect().size == 2)
852+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id <=> 2").collect().size == 1)
853+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name LIKE 'fr%'").collect().size == 1)
854+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name LIKE '%ed'").collect().size == 1)
855+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name LIKE '%re%'").collect().size == 1)
856+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name IS NULL").collect().size == 1)
857+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name IS NOT NULL").collect().size == 2)
858+
assert(sql("SELECT * FROM mixedCaseCols").filter($"Name".isin()).collect().size == 0)
859+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name IN ('mary', 'fred')").collect().size == 2)
860+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name NOT IN ('fred')").collect().size == 1)
861+
assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
862+
assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
863+
}
827864
}

0 commit comments

Comments
 (0)