Skip to content

Commit 2a059e6

Browse files
MaxGekk authored and cloud-fan committed
[SPARK-30788][SQL] Support SimpleDateFormat and FastDateFormat as legacy date/timestamp formatters
### What changes were proposed in this pull request? In the PR, I propose to add legacy date/timestamp formatters based on `SimpleDateFormat` and `FastDateFormat`: - `LegacyFastTimestampFormatter` - uses `FastDateFormat` and supports parsing/formatting in microsecond precision. The code was borrowed from Spark 2.4, see #26507 & #26582 - `LegacySimpleTimestampFormatter` uses `SimpleDateFormat`, and supports the `lenient` mode. When the `lenient` parameter is set to `false`, the parser becomes much stronger in checking its input. ### Why are the changes needed? Spark 2.4.x uses the following parsers for parsing/formatting date/timestamp strings: - `DateTimeFormat` in CSV/JSON datasource - `SimpleDateFormat` - is used in JDBC datasource, in partitions parsing. - `SimpleDateFormat` in strong mode (`lenient = false`), see https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L124. It is used by the `date_format`, `from_unixtime`, `unix_timestamp` and `to_unix_timestamp` functions. The PR aims to make Spark 3.0 compatible with Spark 2.4.x in all those cases when `spark.sql.legacy.timeParser.enabled` is set to `true`. ### Does this PR introduce any user-facing change? This shouldn't change behavior with default settings. If `spark.sql.legacy.timeParser.enabled` is set to `true`, users should observe the behavior of Spark 2.4. ### How was this patch tested? - Modified tests in `DateExpressionsSuite` to check the legacy parser - `SimpleDateFormat`. - Added `CSVLegacyTimeParserSuite` and `JsonLegacyTimeParserSuite` to run `CSVSuite` and `JsonSuite` with the legacy parser - `FastDateFormat`. Closes #27524 from MaxGekk/timestamp-formatter-legacy-fallback. Authored-by: Maxim Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit c198620) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 7c5d7d7 commit 2a059e6

File tree

19 files changed

+654
-419
lines changed

19 files changed

+654
-419
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch
2424
import org.apache.spark.rdd.RDD
2525
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
2626
import org.apache.spark.sql.catalyst.expressions.ExprUtils
27+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
2728
import org.apache.spark.sql.catalyst.util.TimestampFormatter
2829
import org.apache.spark.sql.types._
2930

@@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
3233
private val timestampParser = TimestampFormatter(
3334
options.timestampFormat,
3435
options.zoneId,
35-
options.locale)
36+
options.locale,
37+
legacyFormat = FAST_DATE_FORMAT)
3638

