Skip to content

Commit fc31eea

Browse files
committed
fix conflicts
1 parent 2bc23b2 commit fc31eea

File tree

2 files changed

+21
-13
lines changed

2 files changed

+21
-13
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
159159
boolean isSupported = false;
160160
switch (typeName) {
161161
case INT32:
162-
isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
162+
isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
163163
break;
164164
case INT64:
165165
if (originalType == OriginalType.TIMESTAMP_MICROS) {
166-
isSupported = !rebaseDateTime;
166+
isSupported = "CORRECTED".equals(datetimeRebaseMode);
167167
} else {
168168
isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
169169
}
@@ -286,7 +286,7 @@ private void decodeDictionaryIds(
286286
case INT32:
287287
if (column.dataType() == DataTypes.IntegerType ||
288288
DecimalType.is32BitDecimalType(column.dataType()) ||
289-
(column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
289+
(column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) {
290290
for (int i = rowId; i < rowId + num; ++i) {
291291
if (!column.isNullAt(i)) {
292292
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -305,9 +305,13 @@ private void decodeDictionaryIds(
305305
}
306306
}
307307
} else if (column.dataType() == DataTypes.DateType) {
308+
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
308309
for (int i = rowId; i < rowId + num; ++i) {
309310
if (!column.isNullAt(i)) {
310311
int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
312+
if (failIfRebase && julianDays < RebaseDateTime.lastSwitchJulianDay()) {
313+
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
314+
}
311315
int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
312316
column.putInt(i, gregorianDays);
313317
}
@@ -320,27 +324,31 @@ private void decodeDictionaryIds(
320324
case INT64:
321325
if (column.dataType() == DataTypes.LongType ||
322326
DecimalType.is64BitDecimalType(column.dataType()) ||
323-
(originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) {
327+
(originalType == OriginalType.TIMESTAMP_MICROS && "CORRECTED".equals(datetimeRebaseMode))) {
324328
for (int i = rowId; i < rowId + num; ++i) {
325329
if (!column.isNullAt(i)) {
326330
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
327331
}
328332
}
329333
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
330-
if (rebaseDateTime) {
334+
if ("CORRECTED".equals(datetimeRebaseMode)) {
331335
for (int i = rowId; i < rowId + num; ++i) {
332336
if (!column.isNullAt(i)) {
333-
long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
334-
long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
335-
long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
336-
column.putLong(i, gregorianMicros);
337+
long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
338+
column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis));
337339
}
338340
}
339341
} else {
342+
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
340343
for (int i = rowId; i < rowId + num; ++i) {
341344
if (!column.isNullAt(i)) {
342-
long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
343-
column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis));
345+
long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
346+
long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
347+
if (failIfRebase && julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
348+
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
349+
}
350+
long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
351+
column.putLong(i, gregorianMicros);
344352
}
345353
}
346354
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,8 +1029,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
10291029

10301030
Seq(false, true).foreach { vectorized =>
10311031
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
1032-
// The file metadata indicates if it needs rebase or not, so we can always get the correct
1033-
// result regardless of the "rebase mode" config.
1032+
// The file metadata indicates if it needs rebase or not, so we can always get the
1033+
// correct result regardless of the "rebase mode" config.
10341034
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
10351035
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
10361036
checkAnswer(

0 commit comments

Comments (0)