Commit 2c73794

[SPARK-37974][SQL] Vectorized implementation of DeltaByteArray reader
1 parent 7a8b41c

12 files changed: +983, -714 lines

sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt
    212 additions, 212 deletions (large diff not rendered)

sql/core/benchmarks/DataSourceReadBenchmark-jdk17-results.txt
    235 additions, 235 deletions (large diff not rendered)

sql/core/benchmarks/DataSourceReadBenchmark-results.txt
    235 additions, 235 deletions (large diff not rendered)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
9 additions, 0 deletions

@@ -29,6 +29,8 @@
 import java.util.Set;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.column.page.PageReadStore;
 import scala.Option;

@@ -69,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
   protected StructType sparkSchema;
+  protected ParsedVersion writerVersion;

   /**
    * The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +96,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
       HadoopInputFile.fromPath(file, configuration), options);
     this.reader = new ParquetRowGroupReaderImpl(fileReader);
     this.fileSchema = fileReader.getFileMetaData().getSchema();
+    try {
+      this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
+    } catch (Exception e) {
+      // Swallow any exception, if we cannot parse the version we will revert to a sequential read
+      // if the column is a delta byte array encoding (due to PARQUET-246).
+    }
     Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
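For readers unfamiliar with parquet-mr's version handling, here is a minimal, self-contained sketch of what the new try/catch enables; VersionParser and ParsedVersion are real parquet-mr types, but the created_by string below is an invented example, not taken from a real file.

// Sketch only: VersionParser turns a file's created_by string into a
// ParsedVersion. If parsing throws, writerVersion stays null, which later
// forces the conservative sequential path for DELTA_BYTE_ARRAY columns.
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;

public class ParseWriterVersionSketch {
  public static void main(String[] args) throws Exception {
    // Example created_by value (invented for illustration).
    String createdBy = "parquet-mr version 1.8.1 (build 0fda28af84b9746396014ad6a415b90592a98b3b)";
    ParsedVersion version = VersionParser.parse(createdBy);
    System.out.println(version.application + " / " + version.version);
  }
}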

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
21 additions, 5 deletions

@@ -21,20 +21,24 @@
 import java.time.ZoneId;
 import java.util.PrimitiveIterator;

+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 import org.apache.parquet.schema.PrimitiveType;

+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.Decimal;

@@ -86,6 +90,8 @@ public class VectorizedColumnReader {
   private final ColumnDescriptor descriptor;
   private final LogicalTypeAnnotation logicalTypeAnnotation;
   private final String datetimeRebaseMode;
+  private final ParsedVersion writerVersion;
+  private final MemoryMode memoryMode;

   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -96,7 +102,9 @@ public VectorizedColumnReader(
       String datetimeRebaseMode,
       String datetimeRebaseTz,
       String int96RebaseMode,
-      String int96RebaseTz) throws IOException {
+      String int96RebaseTz,
+      ParsedVersion writerVersion,
+      MemoryMode memoryMode) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +137,8 @@ public VectorizedColumnReader(
     this.datetimeRebaseMode = datetimeRebaseMode;
     assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
       "CORRECTED".equals(int96RebaseMode);
+    this.writerVersion = writerVersion;
+    this.memoryMode = memoryMode;
   }

   private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -174,7 +184,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
       readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
     }
     PrimitiveType.PrimitiveTypeName typeName =
-        descriptor.getPrimitiveType().getPrimitiveTypeName();
+      descriptor.getPrimitiveType().getPrimitiveTypeName();
     if (isCurrentPageDictionaryEncoded) {
       // Save starting offset in case we need to decode dictionary IDs.
       int startOffset = readState.offset;
@@ -259,6 +269,7 @@ private void initDataReader(
       int pageValueCount,
       Encoding dataEncoding,
       ByteBufferInputStream in) throws IOException {
+    ValuesReader previousReader = this.dataColumn;
     if (dataEncoding.usesDictionary()) {
       this.dataColumn = null;
       if (dictionary == null) {
@@ -283,25 +294,30 @@
     } catch (IOException e) {
       throw new IOException("could not read page in col " + descriptor, e);
     }
+    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
+        previousReader != null && previousReader instanceof RequiresPreviousReader) {
+      // previous reader can only be set if reading sequentially
+      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
+    }
   }

   private ValuesReader getValuesReader(Encoding encoding) {
     switch (encoding) {
       case PLAIN:
         return new VectorizedPlainValuesReader();
       case DELTA_BYTE_ARRAY:
-        return new VectorizedDeltaByteArrayReader();
+        return new VectorizedDeltaByteArrayReader(memoryMode);
       case DELTA_BINARY_PACKED:
         return new VectorizedDeltaBinaryPackedReader();
       case RLE:
         PrimitiveType.PrimitiveTypeName typeName =
-          this.descriptor.getPrimitiveType().getPrimitiveTypeName();
+            this.descriptor.getPrimitiveType().getPrimitiveTypeName();
         // RLE encoding only supports boolean type `Values`, and `bitwidth` is always 1.
         if (typeName == BOOLEAN) {
           return new VectorizedRleValuesReader(1);
         } else {
           throw new UnsupportedOperationException(
-            "RLE encoding is not supported for values of type: " + typeName);
+              "RLE encoding is not supported for values of type: " + typeName);
         }
       default:
         throw new UnsupportedOperationException("Unsupported encoding: " + encoding);

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
5 additions, 0 deletions

@@ -90,13 +90,18 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
     Preconditions.checkArgument(miniSize % 8 == 0,
         "miniBlockSize must be multiple of 8, but it's " + miniSize);
     this.miniBlockSizeInValues = (int) miniSize;
+    // True value count. May be less than valueCount because of nulls
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     this.bitWidths = new int[miniBlockNumInABlock];
     this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
     // read the first value
     firstValue = BytesUtils.readZigZagVarLong(in);
   }

+  int getTotalValueCount() {
+    return totalValueCount;
+  }
+
   @Override
   public byte readByte() {
     readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);
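The new comment and getter matter because a page's valueCount includes nulls, while DELTA_BINARY_PACKED physically stores only the non-null values; the embedded total (read as an unsigned varint above) can therefore be smaller. The same distinction drives the one-line fix in VectorizedDeltaLengthByteArrayReader further down. A tiny plain-Java illustration, nothing Spark-specific:

// Illustration only: the physically encoded value count is the non-null count.
static int storedValueCount(Integer[] logicalPageValues) {
  int stored = 0;
  for (Integer v : logicalPageValues) {
    if (v != null) stored++;  // nulls occupy no slot in the encoded data
  }
  return stored;
}
// storedValueCount(new Integer[] {7, null, 9, null, null}) == 2, while the
// page header's valueCount is 5; reading 5 integers would over-read the page.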

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
101 additions, 19 deletions

@@ -16,51 +16,133 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet;

+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

 import java.io.IOException;
 import java.nio.ByteBuffer;

 /**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
+ * interface.
  */
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
-  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+    implements VectorizedValuesReader, RequiresPreviousReader {
+
+  private final MemoryMode memoryMode;
+  private int valueCount;
+  private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+      new VectorizedDeltaBinaryPackedReader();
+  private final VectorizedDeltaLengthByteArrayReader suffixReader;
+  private WritableColumnVector prefixLengthVector;
+  private WritableColumnVector suffixVector;
+  private byte[] previous = new byte[0];
+  private int currentRow = 0;
+
+  // temporary variable used by getBinary
+  Binary binaryVal;
+
+  VectorizedDeltaByteArrayReader(MemoryMode memoryMode) {
+    this.memoryMode = memoryMode;
+    this.suffixReader = new VectorizedDeltaLengthByteArrayReader(memoryMode);
+  }

   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    deltaByteArrayReader.initFromPage(valueCount, in);
+    this.valueCount = valueCount;
+    if (memoryMode == MemoryMode.OFF_HEAP) {
+      prefixLengthVector = new OffHeapColumnVector(valueCount, IntegerType);
+      suffixVector = new OffHeapColumnVector(valueCount, BinaryType);
+    } else {
+      prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+      suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+    }
+    prefixLengthReader.initFromPage(valueCount, in);
+    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+        prefixLengthVector, 0);
+    suffixReader.initFromPage(valueCount, in);
+    suffixReader.readBinary(valueCount, suffixVector, 0);
   }

   @Override
   public Binary readBinary(int len) {
-    return deltaByteArrayReader.readBytes();
+    readValues(1, null, 0,
+        (w, r, v, l) ->
+            binaryVal = Binary.fromConstantByteArray(v.array(), v.arrayOffset() + v.position(), l));
+    return binaryVal;
   }

-  @Override
-  public void readBinary(int total, WritableColumnVector c, int rowId) {
+  public void readValues(int total, WritableColumnVector c, int rowId,
+      ByteBufferOutputWriter outputWriter) {
+    if (total == 0) {
+      return;
+    }
+
     for (int i = 0; i < total; i++) {
-      Binary binary = deltaByteArrayReader.readBytes();
-      ByteBuffer buffer = binary.toByteBuffer();
-      if (buffer.hasArray()) {
-        c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
-          binary.length());
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      byte[] suffix = suffixVector.getBinary(currentRow);
+      // This does not copy bytes
+      int length = prefixLength + suffix.length;
+
+      // NOTE: due to PARQUET-246, it is important that we
+      // respect prefixLength which was read from prefixLengthReader,
+      // even for the *first* value of a page. Even though the first
+      // value of the page should have an empty prefix, it may not
+      // because of PARQUET-246.
+
+      // We have to do this to materialize the output
+      if (prefixLength != 0) {
+        // We could do
+        //   c.putByteArray(rowId + i, previous, 0, prefixLength);
+        //   c.putByteArray(rowId + i, suffix, prefixLength, suffix.length);
+        //   previous = c.getBinary(rowId + i);
+        // but it incurs the same cost of copying the values twice _and_ c.getBinary
+        // is a _slow_ byte by byte copy.
+        // The following always uses the faster System.arraycopy method.
+        byte[] out = new byte[length];
+        System.arraycopy(previous, 0, out, 0, prefixLength);
+        System.arraycopy(suffix, 0, out, prefixLength, suffix.length);
+        previous = out;
       } else {
-        byte[] bytes = new byte[binary.length()];
-        buffer.get(bytes);
-        c.putByteArray(rowId + i, bytes);
+        previous = suffix;
       }
+      outputWriter.write(c, rowId + i, ByteBuffer.wrap(previous), previous.length);
+      currentRow++;
     }
   }

   @Override
-  public void skipBinary(int total) {
-    for (int i = 0; i < total; i++) {
-      deltaByteArrayReader.skip();
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
+    readValues(total, c, rowId, ByteBufferOutputWriter::writeArrayByteBuffer);
+  }
+
+  /**
+   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
+   * previous value state that it tracks internally. This resulted in the first value of all pages
+   * (except for the first page) to be a delta from the last value of the previous page. In order to
+   * read corrupted files written with this bug, when reading a new page we need to recover the
+   * previous page's last value to use it (if needed) to read the first value.
+   */
+  public void setPreviousReader(ValuesReader reader) {
+    if (reader != null) {
+      this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
     }
   }

+  @Override
+  public void skipBinary(int total) {
+    // we have to read all the values so that we always have the correct 'previous'
+    // we just don't write it to the output vector
+    readValues(total, null, currentRow, ByteBufferOutputWriter::skipWrite);
+  }
+
 }
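Stepping back from the diff: DELTA_BYTE_ARRAY is incremental (front) coding, where each value shares a prefix with its predecessor and only the suffix is stored. A self-contained sketch of the reconstruction that readValues implements above; the method name and arrays are illustrative, not Spark APIs.

// Minimal sketch of DELTA_BYTE_ARRAY reconstruction.
static byte[][] decodeDeltaByteArray(int[] prefixLengths, byte[][] suffixes) {
  byte[][] out = new byte[suffixes.length][];
  byte[] previous = new byte[0];
  for (int i = 0; i < suffixes.length; i++) {
    byte[] value = new byte[prefixLengths[i] + suffixes[i].length];
    // Reuse the shared prefix from the previously decoded value...
    System.arraycopy(previous, 0, value, 0, prefixLengths[i]);
    // ...then append this value's stored suffix.
    System.arraycopy(suffixes[i], 0, value, prefixLengths[i], suffixes[i].length);
    out[i] = value;
    previous = value;
  }
  return out;
}
// prefixLengths {0, 5, 11} with suffixes {"Hello", " world", "!"} decode to
// "Hello", "Hello world", "Hello world!".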

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
1 addition, 1 deletion

@@ -56,7 +56,7 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
       lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
     }
     lengthReader.initFromPage(valueCount, in);
-    lengthReader.readIntegers(valueCount, lengthsVector, 0);
+    lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
     this.in = in.remainingStream();
   }
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
4 additions, 1 deletion

@@ -367,7 +367,10 @@ private void checkEndOfRowGroup() throws IOException {
         datetimeRebaseMode,
         datetimeRebaseTz,
         int96RebaseMode,
-        int96RebaseTz);
+        int96RebaseTz,
+        writerVersion,
+        MEMORY_MODE
+      );
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
2 additions, 0 deletions

@@ -105,6 +105,8 @@ static void copyWriteByteBuffer(WritableColumnVector c, int rowId, ByteBuffer va
       c.putByteArray(rowId, bytes);
     }

+    static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }
+
   }

 }
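Why a writer that does nothing: skipBinary in the new reader must still decode every value to keep its previous state correct, so a skip is "decode, then discard". A fragment (assumed to live in the same package as VectorizedValuesReader) showing the idea:

// Equivalent to the new skipWrite, expressed as a lambda: decode results are
// simply dropped instead of being written to the column vector.
VectorizedValuesReader.ByteBufferOutputWriter discard = (c, rowId, val, length) -> { };
// A skip then reuses the normal decode loop with no output vector, e.g.:
//   readValues(total, null, currentRow, discard);
// (readValues, total, and currentRow come from the surrounding reader.)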
