
Commit 95f272c

improve 2
1 parent: 6d15a86

4 files changed: +51 −22 lines


sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java

Lines changed: 35 additions & 22 deletions
@@ -39,16 +39,18 @@ public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
   private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
   private final VectorizedDeltaLengthByteArrayReader suffixReader;
   private WritableColumnVector prefixLengthVector;
-  private ByteBuffer previous = null;
+  private ByteBuffer previous;
   private int currentRow = 0;

   // temporary variable used by getBinary
   private final WritableColumnVector binaryValVector;
+  private final WritableColumnVector tempBinaryValVector;

   VectorizedDeltaByteArrayReader() {
     this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
     this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
     binaryValVector = new OnHeapColumnVector(1, BinaryType);
+    tempBinaryValVector = new OnHeapColumnVector(1, BinaryType);
   }

   @Override
@@ -62,12 +64,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce

   @Override
   public Binary readBinary(int len) {
-    readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
+    readValues(1, binaryValVector, 0);
     return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
   }

-  private void readValues(int total, WritableColumnVector c, int rowId,
-      ByteBufferOutputWriter outputWriter) {
+  private void readValues(int total, WritableColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
       // NOTE: due to PARQUET-246, it is important that we
       // respect prefixLength which was read from prefixLengthReader,
@@ -81,29 +82,21 @@ private void readValues(int total, WritableColumnVector c, int rowId,
       int length = prefixLength + suffixLength;

       // We have to do this to materialize the output
+      WritableColumnVector arrayData = c.arrayData();
+      int offset = arrayData.getElementsAppended();
       if (prefixLength != 0) {
-        // We could do
-        // c.putByteArray(rowId + i, previous, 0, prefixLength);
-        // c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
-        // previous = c.getBinary(rowId+1);
-        // but it incurs the same cost of copying the values twice _and_ c.getBinary
-        // is a _slow_ byte by byte copy
-        // The following always uses the faster system arraycopy method
-        byte[] out = new byte[length];
-        System.arraycopy(previous.array(), previous.position(), out, 0, prefixLength);
-        System.arraycopy(suffixArray, suffix.position(), out, prefixLength, suffixLength);
-        previous = ByteBuffer.wrap(out);
-      } else {
-        previous = suffix;
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
       }
-      outputWriter.write(c, rowId + i, previous, previous.limit() - previous.position());
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      c.putArray(rowId + i, offset, length);
+      previous = arrayData.getBytesUnsafe(offset, length);
       currentRow++;
     }
   }

   @Override
   public void readBinary(int total, WritableColumnVector c, int rowId) {
-    readValues(total, c, rowId, ByteBufferOutputWriter::writeArrayByteBuffer);
+    readValues(total, c, rowId);
   }

   /**
@@ -121,9 +114,29 @@ public void setPreviousReader(ValuesReader reader) {

   @Override
   public void skipBinary(int total) {
-    // we have to read all the values so that we always have the correct 'previous'
-    // we just don't write it to the output vector
-    readValues(total, null, currentRow, ByteBufferOutputWriter::skipWrite);
+    WritableColumnVector c1 = tempBinaryValVector;
+    WritableColumnVector c2 = binaryValVector;
+
+    for (int i = 0; i < total; i++) {
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      ByteBuffer suffix = suffixReader.getBytes(currentRow);
+      byte[] suffixArray = suffix.array();
+      int suffixLength = suffix.limit() - suffix.position();
+      int length = prefixLength + suffixLength;
+
+      WritableColumnVector arrayData = c1.arrayData();
+      c1.reset();
+      if (prefixLength != 0) {
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
+      }
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      previous = arrayData.getBytesUnsafe(0, length);
+      currentRow++;
+
+      WritableColumnVector tmp = c1;
+      c1 = c2;
+      c2 = tmp;
+    }
   }

 }
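For context, DELTA_BYTE_ARRAY encodes each value as a prefix length (the number of bytes shared with the previous value) plus a suffix. A minimal helper-method sketch of that reconstruction, independent of the column-vector plumbing above (the method name and arrays are illustrative only, not Spark APIs):

// Rebuild each value from prefixLength bytes of the previous value plus its own suffix.
static byte[][] decodeDeltaByteArray(int[] prefixLengths, byte[][] suffixes) {
  byte[][] values = new byte[suffixes.length][];
  byte[] previous = new byte[0];
  for (int i = 0; i < suffixes.length; i++) {
    byte[] out = new byte[prefixLengths[i] + suffixes[i].length];
    System.arraycopy(previous, 0, out, 0, prefixLengths[i]);                     // shared prefix
    System.arraycopy(suffixes[i], 0, out, prefixLengths[i], suffixes[i].length); // new suffix
    values[i] = out;
    previous = out;
  }
  return values;
}

The change above avoids allocating a fresh byte[] per value by appending the prefix and suffix straight into the vector's child arrayData and then taking getBytesUnsafe(offset, length) as the new 'previous'. skipBinary still decodes every value so that 'previous' stays correct; it appears to alternate between tempBinaryValVector and binaryValVector so that resetting the vector used for the next value does not clobber the bytes that 'previous' still points at.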

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 7 additions & 0 deletions
@@ -221,6 +221,13 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
     return UTF8String.fromAddress(null, data + rowId, count);
   }

+  @Override
+  public ByteBuffer getBytesUnsafe(int rowId, int count) {
+    byte[] array = new byte[count];
+    Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
+    return ByteBuffer.wrap(array);
+  }
+
   //
   // APIs dealing with shorts
   //
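The off-heap vector cannot wrap its storage directly, so it copies the requested range from native memory into a fresh heap array. A hedged fragment showing the same Platform.copyMemory pattern in isolation (the scratch allocation and sizes are purely illustrative; assumes imports of org.apache.spark.unsafe.Platform and java.nio.ByteBuffer):

// Copy bytes from an off-heap address into an on-heap byte[]; the resulting buffer
// is an independent snapshot rather than a view over native memory.
long address = Platform.allocateMemory(4);
Platform.putByte(null, address, (byte) 42);
byte[] array = new byte[4];
Platform.copyMemory(null, address, array, Platform.BYTE_ARRAY_OFFSET, 4);
ByteBuffer snapshot = ByteBuffer.wrap(array);  // array[0] == 42, independent of native memory
Platform.freeMemory(address);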

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 6 additions & 0 deletions
@@ -219,6 +219,12 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
     return UTF8String.fromBytes(byteData, rowId, count);
   }

+  @Override
+  public ByteBuffer getBytesUnsafe(int rowId, int count) {
+    return ByteBuffer.wrap(byteData, rowId, count);
+  }
+
+
   //
   // APIs dealing with Shorts
   //
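By contrast, the on-heap implementation returns a view: ByteBuffer.wrap(byteData, rowId, count) does not copy, so the buffer aliases the vector's live backing array (and goes stale if the vector reallocates), which is presumably why the method carries the "Unsafe" name. A hedged fragment illustrating the aliasing (the vector setup, capacity, and bytes are assumptions for illustration):

// The returned buffer shares storage with the vector, so later writes show through.
OnHeapColumnVector v = new OnHeapColumnVector(8, DataTypes.ByteType);
v.appendBytes(3, new byte[] {1, 2, 3}, 0);
ByteBuffer view = v.getBytesUnsafe(0, 3);  // wraps byteData, no copy
v.putByte(0, (byte) 9);                    // mutates the same backing array
byte b = view.get(0);                      // now reads 9, not 1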

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 3 additions & 0 deletions
@@ -18,6 +18,7 @@

 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;

 import com.google.common.annotations.VisibleForTesting;

@@ -443,6 +444,8 @@ public byte[] getBinary(int rowId) {
     }
   }

+  public abstract ByteBuffer getBytesUnsafe(int rowId, int count);
+
   /**
    * Append APIs. These APIs all behave similarly and will append data to the current vector. It
    * is not valid to mix the put and append APIs. The append APIs are slower and should only be
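A hedged sketch of the append/putArray flow the reader relies on for binary columns (the vector construction, capacity, and literal bytes are assumptions for illustration): the reconstructed bytes are appended once to the child arrayData vector, the parent row records the (offset, length) slice with putArray, and the new getBytesUnsafe hands back that slice without another byte-by-byte copy:

// Append the bytes once, record the slice for the row, then keep a reference to the
// same bytes to serve as the next value's prefix source.
OnHeapColumnVector c = new OnHeapColumnVector(4, DataTypes.BinaryType);
WritableColumnVector arrayData = c.arrayData();
int offset = arrayData.getElementsAppended();
byte[] value = {'s', 'p', 'a', 'r', 'k'};
arrayData.appendBytes(value.length, value, 0);
c.putArray(0, offset, value.length);                            // row 0 -> slice [offset, offset + 5)
ByteBuffer previous = arrayData.getBytesUnsafe(offset, value.length);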
