Commit fd2d55c

cloud-fan authored and HyukjinKwon committed
[SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
### What changes were proposed in this pull request?

When reading/writing datetime values that fall before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly rebase or not.

### Why are the changes needed?

Rebasing and not rebasing produce different behaviors, and users should decide explicitly. In most cases users won't hit this exception, as it only affects ancient datetime values.

### Does this PR introduce _any_ user-facing change?

Yes. Users now see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message asking them to set a config.

### How was this patch tested?

Updated tests.

Closes #28477 from cloud-fan/rebase.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent 42951e6 · commit fd2d55c
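For context on the new behavior: the default rebase mode is EXCEPTION, and users pick LEGACY or CORRECTED to proceed. A minimal usage sketch follows; the config key strings are inferred from the SQLConf constants in this diff (LEGACY_AVRO_REBASE_MODE_IN_READ / _IN_WRITE) and are assumptions, not confirmed by the commit.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Read ancient values as-is, trusting they were written with the proleptic
// Gregorian calendar (no rebase). Key string is an assumption, see above.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "CORRECTED")

// Or rebase from the hybrid Julian+Gregorian calendar that Spark 2.4 used:
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")

// The default, "EXCEPTION", throws SparkUpgradeException for dates before
// 1582-10-15 and timestamps before 1900-01-01.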

File tree

27 files changed: +519 -319 lines


core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 1 addition & 1 deletion
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 17 additions & 20 deletions
@@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
 
   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }
 
   private lazy val decimalConversions = new DecimalConversion()
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -96,36 +107,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD
     case (INT, IntegerType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, value.asInstanceOf[Int])
 
-    case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-      val days = value.asInstanceOf[Int]
-      val rebasedDays = rebaseJulianToGregorianDays(days)
-      updater.setInt(ordinal, rebasedDays)
-
     case (INT, DateType) => (updater, ordinal, value) =>
-      updater.setInt(ordinal, value.asInstanceOf[Int])
+      updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
     case (LONG, LongType) => (updater, ordinal, value) =>
       updater.setLong(ordinal, value.asInstanceOf[Long])
 
     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
-      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-        val millis = value.asInstanceOf[Long]
-        val micros = DateTimeUtils.millisToMicros(millis)
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
       case null | _: TimestampMillis => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.millisToMicros(millis)
-        updater.setLong(ordinal, micros)
-      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-        val micros = value.asInstanceOf[Long]
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        updater.setLong(ordinal, micros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
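The dateRebaseFunc / timestampRebaseFunc values come from new factory methods on DataSourceUtils (the "crete" spelling is in the source as-is). Below is a minimal, self-contained sketch of the mode dispatch such a factory performs; the enum, switch-day constant, and rebase helper are stand-ins, not Spark's actual implementation.

object RebaseSketch {
  object Mode extends Enumeration { val EXCEPTION, LEGACY, CORRECTED = Value }

  // Epoch day of 1582-10-15, the first Gregorian day (computed, not hardcoded).
  val gregorianStartDays: Int =
    java.time.LocalDate.of(1582, 10, 15).toEpochDay.toInt

  // Stand-in for RebaseDateTime.rebaseJulianToGregorianDays.
  def rebaseJulianToGregorianDays(days: Int): Int = days

  def dateRebaseFuncInRead(mode: Mode.Value, format: String): Int => Int =
    mode match {
      case Mode.EXCEPTION => (days: Int) =>
        if (days < gregorianStartDays) {
          throw new RuntimeException(
            s"Reading dates before 1582-10-15 from $format files is ambiguous; " +
              "set the datetime rebase mode to LEGACY or CORRECTED.")
        } else {
          days
        }
      case Mode.LEGACY => rebaseJulianToGregorianDays(_)
      case Mode.CORRECTED => (days: Int) => days
    }
}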

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 5 additions & 5 deletions
@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
       reader.sync(file.start)
       val stop = file.start + file.length
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)
 
       new Iterator[InternalRow] {
         private[this] var completed = false
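DataSourceUtils.datetimeRebaseMode replaces the old needRebaseDateTime and folds the config default into the file-metadata lookup. A hedged sketch of the resolution order it plausibly implements, matching the write-side tagging in AvroOutputWriter below; the metadata key strings and the version rule are assumptions:

object Mode extends Enumeration { val EXCEPTION, LEGACY, CORRECTED = Value }

def datetimeRebaseMode(
    lookupFileMeta: String => String,  // e.g. reader.getMetaString for Avro
    modeByConfig: String): Mode.Value = {
  Option(lookupFileMeta("org.apache.spark.version")).map { version =>
    // Files written by Spark 2.4 and earlier, or tagged with the legacy
    // marker, used the hybrid Julian+Gregorian calendar. The plain string
    // comparison is a simplification of a real version comparison.
    if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
      Mode.LEGACY
    } else {
      Mode.CORRECTED
    }
  }.getOrElse(Mode.withName(modeByConfig))  // no Spark metadata: use the config
}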

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala

Lines changed: 9 additions & 4 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -43,20 +44,24 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {
 
   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))
 
   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)
 
   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
   */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }
 
     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
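Because the writer only adds the SPARK_LEGACY_DATETIME marker under LEGACY mode, readers can resolve the rebase mode from the file itself and need not consult the read-side config for such files. A hypothetical end-to-end write; the config key string is inferred from SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE and the example assumes the spark-avro package is on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumed key string; see the note above.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")

// An ancient date, before the Julian-to-Gregorian switch day (1582-10-15).
Seq(java.sql.Date.valueOf("1001-01-01")).toDF("d")
  .write.format("avro").save("/tmp/ancient_dates_avro")

// The resulting file carries the legacy-datetime marker, so a Spark 3.0
// reader rebases it correctly regardless of the read-side config.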

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 15 additions & 15 deletions
@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
@@ -146,24 +154,16 @@ class AvroSerializer(
     case (BinaryType, BYTES) =>
       (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
-    case (DateType, INT) if rebaseDateTime =>
-      (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
     case (DateType, INT) =>
-      (getter, ordinal) => getter.getInt(ordinal)
+      (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
 
     case (TimestampType, LONG) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), output the timestamp value as with millisecond precision.
-      case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-        val micros = getter.getLong(ordinal)
-        val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-        DateTimeUtils.microsToMillis(rebasedMicros)
       case null | _: TimestampMillis => (getter, ordinal) =>
-        DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-      case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-        rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+        DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+      case _: TimestampMicros => (getter, ordinal) =>
+        timestampRebaseFunc(getter.getLong(ordinal))
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
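The write side mirrors the read side; per the commit message, the cutoff for timestamps is 1900-01-01. A stand-in sketch of the dispatch a factory like creteTimestampRebaseFuncInWrite could perform (the enum, cutoff computation, and rebase helper are illustrative, not Spark's implementation):

object WriteRebaseSketch {
  object Mode extends Enumeration { val EXCEPTION, LEGACY, CORRECTED = Value }

  // Microseconds since the epoch for 1900-01-01T00:00:00Z.
  val cutoffMicros: Long =
    java.time.Instant.parse("1900-01-01T00:00:00Z").toEpochMilli * 1000L

  // Stand-in for RebaseDateTime.rebaseGregorianToJulianMicros.
  def rebaseGregorianToJulianMicros(micros: Long): Long = micros

  def timestampRebaseFuncInWrite(mode: Mode.Value, format: String): Long => Long =
    mode match {
      case Mode.EXCEPTION => (micros: Long) =>
        if (micros < cutoffMicros) {
          throw new RuntimeException(
            s"Writing timestamps before 1900-01-01 to $format files is " +
              "ambiguous; set the datetime rebase mode to LEGACY or CORRECTED.")
        } else {
          micros
        }
      case Mode.LEGACY => rebaseGregorianToJulianMicros(_)
      case Mode.CORRECTED => (micros: Long) => micros
    }
}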

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

Lines changed: 4 additions & 5 deletions
@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)
 
       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
+    val deserializer = new AvroDeserializer(avroSchema, dataType)
 
     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(
