
[SPARK-20557] [SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle Dialect #19939


Status: Closed · wants to merge 2 commits
Changes from all commits
OracleIntegrationSuite.scala

@@ -18,11 +18,12 @@
 package org.apache.spark.sql.jdbc

 import java.sql.{Connection, Date, Timestamp}
-import java.util.Properties
+import java.util.{Properties, TimeZone}
 import java.math.BigDecimal

-import org.apache.spark.sql.{DataFrame, Row, SaveMode}
-import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -77,6 +78,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLContext {
     conn.prepareStatement(
       "INSERT INTO ts_with_timezone VALUES " +
         "(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO ts_with_timezone VALUES " +
+        "(2, to_timestamp_tz('1999-12-01 12:00:00 PST','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
     conn.commit()

     conn.prepareStatement(
@@ -235,6 +239,63 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLContext {
     assert(types(1).equals("class java.sql.Timestamp"))
   }

test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
val defaultJVMTimeZone = TimeZone.getDefault
// Pick the timezone different from the current default time zone of JVM
val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
val localSessionTimeZone =
if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone

withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
val e = intercept[java.sql.SQLException] {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
dfRead.collect()
}.getMessage
assert(e.contains("Unrecognized SQL type -101"))
}
}

/**
* Change the Time Zone `timeZoneId` of JVM before executing `f`, then switches back to the
* original after `f` returns.
* @param timeZoneId the ID for a TimeZone, either an abbreviation such as "PST", a full name such
* as "America/Los_Angeles", or a custom ID such as "GMT-8:00".
*/
private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
val originalLocale = TimeZone.getDefault
try {
// Add Locale setting
TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
f
} finally {
TimeZone.setDefault(originalLocale)
}
}

test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
def checkRow(row: Row, ts: String): Unit = {
assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
}

withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
withTimeZone("PST") {
assert(dfRead.collect().toSet ===
Set(
Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
}

withTimeZone("UTC") {
assert(dfRead.collect().toSet ===
Set(
Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
}
}
}

test("SPARK-18004: Make sure date or timestamp related predicate is pushed down correctly") {
val props = new Properties()
props.put("oracle.jdbc.mapDateToTimestamp", "false")
Expand Down
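A quick sanity check of the expected rows in the JVM-timezone test above, since the arithmetic is easy to get backwards: the table holds 1999-12-01 11:00:00 UTC and 1999-12-01 12:00:00 PST, and PST is UTC-8 in December (no DST). The JDK-only sketch below (not part of the diff; the zone IDs are chosen to match the test) reproduces the expected local renderings:

import java.time.{ZoneId, ZoneOffset, ZonedDateTime}

val utcRow = ZonedDateTime.of(1999, 12, 1, 11, 0, 0, 0, ZoneOffset.UTC)
val pstRow = ZonedDateTime.of(1999, 12, 1, 12, 0, 0, 0, ZoneId.of("America/Los_Angeles"))

// Rendered in PST: matches the withTimeZone("PST") expectations
println(utcRow.withZoneSameInstant(ZoneId.of("America/Los_Angeles")).toLocalDateTime) // 1999-12-01T03:00
println(pstRow.withZoneSameInstant(ZoneId.of("America/Los_Angeles")).toLocalDateTime) // 1999-12-01T12:00

// Rendered in UTC: matches the withTimeZone("UTC") expectations
println(utcRow.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime) // 1999-12-01T11:00
println(pstRow.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime) // 1999-12-01T20:00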
PostgresIntegrationSuite.scala

@@ -151,6 +151,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {

   test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
       "should be recognized") {
+    // When using JDBC to read the columns of TIMESTAMP with TIME ZONE and TIME with TIME ZONE,
+    // the actual types are java.sql.Types.TIMESTAMP and java.sql.Types.TIME
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)

Review comment on this hunk (member author): That is for the support of ArrayType. We can revisit that one in the next PR.
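The new comment is also why this suite still passes after the JdbcUtils change below: the PostgreSQL JDBC driver reports these columns with the plain java.sql.Types.TIMESTAMP and java.sql.Types.TIME codes, so the new null mappings for the *_WITH_TIMEZONE codes are never reached. A minimal JDBC-only sketch of that check, assuming the suite's jdbcUrl and ts_with_timezone table are available:

import java.sql.DriverManager

val conn = DriverManager.getConnection(jdbcUrl)
try {
  val rsmd = conn.prepareStatement("SELECT * FROM ts_with_timezone")
    .executeQuery().getMetaData
  (1 to rsmd.getColumnCount).foreach { i =>
    // Expected: 93 (Types.TIMESTAMP) and 92 (Types.TIME), not
    // 2014 (Types.TIMESTAMP_WITH_TIMEZONE) or 2013 (Types.TIME_WITH_TIMEZONE)
    println(s"${rsmd.getColumnName(i)} -> ${rsmd.getColumnType(i)}")
  }
} finally {
  conn.close()
}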
JdbcUtils.scala

@@ -226,10 +226,10 @@ object JdbcUtils extends Logging {
     case java.sql.Types.STRUCT => StringType
     case java.sql.Types.TIME => TimestampType
     case java.sql.Types.TIME_WITH_TIMEZONE
-      => TimestampType
+      => null
     case java.sql.Types.TIMESTAMP => TimestampType
     case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
-      => TimestampType
+      => null
     case java.sql.Types.TINYINT => IntegerType
     case java.sql.Types.VARBINARY => BinaryType
     case java.sql.Types.VARCHAR => StringType
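Returning null here (rather than leaving the case unmapped) marks the type as recognized but unsupported. A paraphrased sketch of how the surrounding JdbcUtils fallback surfaces this — the exact messages are inferred from the two test assertions in this PR, so treat the wording as approximate:

import java.sql.{JDBCType, SQLException}
import org.apache.spark.sql.types.{DataType, TimestampType}

def toCatalystType(sqlType: Int): DataType = {
  val answer = sqlType match {
    case java.sql.Types.TIMESTAMP => TimestampType
    case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => null // known, but unsupported after this PR
    // Vendor-specific codes such as Oracle's TIMESTAMPTZ (-101) fall through here:
    case _ => throw new SQLException("Unrecognized SQL type " + sqlType)
  }
  if (answer == null) {
    // e.g. "Unsupported type TIMESTAMP_WITH_TIMEZONE", as asserted in JDBCSuite below
    throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
  }
  answer
}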
OracleDialect.scala

@@ -18,7 +18,10 @@
 package org.apache.spark.sql.jdbc

 import java.sql.{Date, Timestamp, Types}
+import java.util.TimeZone

+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -29,6 +32,13 @@ private case object OracleDialect extends JdbcDialect {

   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

+  private def supportTimeZoneTypes: Boolean = {
+    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+    // TODO: support timezone types when users are not using the JVM timezone, which
+    // is the default value of SESSION_LOCAL_TIMEZONE
+    timeZone == TimeZone.getDefault
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
@@ -49,7 +59,8 @@ private case object OracleDialect extends JdbcDialect {
         case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
         case _ => None
       }
-      case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
+      case TIMESTAMPTZ if supportTimeZoneTypes
+        => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
       case _ => None
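Net effect of the new guard: Oracle's TIMESTAMPTZ (JDBC type code -101) maps to TimestampType only while the session time zone equals the JVM default, which is also SESSION_LOCAL_TIMEZONE's default value. A usage sketch — the SparkSession `spark` and `oracleJdbcUrl` here are hypothetical stand-ins:

import java.util.{Properties, TimeZone}

// Session time zone matches the JVM default, so TIMESTAMPTZ maps to TimestampType:
spark.conf.set("spark.sql.session.timeZone", TimeZone.getDefault.getID)
spark.read.jdbc(oracleJdbcUrl, "ts_with_timezone", new Properties).collect()

// Any other session time zone (assuming the JVM default is not Asia/Shanghai here)
// makes supportTimeZoneTypes false; the dialect returns None, the generic mapping
// has no case for -101, and the read fails on the driver:
spark.conf.set("spark.sql.session.timeZone", "Asia/Shanghai")
spark.read.jdbc(oracleJdbcUrl, "ts_with_timezone", new Properties).collect()
// => java.sql.SQLException: Unrecognized SQL type -101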
JDBCSuite.scala

@@ -1064,10 +1064,10 @@ class JDBCSuite extends SparkFunSuite
   }

   test("unsupported types") {
-    var e = intercept[SparkException] {
+    var e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
     }.getMessage
-    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
     }.getMessage
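The updated expectations reflect where the failure now occurs: before this change, a TIMESTAMP WITH TIME ZONE column was mapped to TimestampType and presumably blew up later inside executor tasks (an UnsupportedOperationException wrapped in a SparkException); with the null mapping, schema resolution on the driver rejects it up front. A hypothetical reproduction — the TEST.TIMEZONE DDL is not shown in this PR, so the H2 setup below is assumed, and `conn` / `urlWithUserAndPass` are the suite's own fixtures:

// Assumed H2 table registration, mirroring what the suite presumably creates:
conn.prepareStatement(
  "CREATE TABLE TEST.TIMEZONE (tz TIMESTAMP WITH TIME ZONE)").executeUpdate()

// The read now fails fast while resolving the schema:
spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
// => java.sql.SQLException: Unsupported type TIMESTAMP_WITH_TIMEZONE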