3739
private val decimalParser = if (options.locale == Locale.US) {
3840
// Special handling the default locale for backward compatibility

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ class CSVOptions(
146146
// A language tag in IETF BCP 47 format
147147
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
148148

149-
val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
149+
val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
150150

151151
val timestampFormat: String =
152-
parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
152+
parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")
153153

154154
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
155155

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter
2323

2424
import org.apache.spark.sql.catalyst.InternalRow
2525
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
26+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
2627
import org.apache.spark.sql.types._
2728

2829
class UnivocityGenerator(
@@ -44,11 +45,13 @@ class UnivocityGenerator(
4445
private val timestampFormatter = TimestampFormatter(
4546
options.timestampFormat,
4647
options.zoneId,
47-
options.locale)
48+
options.locale,
49+
legacyFormat = FAST_DATE_FORMAT)
4850
private val dateFormatter = DateFormatter(
4951
options.dateFormat,
5052
options.zoneId,
51-
options.locale)
53+
options.locale,
54+
legacyFormat = FAST_DATE_FORMAT)
5255

5356
private def makeConverter(dataType: DataType): ValueConverter = dataType match {
5457
case DateType =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.catalyst.InternalRow
2828
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
2929
import org.apache.spark.sql.catalyst.util._
30+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
3031
import org.apache.spark.sql.sources.Filter
3132
import org.apache.spark.sql.types._
3233
import org.apache.spark.unsafe.types.UTF8String
@@ -86,11 +87,13 @@ class UnivocityParser(
8687
private val timestampFormatter = TimestampFormatter(
8788
options.timestampFormat,
8889
options.zoneId,
89-
options.locale)
90+
options.locale,
91+
legacyFormat = FAST_DATE_FORMAT)
9092
private val dateFormatter = DateFormatter(
9193
options.dateFormat,
9294
options.zoneId,
93-
options.locale)
95+
options.locale,
96+
legacyFormat = FAST_DATE_FORMAT)
9497

9598
private val csvFilters = new CSVFilters(filters, requiredSchema)
9699

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ import org.apache.spark.sql.AnalysisException
3030
import org.apache.spark.sql.catalyst.InternalRow
3131
import org.apache.spark.sql.catalyst.expressions.codegen._
3232
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
33-
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
33+
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, LegacyDateFormats, TimestampFormatter}
3434
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
3535
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
36+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT
3637
import org.apache.spark.sql.types._
3738
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
3839

@@ -622,13 +623,15 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
622623

623624
@transient private lazy val formatter: Option[TimestampFormatter] = {
624625
if (right.foldable) {
625-
Option(right.eval()).map(format => TimestampFormatter(format.toString, zoneId))
626+
Option(right.eval()).map { format =>
627+
TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
628+
}
626629
} else None
627630
}
628631

629632
override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
630633
val tf = if (formatter.isEmpty) {
631-
TimestampFormatter(format.toString, zoneId)
634+
TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
632635
} else {
633636
formatter.get
634637
}
@@ -643,10 +646,14 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
643646
})
644647
}.getOrElse {
645648
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
649+
val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
646650
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
647651
defineCodeGen(ctx, ev, (timestamp, format) => {
648-
s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid)
649-
.format($timestamp))"""
652+
s"""|UTF8String.fromString($tf$$.MODULE$$.apply(
653+
| $format.toString(),
654+
| $zid,
655+
| $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
656+
|.format($timestamp))""".stripMargin
650657
})
651658
}
652659
}
@@ -688,7 +695,7 @@ case class ToUnixTimestamp(
688695
copy(timeZoneId = Option(timeZoneId))
689696

690697
def this(time: Expression) = {
691-
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
698+
this(time, Literal(TimestampFormatter.defaultPattern))
692699
}
693700

694701
override def prettyName: String = "to_unix_timestamp"
@@ -732,7 +739,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op
732739
copy(timeZoneId = Option(timeZoneId))
733740

734741
def this(time: Expression) = {
735-
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
742+
this(time, Literal(TimestampFormatter.defaultPattern))
736743
}
737744

738745
def this() = {
@@ -758,7 +765,7 @@ abstract class ToTimestamp
758765
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
759766
private lazy val formatter: TimestampFormatter =
760767
try {
761-
TimestampFormatter(constFormat.toString, zoneId)
768+
TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
762769
} catch {
763770
case NonFatal(_) => null
764771
}
@@ -791,8 +798,8 @@ abstract class ToTimestamp
791798
} else {
792799
val formatString = f.asInstanceOf[UTF8String].toString
793800
try {
794-
TimestampFormatter(formatString, zoneId).parse(
795-
t.asInstanceOf[UTF8String].toString) / downScaleFactor
801+
TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
802+
.parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
796803
} catch {
797804
case NonFatal(_) => null
798805
}
@@ -831,13 +838,16 @@ abstract class ToTimestamp
831838
}
832839
case StringType =>
833840
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
834-
val locale = ctx.addReferenceObj("locale", Locale.US)
835841
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
842+
val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
836843
nullSafeCodeGen(ctx, ev, (string, format) => {
837844
s"""
838845
try {
839-
${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale)
840-
.parse($string.toString()) / $downScaleFactor;
846+
${ev.value} = $tf$$.MODULE$$.apply(
847+
$format.toString(),
848+
$zid,
849+
$ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
850+
.parse($string.toString()) / $downScaleFactor;
841851
} catch (java.lang.IllegalArgumentException e) {
842852
${ev.isNull} = true;
843853
} catch (java.text.ParseException e) {
@@ -908,7 +918,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
908918
override def prettyName: String = "from_unixtime"
909919

910920
def this(unix: Expression) = {
911-
this(unix, Literal("uuuu-MM-dd HH:mm:ss"))
921+
this(unix, Literal(TimestampFormatter.defaultPattern))
912922
}
913923

914924
override def dataType: DataType = StringType
@@ -922,7 +932,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
922932
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
923933
private lazy val formatter: TimestampFormatter =
924934
try {
925-
TimestampFormatter(constFormat.toString, zoneId)
935+
TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
926936
} catch {
927937
case NonFatal(_) => null
928938
}
@@ -948,8 +958,9 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
948958
null
949959
} else {
950960
try {
951-
UTF8String.fromString(TimestampFormatter(f.toString, zoneId)
952-
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
961+
UTF8String.fromString(
962+
TimestampFormatter(f.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
963+
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
953964
} catch {
954965
case NonFatal(_) => null
955966
}
@@ -980,13 +991,14 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
980991
}
981992
} else {
982993
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
983-
val locale = ctx.addReferenceObj("locale", Locale.US)
984994
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
995+
val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
985996
nullSafeCodeGen(ctx, ev, (seconds, f) => {
986997
s"""
987998
try {
988-
${ev.value} = UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale).
989-
format($seconds * 1000000L));
999+
${ev.value} = UTF8String.fromString(
1000+
$tf$$.MODULE$$.apply($f.toString(), $zid, $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
1001+
.format($seconds * 1000000L));
9901002
} catch (java.lang.IllegalArgumentException e) {
9911003
${ev.isNull} = true;
9921004
}"""

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ private[sql] class JSONOptions(
8888
val zoneId: ZoneId = DateTimeUtils.getZoneId(
8989
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
9090

91-
val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
91+
val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
9292

9393
val timestampFormat: String =
94-
parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
94+
parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")
9595

9696
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
9797

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
2424
import org.apache.spark.sql.catalyst.InternalRow
2525
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
2626
import org.apache.spark.sql.catalyst.util._
27+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
2728
import org.apache.spark.sql.types._
2829

2930
/**
@@ -80,11 +81,13 @@ private[sql] class JacksonGenerator(
8081
private val timestampFormatter = TimestampFormatter(
8182
options.timestampFormat,
8283
options.zoneId,
83-
options.locale)
84+
options.locale,
85+
legacyFormat = FAST_DATE_FORMAT)
8486
private val dateFormatter = DateFormatter(
8587
options.dateFormat,
8688
options.zoneId,
87-
options.locale)
89+
options.locale,
90+
legacyFormat = FAST_DATE_FORMAT)
8891

8992
private def makeWriter(dataType: DataType): ValueWriter = dataType match {
9093
case NullType =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql.catalyst.InternalRow
3131
import org.apache.spark.sql.catalyst.expressions._
3232
import org.apache.spark.sql.catalyst.util._
33+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
3334
import org.apache.spark.sql.internal.SQLConf
3435
import org.apache.spark.sql.types._
3536
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -58,11 +59,13 @@ class JacksonParser(
5859
private val timestampFormatter = TimestampFormatter(
5960
options.timestampFormat,
6061
options.zoneId,
61-
options.locale)
62+
options.locale,
63+
legacyFormat = FAST_DATE_FORMAT)
6264
private val dateFormatter = DateFormatter(
6365
options.dateFormat,
6466
options.zoneId,
65-
options.locale)
67+
options.locale,
68+
legacyFormat = FAST_DATE_FORMAT)
6669

6770
/**
6871
* Create a converter which converts the JSON documents held by the `JsonParser`

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion
2929
import org.apache.spark.sql.catalyst.expressions.ExprUtils
3030
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
3131
import org.apache.spark.sql.catalyst.util._
32+
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
3233
import org.apache.spark.sql.internal.SQLConf
3334
import org.apache.spark.sql.types._
3435
import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
4041
private val timestampFormatter = TimestampFormatter(
4142
options.timestampFormat,
4243
options.zoneId,
43-
options.locale)
44+
options.locale,
45+
legacyFormat = FAST_DATE_FORMAT)
4446

4547
/**
4648
* Infer the type of a collection of json records in three stages:

0 commit comments

Comments (0)