Commit 72f3c1a
SPARK-23325: Use InternalRow when reading with DataSourceV2.
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins. Because the API is changing significantly in the same places, this also renames DataReaderFactory back to ReadTask. Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this interface and should be migrated to InternalRow. Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and to produce InternalRow.
1 parent 1d758dc commit 72f3c1a

29 files changed: +202 −199 lines
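For context, here is a minimal sketch (not part of the commit) of what a simple batch reader looks like against the renamed API: the default scan path, with no SupportsScan* mix-in, now returns ReadTask[InternalRow] from createReadTasks(). RangeReader and RangeReadTask are hypothetical names, and the DataReader contract (next()/get()/close()) is assumed from the existing DataSourceV2 API.

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceReader, ReadTask}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical single-partition source that emits the integers 0 to 9.
class RangeReader extends DataSourceReader {

  override def readSchema(): StructType = StructType(Seq(StructField("i", IntegerType)))

  // The default (no SupportsScan* mix-in) path now produces InternalRow.
  override def createReadTasks(): JList[ReadTask[InternalRow]] = {
    val tasks: Seq[ReadTask[InternalRow]] = Seq(new RangeReadTask(0, 10))
    tasks.asJava
  }
}

// A ReadTask is serialized and sent to executors, which call createDataReader().
class RangeReadTask(start: Int, end: Int) extends ReadTask[InternalRow] {

  override def createDataReader(): DataReader[InternalRow] = new DataReader[InternalRow] {
    private var current = start - 1

    override def next(): Boolean = { current += 1; current < end }
    override def get(): InternalRow = InternalRow(current)
    override def close(): Unit = ()
  }
}

Each ReadTask corresponds to one RDD partition of the scan; the diffs below apply exactly this shape to the Kafka readers and to the v2 reader interfaces.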

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

Lines changed: 8 additions & 6 deletions

@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
@@ -53,7 +54,7 @@ class KafkaContinuousReader(
     metadataPath: String,
     initialOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+  extends ContinuousReader with Logging {
 
   private lazy val session = SparkSession.getActiveSession.get
   private lazy val sc = session.sparkContext
@@ -86,7 +87,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
     import scala.collection.JavaConverters._
 
     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -104,12 +105,13 @@ class KafkaContinuousReader(
       oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
     knownPartitions = startOffsets.keySet
 
-    startOffsets.toSeq.map {
+    val tasks: Seq[ReadTask[InternalRow]] = startOffsets.toSeq.map {
       case (topicPartition, start) =>
         KafkaContinuousDataReaderFactory(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[DataReaderFactory[UnsafeRow]]
-    }.asJava
+    }
+
+    tasks.asJava
   }
 
   /** Stop this source and free any resources it has allocated. */
@@ -161,7 +163,7 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[InternalRow] {
 
   override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

Lines changed: 8 additions & 7 deletions

@@ -29,11 +29,12 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -61,7 +62,7 @@ private[kafka010] class KafkaMicroBatchReader(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
+  extends MicroBatchReader with Logging {
 
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -101,7 +102,7 @@ private[kafka010] class KafkaMicroBatchReader(
     }
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
     val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -142,11 +143,11 @@ private[kafka010] class KafkaMicroBatchReader(
     val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
 
     // Generate factories based on the offset ranges
-    val factories = offsetRanges.map { range =>
+    val tasks: Seq[ReadTask[InternalRow]] = offsetRanges.map { range =>
       new KafkaMicroBatchDataReaderFactory(
         range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
-    factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+    tasks.asJava
   }
 
   override def getStartOffset: Offset = {
@@ -299,13 +300,13 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }
 
-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
+    reuseKafkaConsumer: Boolean) extends ReadTask[InternalRow] {
 
   override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java

Lines changed: 2 additions & 2 deletions

@@ -21,11 +21,11 @@
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
 
 /**
- * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * A mix-in interface for {@link ReadTask}. Continuous data reader factories can
  * implement this interface to provide creating {@link DataReader} with particular offset.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+public interface ContinuousDataReaderFactory<T> extends ReadTask<T> {
   /**
    * Create a DataReader with particular offset as its startOffset.
    *

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java

Lines changed: 4 additions & 4 deletions

@@ -23,12 +23,12 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
+ * A data reader returned by {@link ReadTask#createDataReader()} and is responsible for
  * outputting data for a RDD partition.
  *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
- * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
- * readers that mix in {@link SupportsScanUnsafeRow}.
+ * Note that, Currently the type `T` should be {@link org.apache.spark.sql.catalyst.InternalRow}
+ * for normal data source readers, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for
+ * data source readers that mix in {@link SupportsScanColumnarBatch}.
  */
 @InterfaceStability.Evolving
 public interface DataReader<T> extends Closeable {

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 6 additions & 6 deletions

@@ -20,7 +20,7 @@
 import java.util.List;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
@@ -31,19 +31,19 @@
  * {@link ReadSupport#createReader(DataSourceOptions)} or
  * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link DataReaderFactory}s that are returned by
- * {@link #createDataReaderFactories()}.
+ * logic is delegated to {@link ReadTask}s that are returned by
+ * {@link #createReadTasks()}.
  *
  * There are mainly 3 kinds of query optimizations:
  *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
  *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
  *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  *      Names of these interfaces start with `SupportsReporting`.
- *   3. Special scans. E.g, columnar scan, unsafe row scan, etc.
+ *   3. Special scans. E.g, columnar scan.
  *      Names of these interfaces start with `SupportsScan`. Note that a reader should only
  *      implement at most one of the special scans, if more than one special scans are implemented,
  *      only one of them would be respected, according to the priority list from high to low:
- *      {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}.
+ *      {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
  *
  * If an exception was throw when applying any of these query optimizations, the action would fail
  * and no Spark job was submitted.
@@ -76,5 +76,5 @@ public interface DataSourceReader {
   * If this method fails (by throwing an exception), the action would fail and no Spark job was
   * submitted.
   */
-  List<DataReaderFactory<Row>> createDataReaderFactories();
+  List<ReadTask<InternalRow>> createReadTasks();
 }
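The javadoc above describes how the optimization mix-ins compose with the default scan. As a rough illustration (assuming the existing SupportsPushDownRequiredColumns mix-in and its pruneColumns(StructType) method; PrunedReader and planTasks are hypothetical names), Spark applies column pruning before it asks for read tasks, so the tasks only need to produce the pruned schema:

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{ReadTask, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical base class: Spark calls pruneColumns() before createReadTasks(),
// so the planned tasks only need to produce the pruned schema.
abstract class PrunedReader(fullSchema: StructType) extends SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = fullSchema

  override def pruneColumns(required: StructType): Unit = {
    requiredSchema = required
  }

  override def readSchema(): StructType = requiredSchema

  override def createReadTasks(): JList[ReadTask[InternalRow]] = {
    planTasks(requiredSchema).asJava
  }

  // Source-specific partition planning; a hypothetical helper, not part of the API.
  protected def planTasks(schema: StructType): Seq[ReadTask[InternalRow]]
}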

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java renamed to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java

Lines changed: 5 additions & 5 deletions

@@ -22,17 +22,17 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
+ * A reader factory returned by {@link DataSourceReader#createReadTasks()} and is
  * responsible for creating the actual data reader. The relationship between
- * {@link DataReaderFactory} and {@link DataReader}
+ * {@link ReadTask} and {@link DataReader}
  * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
  *
- * Note that, the reader factory will be serialized and sent to executors, then the data reader
- * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
+ * Note that this read task will be serialized and sent to executors, then the data reader
+ * will be created on executors and do the actual reading. So {@link ReadTask} must be
  * serializable and {@link DataReader} doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface DataReaderFactory<T> extends Serializable {
+public interface ReadTask<T> extends Serializable {
 
   /**
    * The preferred locations where the data reader returned by this reader factory can run faster,
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java (new file)

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+import java.util.List;
+
+/**
+ * A mix-in interface for {@link DataSourceReader} to assist in moving from Row to InternalRow.
+ * Data source readers can implement this interface to output {@link Row}.
+ */
+@Deprecated
+@InterfaceStability.Evolving
+public interface SupportsDeprecatedScanRow extends DataSourceReader {
+  @Override
+  default List<ReadTask<InternalRow>> createReadTasks() {
+    throw new IllegalStateException(
+        "createReadTasks not supported by default within SupportsDeprecatedScanRow.");
+  }
+
+  /**
+   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
+   * output data for one RDD partition. That means the number of factories returned here is same as
+   * the number of RDD partitions this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source reader mixes in other optimization
+   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
+   * Spark issues the scan request.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
+   */
+  List<ReadTask<Row>> createDataReaderFactories();
+}
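A not-yet-migrated source would mix in this deprecated interface and keep returning Row. A minimal sketch, assuming the same DataReader contract as in the earlier example; LegacyRowReader is a hypothetical name:

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask, SupportsDeprecatedScanRow}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical legacy source that still produces Row through the deprecated mix-in.
class LegacyRowReader extends SupportsDeprecatedScanRow {

  override def readSchema(): StructType = StructType(Seq(StructField("name", StringType)))

  // The deprecated interface keeps the old factory method, but with the renamed ReadTask type.
  override def createDataReaderFactories(): JList[ReadTask[Row]] = {
    val tasks: Seq[ReadTask[Row]] = Seq(new ReadTask[Row] {
      override def createDataReader(): DataReader[Row] = new DataReader[Row] {
        private val values = Iterator("a", "b", "c")

        override def next(): Boolean = values.hasNext
        override def get(): Row = Row(values.next())
        override def close(): Unit = ()
      }
    })
    tasks.asJava
  }
}

Migrating such a reader means switching to createReadTasks() and producing InternalRow values instead (for example via InternalRow.fromSeq, using catalyst-internal types such as UTF8String for strings).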

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@
 * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
 * interface to report data partitioning and try to avoid shuffle at Spark side.
 *
- * Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
+ * Note that, when the reader creates exactly one {@link ReadTask}, Spark may avoid
 * adding a shuffle even if the reader does not implement this interface.
 */
 @InterfaceStability.Evolving

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java

Lines changed: 6 additions & 6 deletions

@@ -20,7 +20,7 @@
 import java.util.List;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 /**
@@ -30,22 +30,22 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceReader {
   @Override
-  default List<DataReaderFactory<Row>> createDataReaderFactories() {
+  default List<ReadTask<InternalRow>> createReadTasks() {
     throw new IllegalStateException(
-        "createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
+        "createReadTasks not supported by default within SupportsScanColumnarBatch.");
   }
 
   /**
-   * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
+   * Similar to {@link DataSourceReader#createReadTasks()}, but returns columnar data
    * in batches.
    */
-  List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
 
   /**
    * Returns true if the concrete data source reader can read data in batch according to the scan
    * properties like required columns, pushes filters, etc. It's possible that the implementation
    * can only support some certain columns with certain types. Users can overwrite this method and
-   * {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
   */
  default boolean enableBatchRead() {
    return true;
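For comparison, the columnar path keeps its own method, now named createBatchReadTasks(). A rough sketch of a single-partition columnar source follows; it assumes the ColumnarBatch constructor and the internal OnHeapColumnVector.allocateColumns helper in sql/core are usable from the source, and ColumnarRangeReader is a hypothetical name.

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceReader, ReadTask, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical columnar source: one partition that returns a single batch with the ints 0..99.
class ColumnarRangeReader extends DataSourceReader with SupportsScanColumnarBatch {

  private val schema = StructType(Seq(StructField("i", IntegerType)))

  override def readSchema(): StructType = schema

  override def createBatchReadTasks(): JList[ReadTask[ColumnarBatch]] = {
    val tasks: Seq[ReadTask[ColumnarBatch]] = Seq(new ReadTask[ColumnarBatch] {
      override def createDataReader(): DataReader[ColumnarBatch] = new DataReader[ColumnarBatch] {
        private var batch: ColumnarBatch = _

        // Emit exactly one batch: next() is true until get() has produced it.
        override def next(): Boolean = batch == null

        override def get(): ColumnarBatch = {
          // Assumed sql/core helpers: allocate writable on-heap vectors for the schema,
          // fill the single int column, then wrap the vectors in a ColumnarBatch.
          val vectors = OnHeapColumnVector.allocateColumns(100, schema)
          (0 until 100).foreach(i => vectors(0).putInt(i, i))
          batch = new ColumnarBatch(vectors.map(v => v: ColumnVector))
          batch.setNumRows(100)
          batch
        }

        override def close(): Unit = if (batch != null) batch.close()
      }
    })
    tasks.asJava
  }
}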

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java

Lines changed: 0 additions & 46 deletions
This file was deleted.
