
[SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds #23597

Closed
wants to merge 11 commits into from
2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -93,6 +93,8 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use the java.time API to calculate the week number of the year and the day number of the week based on the Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) was used for the same purpose. The results returned by Spark 3.0 and by previous versions can differ for dates before October 15, 1582 (Gregorian).

- Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on the Proleptic Gregorian calendar and the time zone defined by the SQL config `spark.sql.session.timeZone`, as sketched below. In Spark version 2.4 and earlier, the conversion was based on the hybrid calendar (Julian + Gregorian) and the default system time zone.
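
  For illustration, a minimal sketch of the new behavior (assuming a running `SparkSession` named `spark`; the JDBC URL, table, and column names are hypothetical placeholders):

  ```scala
  // The bound strings below are parsed like CAST('...' AS TIMESTAMP): Proleptic
  // Gregorian calendar, interpreted in the spark.sql.session.timeZone time zone.
  spark.conf.set("spark.sql.session.timeZone", "UTC")

  val df = spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host/db")   // hypothetical URL
    .option("dbtable", "events")                  // hypothetical table
    .option("partitionColumn", "event_time")      // a TimestampType column
    .option("lowerBound", "1500-01-20 00:00:00.123456")
    .option("upperBound", "1500-01-20 00:10:00.123456")
    .option("numPartitions", 2)
    .load()
  ```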

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function was implicitly promoted to the element type of the first, array-type parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources.jdbc

-import java.sql.{Date, Timestamp}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
@@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

/**
* Instructions on how to partition the table among workers.
@@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging {
val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
schema, partitionColumn.get, resolver, jdbcOptions)

-    val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
-    val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+    val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
+    val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
JDBCPartitioningInfo(
column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
}
@@ -174,10 +174,21 @@
(dialect.quoteIdentifier(column.name), column.dataType)
}

-  private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
-    case _: NumericType => value.toLong
-    case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
-    case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+  private def toInternalBoundValue(
+      value: String,
+      columnType: DataType,
+      timeZoneId: String): Long = {
+    def parse[T](f: UTF8String => Option[T]): T = {
+      f(UTF8String.fromString(value)).getOrElse {
+        throw new IllegalArgumentException(
+          s"Cannot parse the bound value $value as ${columnType.catalogString}")
+      }
+    }
+    columnType match {
+      case _: NumericType => value.toLong
+      case DateType => parse(stringToDate).toLong
+      case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId)))
+    }
}
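
For reference, a rough standalone sketch of the parsing semantics above: bound strings are interpreted on the Proleptic Gregorian calendar (java.time) in the given time zone, instead of going through `java.sql.Timestamp.valueOf` / `Date.valueOf` and the hybrid calendar. This only approximates `DateTimeUtils.stringToTimestamp` / `stringToDate`, which accept more patterns (e.g. zone offsets):

```scala
import java.time.{LocalDate, LocalDateTime, ZoneId}

// Approximate illustration only, not Spark's internal parser.
def toEpochMicros(value: String, timeZoneId: String): Long = {
  // Accept both "yyyy-MM-dd HH:mm:ss[.SSSSSS]" and the ISO 'T' separator.
  val instant = LocalDateTime.parse(value.replace(' ', 'T'))
    .atZone(ZoneId.of(timeZoneId))
    .toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}

def toEpochDays(value: String): Long =
  LocalDate.parse(value).toEpochDay  // days since 1970-01-01, Proleptic Gregorian

// For pre-1582 dates such as "1500-01-20 00:10:00.123456", this produces a
// different microsecond value than the old Timestamp.valueOf-based conversion.
```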

private def toBoundValueInWhereClause(
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1523,4 +1523,36 @@ class JDBCSuite extends QueryTest
assert(e.contains("The driver could not open a JDBC connection. " +
"Check the URL: jdbc:mysql://localhost/db"))
}

test("support casting patterns for lower/upper bounds of TimestampType") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
Seq(
("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"),
("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"),
("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456",
"2019-01-20T00:10:00.123456"),
("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456")
).foreach { case (lower, middle, upper) =>
val df = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", "TEST.DATETIME")
.option("partitionColumn", "t")
.option("lowerBound", lower)
.option("upperBound", upper)
.option("numPartitions", 2)
.load()

df.logicalPlan match {
case lr: LogicalRelation if lr.relation.isInstanceOf[JDBCRelation] =>
val jdbcRelation = lr.relation.asInstanceOf[JDBCRelation]
val whereClauses = jdbcRelation.parts.map(_.asInstanceOf[JDBCPartition].whereClause)
assert(whereClauses.toSet === Set(
s""""T" < '$middle' or "T" is null""",
s""""T" >= '$middle'"""))
}
}
}
}
}
}
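
As a quick sanity check on the expected where-clauses in the test above: with `numPartitions` set to 2, the bound range is split once, at (approximately) the midpoint of the lower and upper bounds, which is where the `middle` values in the test cases come from. A plain java.time sketch, not Spark code:

```scala
import java.time.{Duration, LocalDateTime}

// Midpoint of the first test case's bounds.
val lower = LocalDateTime.parse("1972-07-04T03:30:00")
val upper = LocalDateTime.parse("1972-07-27T14:11:05")
val middle = lower.plus(Duration.between(lower, upper).dividedBy(2))
// middle == 1972-07-15T20:50:32.500, i.e. "1972-07-15 20:50:32.5" above.
```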