Skip to content

Commit 85cc59b

Browse files
committed
Refactored ColumnAccessors & ColumnBuilders to remove duplicate code
Primitive setters/getters for (Mutable)Rows are moved to ColumnTypes.
1 parent ada310a commit 85cc59b

File tree

4 files changed

+128
-155
lines changed

4 files changed

+128
-155
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala

Lines changed: 20 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.nio.{ByteOrder, ByteBuffer}
2121

2222
import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
2323
import org.apache.spark.sql.catalyst.expressions.MutableRow
24-
import org.apache.spark.sql.execution.SparkSqlSerializer
2524

2625
/**
2726
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
@@ -41,116 +40,61 @@ private[sql] trait ColumnAccessor {
4140
protected def underlyingBuffer: ByteBuffer
4241
}
4342

44-
private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
43+
private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
44+
buffer: ByteBuffer, columnType: ColumnType[T, JvmType])
4545
extends ColumnAccessor {
4646

4747
protected def initialize() {}
4848

49-
def columnType: ColumnType[T, JvmType]
50-
5149
def hasNext = buffer.hasRemaining
5250

5351
def extractTo(row: MutableRow, ordinal: Int) {
54-
doExtractTo(row, ordinal)
52+
columnType.setField(row, ordinal, columnType.extract(buffer))
5553
}
5654

57-
protected def doExtractTo(row: MutableRow, ordinal: Int)
58-
5955
protected def underlyingBuffer = buffer
6056
}
6157

6258
private[sql] abstract class NativeColumnAccessor[T <: NativeType](
6359
buffer: ByteBuffer,
6460
val columnType: NativeColumnType[T])
65-
extends BasicColumnAccessor[T, T#JvmType](buffer)
66-
with NullableColumnAccessor
67-
68-
private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
69-
extends NativeColumnAccessor(buffer, BOOLEAN) {
61+
extends BasicColumnAccessor[T, T#JvmType](buffer, columnType)
62+
with NullableColumnAccessor {
7063

71-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
72-
row.setBoolean(ordinal, columnType.extract(buffer))
73-
}
64+
type JvmType = T#JvmType
7465
}
7566

76-
private[sql] class IntColumnAccessor(buffer: ByteBuffer)
77-
extends NativeColumnAccessor(buffer, INT) {
67+
private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
68+
extends NativeColumnAccessor(buffer, BOOLEAN)
7869

79-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
80-
row.setInt(ordinal, columnType.extract(buffer))
81-
}
82-
}
70+
private[sql] class IntColumnAccessor(buffer: ByteBuffer)
71+
extends NativeColumnAccessor(buffer, INT)
8372

8473
private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
85-
extends NativeColumnAccessor(buffer, SHORT) {
86-
87-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
88-
row.setShort(ordinal, columnType.extract(buffer))
89-
}
90-
}
74+
extends NativeColumnAccessor(buffer, SHORT)
9175

9276
private[sql] class LongColumnAccessor(buffer: ByteBuffer)
93-
extends NativeColumnAccessor(buffer, LONG) {
94-
95-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
96-
row.setLong(ordinal, columnType.extract(buffer))
97-
}
98-
}
77+
extends NativeColumnAccessor(buffer, LONG)
9978

10079
private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
101-
extends NativeColumnAccessor(buffer, BYTE) {
102-
103-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
104-
row.setByte(ordinal, columnType.extract(buffer))
105-
}
106-
}
80+
extends NativeColumnAccessor(buffer, BYTE)
10781

10882
private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
109-
extends NativeColumnAccessor(buffer, DOUBLE) {
110-
111-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
112-
row.setDouble(ordinal, columnType.extract(buffer))
113-
}
114-
}
83+
extends NativeColumnAccessor(buffer, DOUBLE)
11584

11685
private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
117-
extends NativeColumnAccessor(buffer, FLOAT) {
118-
119-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
120-
row.setFloat(ordinal, columnType.extract(buffer))
121-
}
122-
}
86+
extends NativeColumnAccessor(buffer, FLOAT)
12387

12488
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
125-
extends NativeColumnAccessor(buffer, STRING) {
126-
127-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
128-
row.setString(ordinal, columnType.extract(buffer))
129-
}
130-
}
89+
extends NativeColumnAccessor(buffer, STRING)
13190

13291
private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
133-
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
134-
with NullableColumnAccessor {
135-
136-
def columnType = BINARY
137-
138-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
139-
row(ordinal) = columnType.extract(buffer)
140-
}
141-
}
92+
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
93+
with NullableColumnAccessor
14294

14395
private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
144-
extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
145-
with NullableColumnAccessor {
146-
147-
def columnType = GENERIC
148-
149-
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
150-
val serialized = columnType.extract(buffer)
151-
row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
152-
}
153-
}
96+
extends BasicColumnAccessor[DataType, Array[Byte]](buffer, GENERIC)
97+
with NullableColumnAccessor
15498

15599
private[sql] object ColumnAccessor {
156100
def apply(b: ByteBuffer): ColumnAccessor = {

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala

Lines changed: 29 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,27 @@ import java.nio.{ByteBuffer, ByteOrder}
2222
import org.apache.spark.sql.Row
2323
import org.apache.spark.sql.catalyst.types._
2424
import org.apache.spark.sql.columnar.ColumnBuilder._
25-
import org.apache.spark.sql.execution.SparkSqlSerializer
2625

2726
private[sql] trait ColumnBuilder {
2827
/**
2928
* Initializes with an approximate lower bound on the expected number of elements in this column.
3029
*/
3130
def initialize(initialSize: Int, columnName: String = "")
3231

