[SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files #28477


Closed · wants to merge 8 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkException.scala
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)
Contributor Author:

We need to throw this exception in the vectorized Parquet reader, which only allows IOException to be thrown, so we change it to an unchecked exception. cc @xuanyuanking

Member:

Copy that.

s" $version: $message", cause)
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

@@ -34,22 +34,33 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {

   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }

   private lazy val decimalConversions = new DecimalConversion()

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -96,36 +107,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
     case (INT, IntegerType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, value.asInstanceOf[Int])

-    case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-      val days = value.asInstanceOf[Int]
-      val rebasedDays = rebaseJulianToGregorianDays(days)
-      updater.setInt(ordinal, rebasedDays)
-
     case (INT, DateType) => (updater, ordinal, value) =>
-      updater.setInt(ordinal, value.asInstanceOf[Int])
+      updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

     case (LONG, LongType) => (updater, ordinal, value) =>
       updater.setLong(ordinal, value.asInstanceOf[Long])

     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
-      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-        val millis = value.asInstanceOf[Long]
-        val micros = DateTimeUtils.millisToMicros(millis)
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
       case null | _: TimestampMillis => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.millisToMicros(millis)
-        updater.setLong(ordinal, micros)
-      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-        val micros = value.asInstanceOf[Long]
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        updater.setLong(ordinal, micros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
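The two new fields fold the three-way policy into plain `Int => Int` and `Long => Long` functions once per reader, rather than branching on the mode for every value. The real factories live in `DataSourceUtils` and raise `SparkUpgradeException` naming the exact config to set; here is a minimal sketch of the read-side date dispatch, where the cutover constant and the error message are this sketch's assumptions:

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseJulianToGregorianDays
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

object RebaseSketch {
  // Assumption: the Gregorian cutover (1582-10-15) expressed as days since
  // 1970-01-01; values at or after it are unambiguous in both calendars.
  val julianEndDay: Int = -141427

  def dateRebaseFuncInRead(
      mode: LegacyBehaviorPolicy.Value,
      format: String): Int => Int = mode match {
    case LegacyBehaviorPolicy.EXCEPTION => days => {
      if (days < julianEndDay) {
        // Fail by default: such values are ambiguous between the hybrid
        // Julian+Gregorian calendar (Spark 2.4) and proleptic Gregorian (3.0).
        throw new RuntimeException(
          s"Reading ancient dates from $format files is ambiguous; " +
            "set the datetime rebase mode to LEGACY or CORRECTED")
      }
      days
    }
    case LegacyBehaviorPolicy.LEGACY => rebaseJulianToGregorianDays
    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
  }
}
```

The write-side factories used by AvroSerializer below mirror this shape with the Gregorian-to-Julian rebase functions.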
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
       reader.sync(file.start)
       val stop = file.start + file.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)

       new Iterator[InternalRow] {
         private[this] var completed = false
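`DataSourceUtils.datetimeRebaseMode` resolves the policy per file, so the session conf only matters for files whose writer is unknown. A sketch of the resolution order, assuming the metadata constants imported elsewhere in this diff; the key strings below are this sketch's guesses, not confirmed by the PR:

```scala
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

object RebaseModeSketch {
  // Assumed metadata key names: SPARK_LEGACY_DATETIME is the tag the writer
  // side of this PR attaches, SPARK_VERSION_METADATA_KEY records the writing
  // Spark version.
  val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
  val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"

  def datetimeRebaseMode(
      lookupFileMeta: String => String,
      modeByConfig: String): LegacyBehaviorPolicy.Value = {
    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
      // Files written by Spark 2.4 and earlier always used the hybrid
      // calendar; 3.0+ files need rebasing only if tagged as LEGACY.
      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
        LegacyBehaviorPolicy.LEGACY
      } else {
        LegacyBehaviorPolicy.CORRECTED
      }
    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
  }
}
```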
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala

@@ -33,6 +33,7 @@
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.

@@ -43,20 +44,24 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {

   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))

   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)

   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }

     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
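On the write side, LEGACY both rebases the values and stamps the file with `SPARK_LEGACY_DATETIME`, so readers on newer Spark versions pick the right mode automatically. A hypothetical user-facing snippet, using the `SQLConf` entries this diff introduces (key strings are accessed via `.key` since the diff does not show them):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Write ancient datetimes the way Spark 2.4 did; the file is tagged so
// future readers rebase without any configuration:
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key, "LEGACY")

// For untagged files (e.g. written by Spark 2.4), declare that they use
// the hybrid Julian+Gregorian calendar:
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key, "LEGACY")

// Or declare that untagged files already hold proleptic Gregorian values:
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key, "CORRECTED")
```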
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

@@ -35,8 +35,9 @@
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 /**
@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
Member:

How about adding a type alias to LegacyBehaviorPolicy?

  object LegacyBehaviorPolicy extends Enumeration {
    type LegacyBehaviorPolicy = Value
    val EXCEPTION, LEGACY, CORRECTED = Value
  }

Contributor Author:

This doesn't help much, as we need to access both the object LegacyBehaviorPolicy and the type LegacyBehaviorPolicy, and we would still need some prefix to distinguish them.
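A quick illustration of the point (a sketch, not code from this PR): the alias only changes how the type is spelled, while value access still goes through the enclosing object, so signatures gain little over `LegacyBehaviorPolicy.Value`.

```scala
object LegacyBehaviorPolicy extends Enumeration {
  type LegacyBehaviorPolicy = Value
  val EXCEPTION, LEGACY, CORRECTED = Value
}

object AliasDemo {
  // With the alias, a signature can spell the type without `.Value` ...
  def isLegacy(mode: LegacyBehaviorPolicy.LegacyBehaviorPolicy): Boolean =
    mode == LegacyBehaviorPolicy.LEGACY  // ... but values still need the object prefix.

  def main(args: Array[String]): Unit =
    println(isLegacy(LegacyBehaviorPolicy.LEGACY))  // true
}
```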


   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }

   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {

@@ -146,24 +154,16 @@ class AvroSerializer(
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))

-      case (DateType, INT) if rebaseDateTime =>
-        (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
       case (DateType, INT) =>
-        (getter, ordinal) => getter.getInt(ordinal)
+        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))

       case (TimestampType, LONG) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), output the timestamp value as with millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-          val micros = getter.getLong(ordinal)
-          val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-          DateTimeUtils.microsToMillis(rebasedMicros)
         case null | _: TimestampMillis => (getter, ordinal) =>
-          DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-        case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-          rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-        case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+          DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+        case _: TimestampMicros => (getter, ordinal) =>
+          timestampRebaseFunc(getter.getLong(ordinal))
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
       }
external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)

       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
+    val deserializer = new AvroDeserializer(avroSchema, dataType)

     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(
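Since the two-argument constructor now reads the mode from the active session conf, a test can drive all three policies through configuration alone. A sketch reusing the names from the test above, and assuming the suite has a `withSQLConf` helper in scope (standard in Spark's SQL test utilities, but an assumption for this suite):

```scala
import org.apache.spark.sql.internal.SQLConf

// For modern (post-1582) values, all three policies should agree, since the
// hybrid and proleptic Gregorian calendars only diverge for ancient dates.
Seq("EXCEPTION", "CORRECTED", "LEGACY").foreach { mode =>
  withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode) {
    val deserializer = new AvroDeserializer(avroSchema, dataType)
    // ... exercise checkDeserialization with the data under test ...
  }
}
```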