Skip to content

Commit 115fecf

Browse files
MaxGekk authored and cloud-fan committed
[SPARK-26456][SQL] Cast date/timestamp to string by Date/TimestampFormatter
## What changes were proposed in this pull request?

In this PR, I propose switching to `TimestampFormatter`/`DateFormatter` when casting dates/timestamps to strings. The changes should make date/timestamp casting consistent with the JSON/CSV datasources and with time-related functions such as `to_date` and `to_unix_timestamp`/`from_unixtime`.

Local formatters are moved out of `DateTimeUtils` to the places where they are actually used. This avoids re-creating a formatter instance on every call. Another reason is to have a separate parser for `PartitioningUtils`, because the default parsing pattern cannot be used there (an optional `[.S]` section is expected).

## How was this patch tested?

It was tested by `DateTimeUtilsSuite`, `CastSuite` and `JDBC*Suite`.

Closes #23391 from MaxGekk/thread-local-date-format.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 3f80071 commit 115fecf

File tree

13 files changed

+125
-98
lines changed

13 files changed

+125
-98
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
230230
// [[func]] assumes the input is no longer null because eval already does the null check.
231231
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
232232

233+
private lazy val dateFormatter = DateFormatter()
234+
private lazy val timestampFormatter = TimestampFormatter(timeZone)
235+
233236
// UDFToString
234237
private[this] def castToString(from: DataType): Any => Any = from match {
235238
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
236-
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
239+
case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
237240
case TimestampType => buildCast[Long](_,
238-
t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
241+
t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t)))
239242
case ArrayType(et, _) =>
240243
buildCast[ArrayData](_, array => {
241244
val builder = new UTF8StringBuilder
@@ -843,12 +846,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
843846
case BinaryType =>
844847
(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
845848
case DateType =>
846-
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
847-
org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
849+
val df = JavaCode.global(
850+
ctx.addReferenceObj("dateFormatter", dateFormatter),
851+
dateFormatter.getClass)
852+
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(${df}.format($c));"""
848853
case TimestampType =>
849-
val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
854+
val tf = JavaCode.global(
855+
ctx.addReferenceObj("timestampFormatter", timestampFormatter),
856+
timestampFormatter.getClass)
850857
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
851-
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
858+
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
852859
case ArrayType(et, _) =>
853860
(c, evPrim, evNull) => {
854861
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
562562
copy(timeZoneId = Option(timeZoneId))
563563

564564
override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
565-
val df = TimestampFormatter(format.toString, timeZone, Locale.US)
565+
val df = TimestampFormatter(format.toString, timeZone)
566566
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
567567
}
568568

@@ -665,7 +665,7 @@ abstract class UnixTime
665665
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
666666
private lazy val formatter: TimestampFormatter =
667667
try {
668-
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
668+
TimestampFormatter(constFormat.toString, timeZone)
669669
} catch {
670670
case NonFatal(_) => null
671671
}
@@ -698,7 +698,7 @@ abstract class UnixTime
698698
} else {
699699
val formatString = f.asInstanceOf[UTF8String].toString
700700
try {
701-
TimestampFormatter(formatString, timeZone, Locale.US).parse(
701+
TimestampFormatter(formatString, timeZone).parse(
702702
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
703703
} catch {
704704
case NonFatal(_) => null
@@ -819,7 +819,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
819819
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
820820
private lazy val formatter: TimestampFormatter =
821821
try {
822-
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
822+
TimestampFormatter(constFormat.toString, timeZone)
823823
} catch {
824824
case NonFatal(_) => null
825825
}
@@ -845,7 +845,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
845845
null
846846
} else {
847847
try {
848-
UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
848+
UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
849849
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
850850
} catch {
851851
case NonFatal(_) => null

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,14 @@ class Iso8601DateFormatter(
5151
}
5252

5353
object DateFormatter {
54+
val defaultPattern: String = "yyyy-MM-dd"
55+
val defaultLocale: Locale = Locale.US
56+
5457
def apply(format: String, locale: Locale): DateFormatter = {
5558
new Iso8601DateFormatter(format, locale)
5659
}
60+
61+
def apply(format: String): DateFormatter = apply(format, defaultLocale)
62+
63+
def apply(): DateFormatter = apply(defaultPattern)
5764
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -92,32 +92,6 @@ object DateTimeUtils {
9292
}
9393
}
9494

95-
// `SimpleDateFormat` is not thread-safe.
96-
private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
97-
override def initialValue(): SimpleDateFormat = {
98-
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
99-
}
100-
}
101-
102-
def getThreadLocalTimestampFormat(timeZone: TimeZone): DateFormat = {
103-
val sdf = threadLocalTimestampFormat.get()
104-
sdf.setTimeZone(timeZone)
105-
sdf
106-
}
107-
108-
// `SimpleDateFormat` is not thread-safe.
109-
private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
110-
override def initialValue(): SimpleDateFormat = {
111-
new SimpleDateFormat("yyyy-MM-dd", Locale.US)
112-
}
113-
}
114-
115-
def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
116-
val sdf = threadLocalDateFormat.get()
117-
sdf.setTimeZone(timeZone)
118-
sdf
119-
}
120-
12195
private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
12296
private val computeTimeZone = new JFunction[String, TimeZone] {
12397
override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
@@ -149,24 +123,11 @@ object DateTimeUtils {
149123
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
150124
}
151125

152-
def dateToString(days: SQLDate): String =
153-
getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
154-
155-
def dateToString(days: SQLDate, timeZone: TimeZone): String = {
156-
getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
157-
}
158-
159-
// Converts Timestamp to string according to Hive TimestampWritable convention.
160-
def timestampToString(us: SQLTimestamp): String = {
161-
timestampToString(us, defaultTimeZone())
162-
}
163-
164126
// Converts Timestamp to string according to Hive TimestampWritable convention.
165-
def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
127+
def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
166128
val ts = toJavaTimestamp(us)
167129
val timestampString = ts.toString
168-
val timestampFormat = getThreadLocalTimestampFormat(timeZone)
169-
val formatted = timestampFormat.format(ts)
130+
val formatted = tf.format(us)
170131

171132
if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
172133
formatted + timestampString.substring(19)
@@ -1168,7 +1129,5 @@ object DateTimeUtils {
11681129
*/
11691130
private[util] def resetThreadLocals(): Unit = {
11701131
threadLocalGmtCalendar.remove()
1171-
threadLocalTimestampFormat.remove()
1172-
threadLocalDateFormat.remove()
11731132
}
11741133
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ sealed trait TimestampFormatter extends Serializable {
3636
@throws(classOf[ParseException])
3737
@throws(classOf[DateTimeParseException])
3838
@throws(classOf[DateTimeException])
39-
def parse(s: String): Long // returns microseconds since epoch
39+
def parse(s: String): Long
4040
def format(us: Long): String
4141
}
4242

@@ -74,7 +74,18 @@ class Iso8601TimestampFormatter(
7474
}
7575

7676
object TimestampFormatter {
77+
val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
78+
val defaultLocale: Locale = Locale.US
79+
7780
def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
7881
new Iso8601TimestampFormatter(format, timeZone, locale)
7982
}
83+
84+
def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
85+
apply(format, timeZone, defaultLocale)
86+
}
87+
88+
def apply(timeZone: TimeZone): TimestampFormatter = {
89+
apply(defaultPattern, timeZone, defaultLocale)
90+
}
8091
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
3535
}
3636

3737
test("nanoseconds truncation") {
38+
val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
3839
def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
3940
val parsedTimestampOp = DateTimeUtils.stringToTimestamp(UTF8String.fromString(originalTime))
4041
assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly")
41-
assert(DateTimeUtils.timestampToString(parsedTimestampOp.get) === expectedParsedTime)
42+
assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime)
4243
}
4344

4445
checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456")

sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
2929
test("parsing dates") {
3030
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
3131
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
32-
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
32+
val formatter = DateFormatter()
3333
val daysSinceEpoch = formatter.parse("2018-12-02")
3434
assert(daysSinceEpoch === 17867)
3535
}
@@ -39,7 +39,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
3939
test("format dates") {
4040
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
4141
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
42-
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
42+
val formatter = DateFormatter()
4343
val date = formatter.format(17867)
4444
assert(date === "2018-12-02")
4545
}
@@ -59,7 +59,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
5959
"5010-11-17").foreach { date =>
6060
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
6161
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
62-
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
62+
val formatter = DateFormatter()
6363
val days = formatter.parse(date)
6464
val formatted = formatter.format(days)
6565
assert(date === formatted)
@@ -82,7 +82,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
8282
1110657).foreach { days =>
8383
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
8484
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
85-
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
85+
val formatter = DateFormatter()
8686
val date = formatter.format(days)
8787
val parsed = formatter.parse(date)
8888
assert(days === parsed)
@@ -92,7 +92,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
9292
}
9393

9494
test("parsing date without explicit day") {
95-
val formatter = DateFormatter("yyyy MMM", Locale.US)
95+
val formatter = DateFormatter("yyyy MMM")
9696
val daysSinceEpoch = formatter.parse("2018 Dec")
9797
assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
9898
}

sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
4141
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
4242
val formatter = TimestampFormatter(
4343
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
44-
TimeZone.getTimeZone(timeZone),
45-
Locale.US)
44+
TimeZone.getTimeZone(timeZone))
4645
val microsSinceEpoch = formatter.parse(localDate)
4746
assert(microsSinceEpoch === expectedMicros(timeZone))
4847
}
@@ -62,8 +61,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
6261
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
6362
val formatter = TimestampFormatter(
6463
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
65-
TimeZone.getTimeZone(timeZone),
66-
Locale.US)
64+
TimeZone.getTimeZone(timeZone))
6765
val timestamp = formatter.format(microsSinceEpoch)
6866
assert(timestamp === expectedTimestamp(timeZone))
6967
}
@@ -82,7 +80,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
8280
2177456523456789L,
8381
11858049903010203L).foreach { micros =>
8482
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
85-
val formatter = TimestampFormatter(pattern, timeZone, Locale.US)
83+
val formatter = TimestampFormatter(pattern, timeZone)
8684
val timestamp = formatter.format(micros)
8785
val parsed = formatter.parse(timestamp)
8886
assert(micros === parsed)
@@ -103,7 +101,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
103101
"2039-01-01T01:02:03.456789",
104102
"2345-10-07T22:45:03.010203").foreach { timestamp =>
105103
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
106-
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
104+
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
107105
val micros = formatter.parse(timestamp)
108106
val formatted = formatter.format(micros)
109107
assert(timestamp === formatted)
@@ -114,8 +112,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
114112
test(" case insensitive parsing of am and pm") {
115113
val formatter = TimestampFormatter(
116114
"yyyy MMM dd hh:mm:ss a",
117-
TimeZone.getTimeZone("UTC"),
118-
Locale.US)
115+
TimeZone.getTimeZone("UTC"))
119116
val micros = formatter.parse("2009 Mar 20 11:30:01 am")
120117
assert(micros === TimeUnit.SECONDS.toMicros(
121118
LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))

sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
2121
import java.sql.{Date, Timestamp}
2222

2323
import org.apache.spark.sql.Row
24-
import org.apache.spark.sql.catalyst.util.DateTimeUtils
24+
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
2525
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
2626
import org.apache.spark.sql.internal.SQLConf
2727
import org.apache.spark.sql.types._
@@ -77,6 +77,10 @@ object HiveResult {
7777
TimestampType,
7878
BinaryType)
7979

80+
private lazy val dateFormatter = DateFormatter()
81+
private lazy val timestampFormatter = TimestampFormatter(
82+
DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))
83+
8084
/** Hive outputs fields of structs slightly differently than top level attributes. */
8185
private def toHiveStructString(a: (Any, DataType)): String = a match {
8286
case (struct: Row, StructType(fields)) =>
@@ -111,11 +115,9 @@ object HiveResult {
111115
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
112116
}.toSeq.sorted.mkString("{", ",", "}")
113117
case (null, _) => "NULL"
114-
case (d: Date, DateType) =>
115-
DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
118+
case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
116119
case (t: Timestamp, TimestampType) =>
117-
val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
118-
DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), timeZone)
120+
DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
119121
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
120122
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
121123
case (interval, CalendarIntervalType) => interval.toString

0 commit comments

Comments
 (0)