32+
def gatherStats(row: Row, ordinal: Int) {}
33+
3334
def appendFrom(row: Row, ordinal: Int)
3435

3536
def build(): ByteBuffer
3637
}
3738

38-
private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
39+
private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
40+
val columnType: ColumnType[T, JvmType])
41+
extends ColumnBuilder {
3942

4043
private var columnName: String = _
41-
protected var buffer: ByteBuffer = _
4244

43-
def columnType: ColumnType[T, JvmType]
45+
protected var buffer: ByteBuffer = _
4446

4547
override def initialize(initialSize: Int, columnName: String = "") = {
4648
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
@@ -49,18 +51,10 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
4951
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
5052
}
5153

52-
// Have to give a concrete implementation to make mixin possible
5354
override def appendFrom(row: Row, ordinal: Int) {
54-
doAppendFrom(row, ordinal)
55-
}
56-
57-
// Concrete `ColumnBuilder`s can override this method to append values
58-
protected def doAppendFrom(row: Row, ordinal: Int)
59-
60-
// Helper method to append primitive values (to avoid boxing cost)
61-
protected def appendValue(v: JvmType) {
62-
buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
63-
columnType.append(v, buffer)
55+
val field = columnType.getField(row, ordinal)
56+
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
57+
columnType.append(field, buffer)
6458
}
6559

6660
override def build() = {
@@ -70,82 +64,41 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
7064
}
7165

7266
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
73-
val columnType: NativeColumnType[T])
74-
extends BasicColumnBuilder[T, T#JvmType]
75-
with NullableColumnBuilder
67+
protected val columnStats: ColumnStats[T],
68+
columnType: NativeColumnType[T])
69+
extends BasicColumnBuilder[T, T#JvmType](columnType)
70+
with NullableColumnBuilder {
7671

77-
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
78-
override def doAppendFrom(row: Row, ordinal: Int) {
79-
appendValue(row.getBoolean(ordinal))
72+
override def gatherStats(row: Row, ordinal: Int) {
73+
columnStats.gatherStats(row, ordinal)
8074
}
8175
}
8276

83-
private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
84-
override def doAppendFrom(row: Row, ordinal: Int) {
85-
appendValue(row.getInt(ordinal))
86-
}
87-
}
77+
private[sql] class BooleanColumnBuilder
78+
extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
8879

89-
private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
90-
override def doAppendFrom(row: Row, ordinal: Int) {
91-
appendValue(row.getShort(ordinal))
92-
}
93-
}
80+
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
9481

95-
private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
96-
override def doAppendFrom(row: Row, ordinal: Int) {
97-
appendValue(row.getLong(ordinal))
98-
}
99-
}
82+
private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)
10083

101-
private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
102-
override def doAppendFrom(row: Row, ordinal: Int) {
103-
appendValue(row.getByte(ordinal))
104-
}
105-
}
84+
private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)
10685

107-
private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
108-
override def doAppendFrom(row: Row, ordinal: Int) {
109-
appendValue(row.getDouble(ordinal))
110-
}
111-
}
86+
private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)
11287

113-
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
114-
override def doAppendFrom(row: Row, ordinal: Int) {
115-
appendValue(row.getFloat(ordinal))
116-
}
117-
}
88+
private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)
11889

119-
private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
120-
override def doAppendFrom(row: Row, ordinal: Int) {
121-
appendValue(row.getString(ordinal))
122-
}
123-
}
90+
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
12491

125-
private[sql] class BinaryColumnBuilder
126-
extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
127-
with NullableColumnBuilder {
128-
129-
def columnType = BINARY
92+
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStates, STRING)
13093

131-
override def doAppendFrom(row: Row, ordinal: Int) {
132-
appendValue(row(ordinal).asInstanceOf[Array[Byte]])
133-
}
134-
}
94+
private[sql] class BinaryColumnBuilder
95+
extends BasicColumnBuilder[BinaryType.type, Array[Byte]](BINARY)
96+
with NullableColumnBuilder
13597

13698
// TODO (lian) Add support for array, struct and map
13799
private[sql] class GenericColumnBuilder
138-
extends BasicColumnBuilder[DataType, Array[Byte]]
139-
with NullableColumnBuilder {
140-
141-
def columnType = GENERIC
142-
143-
override def doAppendFrom(row: Row, ordinal: Int) {
144-
val serialized = SparkSqlSerializer.serialize(row(ordinal))
145-
buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
146-
columnType.append(serialized, buffer)
147-
}
148-
}
100+
extends BasicColumnBuilder[DataType, Array[Byte]](GENERIC)
101+
with NullableColumnBuilder
149102

150103
private[sql] object ColumnBuilder {
151104
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104

0 commit comments

Comments
 (0)