Skip to content

Commit 7299961

Browse files
sameeragarwal authored and yhuai committed
[SPARK-14016][SQL] Support high-precision decimals in vectorized parquet reader
## What changes were proposed in this pull request?

This patch adds support for reading `DecimalTypes` with high (> 18) precision in `VectorizedColumnReader`.

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()` that made us fall back on parquet-mr for handling high-precision decimals. This condition is now removed.
2. In particular, the `ParquetHadoopFsRelationSuite` (which tests all supported Hive types — including `DecimalType(25, 5)`) fails when the gating condition is removed (#11808) and should now pass with this change.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11869 from sameeragarwal/bigdecimal-parquet.
1 parent 43ef1e5 commit 7299961

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
262262
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
263263
column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
264264
}
265+
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
266+
for (int i = rowId; i < rowId + num; ++i) {
267+
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
268+
column.putByteArray(i, v.getBytes());
269+
}
265270
} else {
266271
throw new NotImplementedException();
267272
}
@@ -368,6 +373,14 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
368373
column.putNull(rowId + i);
369374
}
370375
}
376+
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
377+
for (int i = 0; i < num; i++) {
378+
if (defColumn.readInteger() == maxDefLevel) {
379+
column.putByteArray(rowId + i, data.readBinary(arrayLen).getBytes());
380+
} else {
381+
column.putNull(rowId + i);
382+
}
383+
}
371384
} else {
372385
throw new NotImplementedException("Unimplemented type: " + column.dataType());
373386
}

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,6 @@ private void initializeInternal() throws IOException {
220220
originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
221221
throw new IOException("Unsupported type: " + t);
222222
}
223-
if (originalTypes[i] == OriginalType.DECIMAL &&
224-
primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()) {
225-
throw new IOException("Decimal with high precision is not supported.");
226-
}
227223
if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
228224
throw new IOException("Int96 not supported.");
229225
}

0 commit comments

Comments (0)