-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files #28477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5b5cda2
2915675
2bc23b2
fc31eea
7392e00
c8920ca
f906a54
eb61edb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging | |
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow} | ||
import org.apache.spark.sql.catalyst.util.DateTimeUtils | ||
import org.apache.spark.sql.catalyst.util.RebaseDateTime._ | ||
import org.apache.spark.sql.execution.datasources.DataSourceUtils | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy | ||
import org.apache.spark.sql.types._ | ||
|
||
/** | ||
|
@@ -46,17 +47,24 @@ class AvroSerializer( | |
rootCatalystType: DataType, | ||
rootAvroType: Schema, | ||
nullable: Boolean, | ||
rebaseDateTime: Boolean) extends Logging { | ||
datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about adding a type alias to LegacyBehaviorPolicy: object LegacyBehaviorPolicy extends Enumeration {
type LegacyBehaviorPolicy = Value
val EXCEPTION, LEGACY, CORRECTED = Value
} There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This doesn't help with pattern matching, as we need to access both |
||
|
||
def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) { | ||
this(rootCatalystType, rootAvroType, nullable, | ||
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)) | ||
LegacyBehaviorPolicy.withName(SQLConf.get.getConf( | ||
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))) | ||
} | ||
|
||
def serialize(catalystData: Any): Any = { | ||
converter.apply(catalystData) | ||
} | ||
|
||
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite( | ||
datetimeRebaseMode, "Avro") | ||
|
||
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( | ||
datetimeRebaseMode, "Avro") | ||
|
||
private val converter: Any => Any = { | ||
val actualAvroType = resolveNullableType(rootAvroType, nullable) | ||
val baseConverter = rootCatalystType match { | ||
|
@@ -146,24 +154,16 @@ class AvroSerializer( | |
case (BinaryType, BYTES) => | ||
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) | ||
|
||
case (DateType, INT) if rebaseDateTime => | ||
(getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal)) | ||
|
||
case (DateType, INT) => | ||
(getter, ordinal) => getter.getInt(ordinal) | ||
(getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal)) | ||
|
||
case (TimestampType, LONG) => avroType.getLogicalType match { | ||
// For backward compatibility, if the Avro type is Long and it is not logical type | ||
// (the `null` case), output the timestamp value with millisecond precision. | ||
case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) => | ||
val micros = getter.getLong(ordinal) | ||
val rebasedMicros = rebaseGregorianToJulianMicros(micros) | ||
DateTimeUtils.microsToMillis(rebasedMicros) | ||
case null | _: TimestampMillis => (getter, ordinal) => | ||
DateTimeUtils.microsToMillis(getter.getLong(ordinal)) | ||
case _: TimestampMicros if rebaseDateTime => (getter, ordinal) => | ||
rebaseGregorianToJulianMicros(getter.getLong(ordinal)) | ||
case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal) | ||
DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal))) | ||
case _: TimestampMicros => (getter, ordinal) => | ||
timestampRebaseFunc(getter.getLong(ordinal)) | ||
case other => throw new IncompatibleSchemaException( | ||
s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}") | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to throw this exception in the vectorized Parquet reader, which only allows IOException to be thrown, so this changes it to an unchecked exception. cc @xuanyuanking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copy that.