Skip to content

Commit b297029

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-20960][SQL] make ColumnVector public
## What changes were proposed in this pull request?

Move `ColumnVector` and related classes to `org.apache.spark.sql.vectorized`, and improve the document.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20116 from cloud-fan/column-vector.
1 parent 9a2b65a commit b297029

File tree

20 files changed

+63
-125
lines changed

20 files changed

+63
-125
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import org.apache.spark.memory.MemoryMode;
3232
import org.apache.spark.sql.catalyst.InternalRow;
3333
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
34-
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
3534
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
3635
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
3736
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
37+
import org.apache.spark.sql.vectorized.ColumnarBatch;
3838
import org.apache.spark.sql.types.StructField;
3939
import org.apache.spark.sql.types.StructType;
4040

@@ -248,7 +248,10 @@ public void enableReturningBatches() {
248248
* Advances to the next batch of rows. Returns false if there are no more.
249249
*/
250250
public boolean nextBatch() throws IOException {
251-
columnarBatch.reset();
251+
for (WritableColumnVector vector : columnVectors) {
252+
vector.reset();
253+
}
254+
columnarBatch.setNumRows(0);
252255
if (rowsReturned >= totalRowCount) return false;
253256
checkEndOfRowGroup();
254257

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.spark.sql.catalyst.InternalRow;
2929
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
3030
import org.apache.spark.sql.types.*;
31+
import org.apache.spark.sql.vectorized.ColumnarArray;
32+
import org.apache.spark.sql.vectorized.ColumnarBatch;
3133
import org.apache.spark.unsafe.types.CalendarInterval;
3234
import org.apache.spark.unsafe.types.UTF8String;
3335

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
2424
import org.apache.spark.sql.catalyst.util.MapData;
2525
import org.apache.spark.sql.types.*;
26+
import org.apache.spark.sql.vectorized.ColumnarArray;
27+
import org.apache.spark.sql.vectorized.ColumnarBatch;
28+
import org.apache.spark.sql.vectorized.ColumnarRow;
29+
import org.apache.spark.sql.vectorized.ColumnVector;
2630
import org.apache.spark.unsafe.types.CalendarInterval;
2731
import org.apache.spark.unsafe.types.UTF8String;
2832

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.spark.sql.internal.SQLConf;
2525
import org.apache.spark.sql.types.*;
26+
import org.apache.spark.sql.vectorized.ColumnVector;
2627
import org.apache.spark.unsafe.array.ByteArrayMethods;
2728
import org.apache.spark.unsafe.types.UTF8String;
2829

@@ -585,11 +586,11 @@ public final int appendArray(int length) {
585586
public final int appendStruct(boolean isNull) {
586587
if (isNull) {
587588
appendNull();
588-
for (ColumnVector c: childColumns) {
589+
for (WritableColumnVector c: childColumns) {
589590
if (c.type instanceof StructType) {
590-
((WritableColumnVector) c).appendStruct(true);
591+
c.appendStruct(true);
591592
} else {
592-
((WritableColumnVector) c).appendNull();
593+
c.appendNull();
593594
}
594595
}
595596
} else {

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java renamed to sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java

Lines changed: 2 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.vectorized;
18+
package org.apache.spark.sql.vectorized;
1919

2020
import org.apache.arrow.vector.*;
2121
import org.apache.arrow.vector.complex.*;
@@ -34,11 +34,7 @@ public final class ArrowColumnVector extends ColumnVector {
3434
private ArrowColumnVector[] childColumns;
3535

3636
private void ensureAccessible(int index) {
37-
int valueCount = accessor.getValueCount();
38-
if (index < 0 || index >= valueCount) {
39-
throw new IndexOutOfBoundsException(
40-
String.format("index: %d, valueCount: %d", index, valueCount));
41-
}
37+
ensureAccessible(index, 1);
4238
}
4339

4440
private void ensureAccessible(int index, int count) {
@@ -64,20 +60,12 @@ public void close() {
6460
accessor.close();
6561
}
6662

67-
//
68-
// APIs dealing with nulls
69-
//
70-
7163
@Override
7264
public boolean isNullAt(int rowId) {
7365
ensureAccessible(rowId);
7466
return accessor.isNullAt(rowId);
7567
}
7668

77-
//
78-
// APIs dealing with Booleans
79-
//
80-
8169
@Override
8270
public boolean getBoolean(int rowId) {
8371
ensureAccessible(rowId);
@@ -94,10 +82,6 @@ public boolean[] getBooleans(int rowId, int count) {
9482
return array;
9583
}
9684

97-
//
98-
// APIs dealing with Bytes
99-
//
100-
10185
@Override
10286
public byte getByte(int rowId) {
10387
ensureAccessible(rowId);
@@ -114,10 +98,6 @@ public byte[] getBytes(int rowId, int count) {
11498
return array;
11599
}
116100

117-
//
118-
// APIs dealing with Shorts
119-
//
120-
121101
@Override
122102
public short getShort(int rowId) {
123103
ensureAccessible(rowId);
@@ -134,10 +114,6 @@ public short[] getShorts(int rowId, int count) {
134114
return array;
135115
}
136116

137-
//
138-
// APIs dealing with Ints
139-
//
140-
141117
@Override
142118
public int getInt(int rowId) {
143119
ensureAccessible(rowId);
@@ -154,10 +130,6 @@ public int[] getInts(int rowId, int count) {
154130
return array;
155131
}
156132

157-
//
158-
// APIs dealing with Longs
159-
//
160-
161133
@Override
162134
public long getLong(int rowId) {
163135
ensureAccessible(rowId);
@@ -174,10 +146,6 @@ public long[] getLongs(int rowId, int count) {
174146
return array;
175147
}
176148

177-
//
178-
// APIs dealing with floats
179-
//
180-
181149
@Override
182150
public float getFloat(int rowId) {
183151
ensureAccessible(rowId);
@@ -194,10 +162,6 @@ public float[] getFloats(int rowId, int count) {
194162
return array;
195163
}
196164

197-
//
198-
// APIs dealing with doubles
199-
//
200-
201165
@Override
202166
public double getDouble(int rowId) {
203167
ensureAccessible(rowId);
@@ -214,10 +178,6 @@ public double[] getDoubles(int rowId, int count) {
214178
return array;
215179
}
216180

217-
//
218-
// APIs dealing with Arrays
219-
//
220-
221181
@Override
222182
public int getArrayLength(int rowId) {
223183
ensureAccessible(rowId);
@@ -230,45 +190,27 @@ public int getArrayOffset(int rowId) {
230190
return accessor.getArrayOffset(rowId);
231191
}
232192

233-
//
234-
// APIs dealing with Decimals
235-
//
236-
237193
@Override
238194
public Decimal getDecimal(int rowId, int precision, int scale) {
239195
ensureAccessible(rowId);
240196
return accessor.getDecimal(rowId, precision, scale);
241197
}
242198

243-
//
244-
// APIs dealing with UTF8Strings
245-
//
246-
247199
@Override
248200
public UTF8String getUTF8String(int rowId) {
249201
ensureAccessible(rowId);
250202
return accessor.getUTF8String(rowId);
251203
}
252204

253-
//
254-
// APIs dealing with Binaries
255-
//
256-
257205
@Override
258206
public byte[] getBinary(int rowId) {
259207
ensureAccessible(rowId);
260208
return accessor.getBinary(rowId);
261209
}
262210

263-
/**
264-
* Returns the data for the underlying array.
265-
*/
266211
@Override
267212
public ArrowColumnVector arrayData() { return childColumns[0]; }
268213

269-
/**
270-
* Returns the ordinal's child data column.
271-
*/
272214
@Override
273215
public ArrowColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
274216

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java renamed to sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,39 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql.execution.vectorized;
17+
package org.apache.spark.sql.vectorized;
1818

1919
import org.apache.spark.sql.catalyst.util.MapData;
2020
import org.apache.spark.sql.types.DataType;
2121
import org.apache.spark.sql.types.Decimal;
2222
import org.apache.spark.unsafe.types.UTF8String;
2323

2424
/**
25-
* This class represents in-memory values of a column and provides the main APIs to access the data.
26-
* It supports all the types and contains get APIs as well as their batched versions. The batched
27-
* versions are considered to be faster and preferable whenever possible.
25+
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
26+
* to access the data, as well as their batched versions. The batched versions are considered to be
27+
* faster and preferable whenever possible.
2828
*
29-
* To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
30-
* columns have child columns. All of the data are stored in the child columns and the parent column
31-
* only contains nullability. In the case of Arrays, the lengths and offsets are saved in the child
32-
* column and are encoded identically to INTs.
29+
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
30+
* in this ColumnVector.
3331
*
34-
* Maps are just a special case of a two field struct.
32+
* ColumnVector supports all the data types including nested types. To handle nested types,
33+
* ColumnVector can have children and is a tree structure. For struct type, it stores the actual
34+
* data of each field in the corresponding child ColumnVector, and only stores null information in
35+
* the parent ColumnVector. For array type, it stores the actual array elements in the child
36+
* ColumnVector, and stores null information, array offsets and lengths in the parent ColumnVector.
3537
*
36-
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
37-
* in the current batch.
38+
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
39+
* memory again and again.
40+
*
41+
* ColumnVector is meant to maximize CPU efficiency but not to minimize storage footprint.
42+
* Implementations should prefer computing efficiency over storage efficiency when design the
43+
* format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
44+
* footprint is negligible.
3845
*/
3946
public abstract class ColumnVector implements AutoCloseable {
4047

4148
/**
42-
* Returns the data type of this column.
49+
* Returns the data type of this column vector.
4350
*/
4451
public final DataType dataType() { return type; }
4552

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java renamed to sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql.execution.vectorized;
17+
package org.apache.spark.sql.vectorized;
1818

1919
import org.apache.spark.sql.catalyst.util.ArrayData;
2020
import org.apache.spark.sql.catalyst.util.MapData;
@@ -23,8 +23,7 @@
2323
import org.apache.spark.unsafe.types.UTF8String;
2424

2525
/**
26-
* Array abstraction in {@link ColumnVector}. The instance of this class is intended
27-
* to be reused, callers should copy the data out if it needs to be stored.
26+
* Array abstraction in {@link ColumnVector}.
2827
*/
2928
public final class ColumnarArray extends ArrayData {
3029
// The data for this array. This array contains elements from
@@ -33,7 +32,7 @@ public final class ColumnarArray extends ArrayData {
3332
private final int offset;
3433
private final int length;
3534

36-
ColumnarArray(ColumnVector data, int offset, int length) {
35+
public ColumnarArray(ColumnVector data, int offset, int length) {
3736
this.data = data;
3837
this.offset = offset;
3938
this.length = length;

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java renamed to sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,18 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql.execution.vectorized;
17+
package org.apache.spark.sql.vectorized;
1818

1919
import java.util.*;
2020

2121
import org.apache.spark.sql.catalyst.InternalRow;
22+
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
2223
import org.apache.spark.sql.types.StructType;
2324

2425
/**
25-
* This class is the in memory representation of rows as they are streamed through operators. It
26-
* is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
27-
* each operator allocates one of these objects, the storage footprint on the task is negligible.
28-
*
29-
* The layout is a columnar with values encoded in their native format. Each RowBatch contains
30-
* a horizontal partitioning of the data, split into columns.
31-
*
32-
* The ColumnarBatch supports either on heap or offheap modes with (mostly) the identical API.
33-
*
34-
* TODO:
35-
* - There are many TODOs for the existing APIs. They should throw a not implemented exception.
36-
* - Compaction: The batch and columns should be able to compact based on a selection vector.
26+
* This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
27+
* batch so that Spark can access the data row by row. Instance of it is meant to be reused during
28+
* the entire data loading process.
3729
*/
3830
public final class ColumnarBatch {
3931
public static final int DEFAULT_BATCH_SIZE = 4 * 1024;
@@ -57,7 +49,7 @@ public void close() {
5749
}
5850

5951
/**
60-
* Returns an iterator over the rows in this batch. This skips rows that are filtered out.
52+
* Returns an iterator over the rows in this batch.
6153
*/
6254
public Iterator<InternalRow> rowIterator() {
6355
final int maxRows = numRows;
@@ -87,19 +79,7 @@ public void remove() {
8779
}
8880

8981
/**
90-
* Resets the batch for writing.
91-
*/
92-
public void reset() {
93-
for (int i = 0; i < numCols(); ++i) {
94-
if (columns[i] instanceof WritableColumnVector) {
95-
((WritableColumnVector) columns[i]).reset();
96-
}
97-
}
98-
this.numRows = 0;
99-
}
100-
101-
/**
102-
* Sets the number of rows that are valid.
82+
* Sets the number of rows in this batch.
10383
*/
10484
public void setNumRows(int numRows) {
10585
assert(numRows <= this.capacity);

0 commit comments

Comments (0)