[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support #35262

Closed
wants to merge 20 commits

Commits
7a8b41c
[SPARK-37974][SQL] Vectorized implementation of DeltaLengthByteArray …
parthchandra Jan 13, 2022
2c73794
[SPARK-37974][SQL] Vectorized implementation of DeltaByteArray reader
parthchandra Jan 14, 2022
52df517
Addressing review comments
parthchandra Jan 22, 2022
0011bab
More review comments addressed
parthchandra Jan 24, 2022
50ed815
One more review comment
parthchandra Jan 25, 2022
3dc340a
Updated JDK 8 benchmark
parthchandra Jan 26, 2022
ca20068
Fix for off heap memory not being initialized. Added off heap mode to…
parthchandra Jan 28, 2022
6ad1dbe
Remove use of OffHeap vectors for internal buffers. Skip writing to o…
parthchandra Jan 31, 2022
6f364d9
more review comments addressed
parthchandra Feb 2, 2022
80a4ceb
Still more review comments addressed
parthchandra Feb 9, 2022
406d176
Remove unnecessary check for 'total' parameter in 'readValues'
parthchandra Feb 11, 2022
be62ad6
Remove check for zero length in DeltaLengthByteArrayReader, and add u…
parthchandra Feb 14, 2022
166afe1
Update benchmark
parthchandra Feb 16, 2022
0583e2f
Evaluate suffix array lazily in VectorizedDeltaLengthByteArrayReader
sunchao Mar 5, 2022
31eee9f
In DeltaLengthByteArrayReader avoid extra copy if memory mode is on_h…
sunchao Mar 5, 2022
9eaf387
Avoid unnecessary check for parameter in skipBytes
parthchandra Mar 7, 2022
1fc0060
Update benchmark results
parthchandra Mar 7, 2022
d95100a
More review comments
parthchandra Mar 15, 2022
6d273f0
More review comments addressed
parthchandra Mar 16, 2022
1d15022
Cleaner naming for WritableColumnVector.getBytesUnsafe
parthchandra Mar 16, 2022
424 changes: 212 additions & 212 deletions sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt

Large diffs are not rendered by default.

470 changes: 235 additions & 235 deletions sql/core/benchmarks/DataSourceReadBenchmark-jdk17-results.txt

Large diffs are not rendered by default.

424 changes: 212 additions & 212 deletions sql/core/benchmarks/DataSourceReadBenchmark-results.txt

Large diffs are not rendered by default.

@@ -29,6 +29,8 @@
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.page.PageReadStore;
import scala.Option;

@@ -69,6 +71,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
protected MessageType fileSchema;
protected MessageType requestedSchema;
protected StructType sparkSchema;
// Keep track of the version of the parquet writer. An older version wrote
// corrupt delta byte arrays, and the version check is needed to detect that.
protected ParsedVersion writerVersion;

/**
* The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +98,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
HadoopInputFile.fromPath(file, configuration), options);
this.reader = new ParquetRowGroupReaderImpl(fileReader);
this.fileSchema = fileReader.getFileMetaData().getSchema();
try {
this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
} catch (Exception e) {
Contributor:
Will other types of exceptions be thrown here besides VersionParseException?

Contributor Author:
Well, yes. I encountered at least one case where the version information was empty and the version check threw an NPE.

// Swallow any exception; if we cannot parse the version, we will revert to a sequential read
// if the column uses the delta byte array encoding (due to PARQUET-246).
}
Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
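A side note on the defensive version parse above: the sketch below is a hypothetical helper, not code from this PR, showing the intent of the try/catch. It assumes only Parquet's public VersionParser API, which the diff already imports; any parse failure is treated as an unknown writer, which later forces the sequential-read fallback for delta byte array columns (PARQUET-246).

import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;

class WriterVersionSketch {
  // Hypothetical helper: parse the writer version from the file's "created_by" metadata.
  // VersionParseException, an NPE on empty metadata, etc. all collapse to "unknown" (null).
  static ParsedVersion parseOrNull(String createdBy) {
    try {
      return VersionParser.parse(createdBy);
    } catch (Exception e) {
      return null;
    }
  }
}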
@@ -21,13 +21,16 @@
import java.time.ZoneId;
import java.util.PrimitiveIterator;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
@@ -86,6 +89,7 @@ public class VectorizedColumnReader {
private final ColumnDescriptor descriptor;
private final LogicalTypeAnnotation logicalTypeAnnotation;
private final String datetimeRebaseMode;
private final ParsedVersion writerVersion;

public VectorizedColumnReader(
ColumnDescriptor descriptor,
@@ -96,7 +100,8 @@ public VectorizedColumnReader(
String datetimeRebaseMode,
String datetimeRebaseTz,
String int96RebaseMode,
String int96RebaseTz) throws IOException {
String int96RebaseTz,
ParsedVersion writerVersion) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +134,7 @@ public VectorizedColumnReader(
this.datetimeRebaseMode = datetimeRebaseMode;
assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
"CORRECTED".equals(int96RebaseMode);
this.writerVersion = writerVersion;
}

private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -259,6 +265,7 @@ private void initDataReader(
int pageValueCount,
Encoding dataEncoding,
ByteBufferInputStream in) throws IOException {
ValuesReader previousReader = this.dataColumn;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
if (dictionary == null) {
@@ -283,6 +290,12 @@ } catch (IOException e) {
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
// for PARQUET-246 (see VectorizedDeltaByteArrayReader.setPreviousReader)
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
Contributor:
When does this happen? Can you add a comment on why we need this?

Contributor Author:
Added comment. The detailed explanation is in the comment on VectorizedDeltaByteArrayReader.setPreviousReader.

previousReader instanceof RequiresPreviousReader) {
// previousReader can only be set if reading sequentially
((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
}
}

private ValuesReader getValuesReader(Encoding encoding) {
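For context on the PARQUET-246 handling in initDataReader above, here is a simplified, self-contained sketch of how the previous page's reader is threaded into the next one so a corrupt first value can be reconstructed. The Page interface and its methods are hypothetical stand-ins for Parquet's page classes; only CorruptDeltaByteArrays, RequiresPreviousReader, and ValuesReader are real Parquet APIs.

import java.util.List;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;

class SequentialReadSketch {
  // Hypothetical page descriptor; real code obtains the encoding and reader from Parquet pages.
  interface Page {
    Encoding encoding();
    ValuesReader newReader();
  }

  // When the writer version is affected by PARQUET-246, each new delta-byte-array reader
  // is handed the previous page's reader so it can seed its "previous value" state.
  static void readAllPages(ParsedVersion writerVersion, List<Page> pages) {
    ValuesReader previousReader = null;
    for (Page page : pages) {
      ValuesReader current = page.newReader();
      if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, page.encoding())
          && current instanceof RequiresPreviousReader) {
        ((RequiresPreviousReader) current).setPreviousReader(previousReader);
      }
      previousReader = current;
    }
  }
}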
@@ -90,13 +90,19 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
Preconditions.checkArgument(miniSize % 8 == 0,
"miniBlockSize must be multiple of 8, but it's " + miniSize);
this.miniBlockSizeInValues = (int) miniSize;
// True value count. May be less than valueCount because of nulls
Contributor:
I think it would be more useful to annotate the method getTotalValueCount instead of here.

Contributor Author:
Added the comment to getTotalValueCount as well.

this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
this.bitWidths = new int[miniBlockNumInABlock];
this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
// read the first value
firstValue = BytesUtils.readZigZagVarLong(in);
}

// True value count. May be less than valueCount because of nulls
int getTotalValueCount() {
return totalValueCount;
}

@Override
public byte readByte() {
readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);
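A usage note on the new getTotalValueCount getter: it matters when a caller eagerly decodes every value in the packed stream, as the delta length reader below does. The following is a minimal sketch of that pattern, assuming it runs in the same package as the reader; the class and method names are illustrative, not part of the PR.

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

class LengthDecodingSketch {
  // valueCount comes from the page header and may include nulls; the DELTA_BINARY_PACKED
  // stream only stores non-null values, so the bulk read uses getTotalValueCount() instead.
  static WritableColumnVector decodeAll(int valueCount, ByteBufferInputStream in)
      throws IOException {
    VectorizedDeltaBinaryPackedReader reader = new VectorizedDeltaBinaryPackedReader();
    WritableColumnVector values = new OnHeapColumnVector(valueCount, IntegerType);
    reader.initFromPage(valueCount, in);
    reader.readIntegers(reader.getTotalValueCount(), values, 0);
    return values;
  }
}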
@@ -16,50 +16,127 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
Contributor:
Do we have a clear import order definition for static imports? @sunchao @dongjoon-hyun

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
* An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
* interface.
*/
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
implements VectorizedValuesReader, RequiresPreviousReader {

private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
private final VectorizedDeltaLengthByteArrayReader suffixReader;
private WritableColumnVector prefixLengthVector;
private ByteBuffer previous;
private int currentRow = 0;

// Temporary variable used by readBinary
private final WritableColumnVector binaryValVector;
// Temporary variable used by skipBinary
private final WritableColumnVector tempBinaryValVector;

VectorizedDeltaByteArrayReader() {
this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
binaryValVector = new OnHeapColumnVector(1, BinaryType);
tempBinaryValVector = new OnHeapColumnVector(1, BinaryType);
}

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
deltaByteArrayReader.initFromPage(valueCount, in);
prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
prefixLengthReader.initFromPage(valueCount, in);
prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
prefixLengthVector, 0);
suffixReader.initFromPage(valueCount, in);
}

@Override
public Binary readBinary(int len) {
return deltaByteArrayReader.readBytes();
readValues(1, binaryValVector, 0);
return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
private void readValues(int total, WritableColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
Binary binary = deltaByteArrayReader.readBytes();
ByteBuffer buffer = binary.toByteBuffer();
if (buffer.hasArray()) {
c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
binary.length());
} else {
byte[] bytes = new byte[binary.length()];
buffer.get(bytes);
c.putByteArray(rowId + i, bytes);
// NOTE: due to PARQUET-246, it is important that we
// respect prefixLength which was read from prefixLengthReader,
// even for the *first* value of a page. Even though the first
// value of the page should have an empty prefix, it may not
// because of PARQUET-246.
int prefixLength = prefixLengthVector.getInt(currentRow);
ByteBuffer suffix = suffixReader.getBytes(currentRow);
byte[] suffixArray = suffix.array();
int suffixLength = suffix.limit() - suffix.position();
int length = prefixLength + suffixLength;

// We have to do this to materialize the output
WritableColumnVector arrayData = c.arrayData();
int offset = arrayData.getElementsAppended();
if (prefixLength != 0) {
arrayData.appendBytes(prefixLength, previous.array(), previous.position());
}
arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
c.putArray(rowId + i, offset, length);
previous = arrayData.getByteBuffer(offset, length);
currentRow++;
}
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
readValues(total, c, rowId);
}

/**
* There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
* previous value state that it tracks internally. This resulted in the first value of all pages
* (except for the first page) to be a delta from the last value of the previous page. In order to
* read corrupted files written with this bug, when reading a new page we need to recover the
* previous page's last value to use it (if needed) to read the first value.
*/
public void setPreviousReader(ValuesReader reader) {
if (reader != null) {
this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
}
}

@Override
public void skipBinary(int total) {
WritableColumnVector c1 = tempBinaryValVector;
WritableColumnVector c2 = binaryValVector;

for (int i = 0; i < total; i++) {
deltaByteArrayReader.skip();
int prefixLength = prefixLengthVector.getInt(currentRow);
ByteBuffer suffix = suffixReader.getBytes(currentRow);
byte[] suffixArray = suffix.array();
int suffixLength = suffix.limit() - suffix.position();
int length = prefixLength + suffixLength;

WritableColumnVector arrayData = c1.arrayData();
c1.reset();
if (prefixLength != 0) {
arrayData.appendBytes(prefixLength, previous.array(), previous.position());
}
arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
previous = arrayData.getByteBuffer(0, length);
currentRow++;

WritableColumnVector tmp = c1;
c1 = c2;
c2 = tmp;
}
}

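For readers unfamiliar with the encoding itself, here is a standalone worked example of DELTA_BYTE_ARRAY decoding (a prefix length shared with the previous value, plus a suffix), independent of the reader above; the values are made up for illustration.

import java.nio.charset.StandardCharsets;

class DeltaByteArrayExample {
  public static void main(String[] args) {
    // DELTA_BYTE_ARRAY stores, per value, the length of the prefix shared with the previous
    // value plus the remaining suffix bytes. Decoding "Hello", "Helicopter", "Help" from
    // prefix lengths {0, 3, 3} and suffixes {"Hello", "icopter", "p"}:
    int[] prefixLengths = {0, 3, 3};
    String[] suffixes = {"Hello", "icopter", "p"};
    byte[] previous = new byte[0];
    for (int i = 0; i < prefixLengths.length; i++) {
      byte[] suffix = suffixes[i].getBytes(StandardCharsets.UTF_8);
      byte[] value = new byte[prefixLengths[i] + suffix.length];
      System.arraycopy(previous, 0, value, 0, prefixLengths[i]);           // shared prefix
      System.arraycopy(suffix, 0, value, prefixLengths[i], suffix.length); // new suffix
      previous = value;
      System.out.println(new String(value, StandardCharsets.UTF_8));
      // prints Hello, then Helicopter, then Help
    }
  }
}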
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
* An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
* interface.
*/
public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase implements
VectorizedValuesReader {

private final VectorizedDeltaBinaryPackedReader lengthReader;
private ByteBufferInputStream in;
private WritableColumnVector lengthsVector;
private int currentRow = 0;

VectorizedDeltaLengthByteArrayReader() {
lengthReader = new VectorizedDeltaBinaryPackedReader();
}

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
lengthReader.initFromPage(valueCount, in);
lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
this.in = in.remainingStream();
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
ByteBuffer buffer;
ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
int length;
for (int i = 0; i < total; i++) {
length = lengthsVector.getInt(rowId + i);
try {
buffer = in.slice(length);
} catch (EOFException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes");
}
outputWriter.write(c, rowId + i, buffer, length);
}
currentRow += total;
}

public ByteBuffer getBytes(int rowId) {
int length = lengthsVector.getInt(rowId);
try {
return in.slice(length);
} catch (EOFException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes");
}
}

@Override
public void skipBinary(int total) {
for (int i = 0; i < total; i++) {
int remaining = lengthsVector.getInt(currentRow + i);
while (remaining > 0) {
remaining -= in.skip(remaining);
Contributor:
Did I miss anything? Do we really need length here?

}
}
currentRow += total;
}
}
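Similarly, a standalone worked example of the DELTA_LENGTH_BYTE_ARRAY layout this new reader decodes: all value lengths are stored first (delta-binary-packed), followed by the concatenated value bytes. The data below is made up for illustration.

import java.nio.charset.StandardCharsets;

class DeltaLengthByteArrayExample {
  public static void main(String[] args) {
    // Lengths as the length reader would decode them, followed by the raw value bytes.
    int[] lengths = {5, 3, 4};
    byte[] data = "applecatlime".getBytes(StandardCharsets.UTF_8);
    int offset = 0;
    for (int length : lengths) {
      System.out.println(new String(data, offset, length, StandardCharsets.UTF_8));
      offset += length;
      // prints apple, then cat, then lime
    }
  }
}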
@@ -367,7 +367,8 @@ private void checkEndOfRowGroup() throws IOException {
datetimeRebaseMode,
datetimeRebaseTz,
int96RebaseMode,
int96RebaseTz);
int96RebaseTz,
writerVersion);
}
totalCountLoadedSoFar += pages.getRowCount();
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.nio.ByteBuffer;
Contributor:
nit: new line after the import.

Contributor Author:
OK


import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import org.apache.parquet.io.api.Binary;
@@ -86,4 +88,18 @@ interface IntegerOutputWriter {
void write(WritableColumnVector outputColumnVector, int rowId, long val);
}

@FunctionalInterface
interface ByteBufferOutputWriter {
void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
Contributor (@LuciferYang, Jan 21, 2022):

Is it good practice to add static methods to an interface? I'm not sure.

Contributor Author:
I don't know if it is frowned upon. In this case, not including it in the interface would only lead to some code bloat.

int length) {
c.putByteArray(rowId,
val.array(),
val.arrayOffset() + val.position(),
length);
}

static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }
}
}
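On the review question about static methods in interfaces: a minimal, self-contained illustration of the pattern used by ByteBufferOutputWriter, i.e. a functional interface whose static helpers are selected via method references. The names below are purely illustrative and not part of the PR.

@FunctionalInterface
interface Output {
  void write(StringBuilder sink, String val);

  // Static helpers give callers ready-made implementations to pick by method reference.
  static void append(StringBuilder sink, String val) { sink.append(val); }
  static void skip(StringBuilder sink, String val) { /* no-op when values are skipped */ }
}

class OutputDemo {
  public static void main(String[] args) {
    Output out = Output::append;   // or Output::skip when the value should be dropped
    StringBuilder sb = new StringBuilder();
    out.write(sb, "hello");
    System.out.println(sb);        // prints "hello"
  }
}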