[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support #35262
SpecificParquetRecordReaderBase.java
@@ -29,6 +29,8 @@
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.page.PageReadStore;
import scala.Option;
@@ -69,6 +71,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
  protected MessageType fileSchema;
  protected MessageType requestedSchema;
  protected StructType sparkSchema;
  // Keep track of the version of the parquet writer. An older version wrote
  // corrupt delta byte arrays, and the version check is needed to detect that.
  protected ParsedVersion writerVersion;

  /**
   * The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +98,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
        HadoopInputFile.fromPath(file, configuration), options);
    this.reader = new ParquetRowGroupReaderImpl(fileReader);
    this.fileSchema = fileReader.getFileMetaData().getSchema();
    try {
      this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
    } catch (Exception e) {
      // Swallow any exception, if we cannot parse the version we will revert to a sequential read
      // if the column is a delta byte array encoding (due to PARQUET-246).
    }
    Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
    ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(

Review comment on the catch block: "Will other types of exceptions be thrown here, except ...?"
Reply: "Well yes. I encountered at least one case where the version information was empty and the version check threw a NPE."
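The writer version captured here is what later lets VectorizedColumnReader decide whether a DELTA_BYTE_ARRAY column must be read sequentially (PARQUET-246). Below is a minimal stand-alone sketch of that check using the parquet-mr VersionParser and CorruptDeltaByteArrays APIs; the createdBy string is a made-up example of the format parquet-mr writes, not taken from this PR.

```java
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.Encoding;

public class WriterVersionCheckSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative createdBy value; real files carry whatever the writer recorded,
    // and it can be empty or unparseable (hence the broad catch in the PR).
    String createdBy = "parquet-mr version 1.6.0 (build abcdef0123456789abcdef0123456789abcdef01)";
    ParsedVersion version = VersionParser.parse(createdBy);

    // Writers affected by PARQUET-246 require carrying the previous page's last value
    // into the next page's reader, i.e. pages cannot be decoded independently.
    boolean sequential =
        CorruptDeltaByteArrays.requiresSequentialReads(version, Encoding.DELTA_BYTE_ARRAY);
    System.out.println("requires sequential reads: " + sequential);
  }
}
```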
VectorizedColumnReader.java
@@ -21,13 +21,16 @@
import java.time.ZoneId;
import java.util.PrimitiveIterator;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
@@ -86,6 +89,7 @@ public class VectorizedColumnReader {
  private final ColumnDescriptor descriptor;
  private final LogicalTypeAnnotation logicalTypeAnnotation;
  private final String datetimeRebaseMode;
  private final ParsedVersion writerVersion;

  public VectorizedColumnReader(
      ColumnDescriptor descriptor,
@@ -96,7 +100,8 @@ public VectorizedColumnReader(
      String datetimeRebaseMode,
      String datetimeRebaseTz,
      String int96RebaseMode,
      String int96RebaseTz) throws IOException {
      String int96RebaseTz,
      ParsedVersion writerVersion) throws IOException {
    this.descriptor = descriptor;
    this.pageReader = pageReader;
    this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +134,7 @@ public VectorizedColumnReader(
    this.datetimeRebaseMode = datetimeRebaseMode;
    assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
      "CORRECTED".equals(int96RebaseMode);
    this.writerVersion = writerVersion;
  }

  private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -259,6 +265,7 @@ private void initDataReader(
      int pageValueCount,
      Encoding dataEncoding,
      ByteBufferInputStream in) throws IOException {
    ValuesReader previousReader = this.dataColumn;
    if (dataEncoding.usesDictionary()) {
      this.dataColumn = null;
      if (dictionary == null) {

@@ -283,6 +290,12 @@
    } catch (IOException e) {
      throw new IOException("could not read page in col " + descriptor, e);
    }
    // for PARQUET-246 (See VectorizedDeltaByteArrayReader.setPreviousValues)
    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
        previousReader instanceof RequiresPreviousReader) {
      // previousReader can only be set if reading sequentially
      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
    }
  }

  private ValuesReader getValuesReader(Encoding encoding) {

Review comment on the requiresSequentialReads check: "When does this happen? Can you add a comment on why we need this?"
Reply: "Added comment. Detailed explanation is in the comment in ..."
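To make the sequential-read requirement concrete: with the PARQUET-246 writer bug, the first value of a page can be stored as a delta against the last value of the previous page, so a page cannot be decoded in isolation. A minimal stand-alone sketch of that carry-over, using plain Java with made-up values rather than Spark or Parquet APIs:

```java
import java.util.ArrayList;
import java.util.List;

public class Parquet246Sketch {
  // Decode one DELTA_BYTE_ARRAY page given its prefix lengths and suffixes.
  // 'previous' carries the last value of the preceding page; for files written by
  // a PARQUET-246-affected writer it is needed to decode the page's first value.
  static List<String> decodePage(int[] prefixLengths, String[] suffixes, String previous) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < suffixes.length; i++) {
      String value = previous.substring(0, prefixLengths[i]) + suffixes[i];
      out.add(value);
      previous = value;
    }
    return out;
  }

  public static void main(String[] args) {
    // Page 1 holds "spark", "sparkle".
    List<String> page1 = decodePage(new int[]{0, 5}, new String[]{"spark", "le"}, "");
    // Page 2 holds "sparks", "spars", but a buggy writer encoded the first value as a
    // delta against "sparkle" (prefix 5 + suffix "s") instead of prefix 0 + "sparks".
    String lastOfPage1 = page1.get(page1.size() - 1);
    List<String> page2 = decodePage(new int[]{5, 4}, new String[]{"s", "s"}, lastOfPage1);
    System.out.println(page1); // [spark, sparkle]
    System.out.println(page2); // [sparks, spars]
  }
}
```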
VectorizedDeltaBinaryPackedReader.java
@@ -90,13 +90,19 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
    Preconditions.checkArgument(miniSize % 8 == 0,
      "miniBlockSize must be multiple of 8, but it's " + miniSize);
    this.miniBlockSizeInValues = (int) miniSize;
    // True value count. May be less than valueCount because of nulls
    this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
    this.bitWidths = new int[miniBlockNumInABlock];
    this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
    // read the first value
    firstValue = BytesUtils.readZigZagVarLong(in);
  }

  // True value count. May be less than valueCount because of nulls
  int getTotalValueCount() {
    return totalValueCount;
  }

  @Override
  public byte readByte() {
    readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);

Review comment on the "True value count" comment: "I think it would be more useful to annotate the method ..."
Reply: "Added the comment to ..."
VectorizedDeltaByteArrayReader.java
@@ -16,50 +16,127 @@
 */
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;
import java.nio.ByteBuffer;

Review comment on the static imports: "Do we have a clear import order definition for static import? @sunchao @dongjoon-hyun"
/**
 * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
 * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
 * interface.
 */
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
    implements VectorizedValuesReader, RequiresPreviousReader {

  private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
  private final VectorizedDeltaLengthByteArrayReader suffixReader;
  private WritableColumnVector prefixLengthVector;
  private ByteBuffer previous;
  private int currentRow = 0;

  // Temporary variable used by readBinary
  private final WritableColumnVector binaryValVector;
  // Temporary variable used by skipBinary
  private final WritableColumnVector tempBinaryValVector;

  VectorizedDeltaByteArrayReader() {
    this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
    this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
    binaryValVector = new OnHeapColumnVector(1, BinaryType);
    tempBinaryValVector = new OnHeapColumnVector(1, BinaryType);
  }

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    deltaByteArrayReader.initFromPage(valueCount, in);
    prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
    prefixLengthReader.initFromPage(valueCount, in);
    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
      prefixLengthVector, 0);
    suffixReader.initFromPage(valueCount, in);
  }

  @Override
  public Binary readBinary(int len) {
    return deltaByteArrayReader.readBytes();
    readValues(1, binaryValVector, 0);
    return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
  }
  @Override
  public void readBinary(int total, WritableColumnVector c, int rowId) {
  private void readValues(int total, WritableColumnVector c, int rowId) {
    for (int i = 0; i < total; i++) {
      Binary binary = deltaByteArrayReader.readBytes();
      ByteBuffer buffer = binary.toByteBuffer();
      if (buffer.hasArray()) {
        c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
          binary.length());
      } else {
        byte[] bytes = new byte[binary.length()];
        buffer.get(bytes);
        c.putByteArray(rowId + i, bytes);
      // NOTE: due to PARQUET-246, it is important that we
      // respect prefixLength which was read from prefixLengthReader,
      // even for the *first* value of a page. Even though the first
      // value of the page should have an empty prefix, it may not
      // because of PARQUET-246.
      int prefixLength = prefixLengthVector.getInt(currentRow);
      ByteBuffer suffix = suffixReader.getBytes(currentRow);
      byte[] suffixArray = suffix.array();
      int suffixLength = suffix.limit() - suffix.position();
      int length = prefixLength + suffixLength;

      // We have to do this to materialize the output
      WritableColumnVector arrayData = c.arrayData();
      int offset = arrayData.getElementsAppended();
      if (prefixLength != 0) {
        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
      }
      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
      c.putArray(rowId + i, offset, length);
      previous = arrayData.getByteBuffer(offset, length);
      currentRow++;
    }
  }
  @Override
  public void readBinary(int total, WritableColumnVector c, int rowId) {
    readValues(total, c, rowId);
  }

  /**
   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
   * previous value state that it tracks internally. This resulted in the first value of all pages
   * (except for the first page) to be a delta from the last value of the previous page. In order to
   * read corrupted files written with this bug, when reading a new page we need to recover the
   * previous page's last value to use it (if needed) to read the first value.
   */
  public void setPreviousReader(ValuesReader reader) {
    if (reader != null) {
      this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
    }
  }
  @Override
  public void skipBinary(int total) {
    WritableColumnVector c1 = tempBinaryValVector;
    WritableColumnVector c2 = binaryValVector;

    for (int i = 0; i < total; i++) {
      deltaByteArrayReader.skip();
      int prefixLength = prefixLengthVector.getInt(currentRow);
      ByteBuffer suffix = suffixReader.getBytes(currentRow);
      byte[] suffixArray = suffix.array();
      int suffixLength = suffix.limit() - suffix.position();
      int length = prefixLength + suffixLength;

      WritableColumnVector arrayData = c1.arrayData();
      c1.reset();
      if (prefixLength != 0) {
        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
      }
      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
      previous = arrayData.getByteBuffer(0, length);
      currentRow++;

      WritableColumnVector tmp = c1;
      c1 = c2;
      c2 = tmp;
    }
  }
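For orientation, the page layout that the chained initFromPage calls above rely on is: all prefix lengths come first (DELTA_BINARY_PACKED), immediately followed by the suffixes as a DELTA_LENGTH_BYTE_ARRAY block (suffix lengths, again DELTA_BINARY_PACKED, then the concatenated suffix bytes). The sketch below splits a few strings into those three logical sections; it uses plain lists instead of the actual bit-packed Parquet encoding, so it illustrates the layout rather than producing real page bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeltaByteArrayLayoutSketch {
  public static void main(String[] args) {
    String[] values = {"spark", "sparkle", "sparks", "spars"};

    List<Integer> prefixLengths = new ArrayList<>();  // section 1: DELTA_BINARY_PACKED
    List<Integer> suffixLengths = new ArrayList<>();  // section 2: DELTA_BINARY_PACKED
    StringBuilder suffixBytes = new StringBuilder();  // section 3: raw concatenated bytes

    String previous = "";
    for (String v : values) {
      int prefix = commonPrefixLength(previous, v);
      String suffix = v.substring(prefix);
      prefixLengths.add(prefix);
      suffixLengths.add(suffix.getBytes(StandardCharsets.UTF_8).length);
      suffixBytes.append(suffix);
      previous = v;
    }

    System.out.println("prefix lengths: " + prefixLengths); // [0, 5, 5, 4]
    System.out.println("suffix lengths: " + suffixLengths); // [5, 2, 1, 1]
    System.out.println("suffix bytes  : " + suffixBytes);   // sparkless
  }

  private static int commonPrefixLength(String a, String b) {
    int n = Math.min(a.length(), b.length());
    int i = 0;
    while (i < n && a.charAt(i) == b.charAt(i)) i++;
    return i;
  }
}
```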
VectorizedDeltaLengthByteArrayReader.java (new file)
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
 * An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
 * interface.
 */
public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase implements
    VectorizedValuesReader {

  private final VectorizedDeltaBinaryPackedReader lengthReader;
  private ByteBufferInputStream in;
  private WritableColumnVector lengthsVector;
  private int currentRow = 0;

  VectorizedDeltaLengthByteArrayReader() {
    lengthReader = new VectorizedDeltaBinaryPackedReader();
  }

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
    lengthReader.initFromPage(valueCount, in);
    lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
    this.in = in.remainingStream();
  }

  @Override
  public void readBinary(int total, WritableColumnVector c, int rowId) {
    ByteBuffer buffer;
    ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
    int length;
    for (int i = 0; i < total; i++) {
      length = lengthsVector.getInt(rowId + i);
      try {
        buffer = in.slice(length);
      } catch (EOFException e) {
        throw new ParquetDecodingException("Failed to read " + length + " bytes");
      }
      outputWriter.write(c, rowId + i, buffer, length);
    }
    currentRow += total;
  }

  public ByteBuffer getBytes(int rowId) {
    int length = lengthsVector.getInt(rowId);
    try {
      return in.slice(length);
    } catch (EOFException e) {
      throw new ParquetDecodingException("Failed to read " + length + " bytes");
    }
  }

  @Override
  public void skipBinary(int total) {
    for (int i = 0; i < total; i++) {
      int remaining = lengthsVector.getInt(currentRow + i);
      while (remaining > 0) {
        remaining -= in.skip(remaining);
      }
    }
    currentRow += total;
  }
}

Review comment on the skip loop: "Did I miss anything? Do we really need ...?"
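The reader above first materializes all value lengths and then slices the remaining page bytes on demand. A minimal stand-alone sketch of that second step, in plain Java with the lengths assumed already decoded (in the real reader they come from the DELTA_BINARY_PACKED length section):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DeltaLengthByteArrayDecodeSketch {
  public static void main(String[] args) {
    // Lengths followed by the concatenated value bytes. The same shape is used for the
    // suffix section of DELTA_BYTE_ARRAY, which is why the suffix reader reuses this decoder.
    int[] lengths = {5, 2, 1, 1};
    ByteBuffer data = ByteBuffer.wrap("sparkless".getBytes(StandardCharsets.UTF_8));

    for (int length : lengths) {
      byte[] value = new byte[length];
      data.get(value); // consume exactly 'length' bytes, analogous to in.slice(length)
      System.out.println(new String(value, StandardCharsets.UTF_8));
    }
    // prints: spark, le, s, s
  }
}
```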
VectorizedValuesReader.java
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.nio.ByteBuffer;

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import org.apache.parquet.io.api.Binary;

Review comment on the new import: "nit: new line after the import."
Reply: "OK"
@@ -86,4 +88,18 @@ interface IntegerOutputWriter {
    void write(WritableColumnVector outputColumnVector, int rowId, long val);
  }

  @FunctionalInterface
  interface ByteBufferOutputWriter {
    void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

    static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
        int length) {
      c.putByteArray(rowId,
        val.array(),
        val.arrayOffset() + val.position(),
        length);
    }

    static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }
  }
}

Review comment on the static helper methods: "Is it a good practice to add ...?"
Reply: "I don't know if it is frowned upon. In this case, not including in the interface only leads to some code bloat."
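As a follow-up to that exchange: keeping writeArrayByteBuffer and skipWrite as static members of the interface lets the read and skip paths share one decoding loop and simply plug in a different writer. A minimal stand-alone sketch of the pattern; the StringBuilder output and the readOrSkip driver are made up for illustration, and only the interface shape mirrors the PR:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OutputWriterSketch {
  @FunctionalInterface
  interface Writer {
    void write(StringBuilder out, int rowId, ByteBuffer val, int length);

    // Materialize the value into the output, like ByteBufferOutputWriter.writeArrayByteBuffer.
    static void writeArrayByteBuffer(StringBuilder out, int rowId, ByteBuffer val, int length) {
      out.append(rowId).append(": ")
        .append(new String(val.array(), val.arrayOffset() + val.position(), length,
          StandardCharsets.UTF_8))
        .append('\n');
    }

    // No-op writer used by the skip path, like ByteBufferOutputWriter.skipWrite.
    static void skipWrite(StringBuilder out, int rowId, ByteBuffer val, int length) { }
  }

  // One loop serves both the read and the skip case; only the writer differs.
  static void readOrSkip(ByteBuffer data, int[] lengths, StringBuilder out, Writer writer) {
    for (int i = 0; i < lengths.length; i++) {
      ByteBuffer slice = data.slice();
      slice.limit(lengths[i]);
      writer.write(out, i, slice, lengths[i]);
      data.position(data.position() + lengths[i]); // always advance past the value
    }
  }

  public static void main(String[] args) {
    byte[] bytes = "sparkless".getBytes(StandardCharsets.UTF_8);
    int[] lengths = {5, 2, 1, 1};

    StringBuilder out = new StringBuilder();
    readOrSkip(ByteBuffer.wrap(bytes), lengths, out, Writer::writeArrayByteBuffer);
    System.out.print(out); // 0: spark, 1: le, 2: s, 3: s (one per line)

    // Skipping runs the same loop but writes nothing.
    readOrSkip(ByteBuffer.wrap(bytes), lengths, new StringBuilder(), Writer::skipWrite);
  }
}
```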