
Commit 771da83

dilipbiswal authored and gatorsmile committed
[SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
## What changes were proposed in this pull request?

**Description from JIRA**

For the JDBC option `query`, we generate a subquery alias that starts with an underscore: s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}". This is not supported by Oracle: Oracle does not allow an identifier name to start with a non-alphabetic character (unless it is quoted), and it imposes a length limit on identifiers as well. [link](https://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements008.htm)

In this PR, the generated alias is changed to 'SPARK_GEN_SUBQ_<int value>': the "_" prefix is removed and the name is shortened so that it does not exceed the identifier length limit.

## How was this patch tested?

Tests are added for MySQL, Postgres, Oracle, and DB2 to ensure enough coverage.

Closes #24532 from dilipbiswal/SPARK-27596.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

(cherry picked from commit 6001d47)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
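As a minimal standalone sketch of the one-line change (the identifier names are taken from the diff to `JDBCOptions.scala` below; `AliasSketch` and its methods are hypothetical, for illustration only, with `curId` standing in for the counter used in `JDBCOptions`):

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch, not Spark source: it only mirrors the change to the
// generated table alias shown in JDBCOptions.scala below.
object AliasSketch {
  private val curId = new AtomicLong(0L)

  // Before: "__SPARK_GEN_JDBC_SUBQUERY_NAME_" starts with "_" and is already
  // 31 characters before the counter is appended, so Oracle rejects it as an
  // unquoted identifier (must start with a letter, and is length-limited).
  def before(subquery: String): String =
    s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"

  // After: starts with a letter and stays well under the length limit.
  def after(subquery: String): String =
    s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
}

// AliasSketch.after("SELECT x, y FROM tbl WHERE x > 10")
// => "(SELECT x, y FROM tbl WHERE x > 10) SPARK_GEN_SUBQ_0"
```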
1 parent d4eddce commit 771da83

File tree

5 files changed: +108 -1

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala

Lines changed: 26 additions & 0 deletions
@@ -158,4 +158,30 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getInt(1) == 20)
     assert(rows(0).getString(2) == "1")
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala

Lines changed: 27 additions & 0 deletions
@@ -21,6 +21,7 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties

+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.tags.DockerTest

 @DockerTest
@@ -152,4 +153,30 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala

Lines changed: 28 additions & 0 deletions
@@ -482,4 +482,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     }
     assert(df2.collect.toSet === expectedResult)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (1, "1991-11-09", "1996-01-01 01:23:45")
+    ).map { case (id, date, timestamp) =>
+      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    val query = "SELECT id, d, t FROM datetime WHERE id = 1"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .option("oracle.jdbc.mapDateToTimestamp", "false")
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl',
+         |   query '$query',
+         |   oracle.jdbc.mapDateToTimestamp false)
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select id, d, t from queryOption").collect.toSet == expectedResult)
+  }
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala

Lines changed: 26 additions & 0 deletions
@@ -21,6 +21,7 @@ import java.sql.Connection
 import java.util.Properties

 import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
 import org.apache.spark.tags.DockerTest
@@ -180,4 +181,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
     assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, 123456789012345L)
+    ).map { case (c1, c3) =>
+      Row(Integer.valueOf(c1), java.lang.Long.valueOf(c3))
+    }
+
+    val query = "SELECT c1, c3 FROM bar WHERE c1 IS NOT NULL"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ class JDBCOptions(
     if (subquery.isEmpty) {
       throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
     } else {
-      s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+      s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
     }
   }
