Commit 1423979

SPARK-24073: Rename DataReaderFactory to InputPartition.
Renames:

* DataReaderFactory -> InputPartition
* DataReader -> InputPartitionReader
* createDataReaderFactories -> planInputPartitions
* createUnsafeDataReaderFactories -> planUnsafeInputPartitions
* createBatchDataReaderFactories -> planBatchInputPartitions

This fixes the changes in SPARK-23219, which renamed ReadTask to DataReaderFactory. The intent of that change was to make the read and write APIs match (the write side uses DataWriterFactory), but the underlying problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition is a specific partition of the data to be read, in contrast to DataWriterFactory, where the same factory instance is used in all write tasks. InputPartition's purpose is to manage the lifecycle of the associated reader, now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API, because DataReaderFactory appeared to be more generic than it is and it was not obvious why a set of them is produced for a read.
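To make the renamed relationship concrete, here is a minimal, hypothetical sketch of a partition and its reader under the new names; RangeInputPartition and RangeInputPartitionReader are illustrative names invented for this example, not classes touched by this commit.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Hypothetical example, not part of this commit.
// An InputPartition describes one partition of the data to be read. It is
// serialized to an executor, where it creates the reader that does the scan.
class RangeInputPartition(start: Int, end: Int) extends InputPartition[Row] {
  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeInputPartitionReader(start, end)
}

// The reader plays the Iterator role: next()/get() to advance, close() to
// release resources, mirroring the explicit create on the partition side.
class RangeInputPartitionReader(start: Int, end: Int) extends InputPartitionReader[Row] {
  private var current = start - 1
  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current)
  override def close(): Unit = ()
}
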
1 parent 487faf1 commit 1423979

39 files changed: +272 −263 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

Lines changed: 10 additions & 10 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType

 /**
@@ -86,7 +86,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }

-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     import scala.collection.JavaConverters._

     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -108,7 +108,7 @@ class KafkaContinuousReader(
       case (topicPartition, start) =>
         KafkaContinuousDataReaderFactory(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[DataReaderFactory[UnsafeRow]]
+          .asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
   }

@@ -161,18 +161,18 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {

-  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
     require(kafkaOffset.topicPartition == topicPartition,
       s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
-    new KafkaContinuousDataReader(
+    new KafkaContinuousInputPartitionReader(
       topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }

-  override def createDataReader(): KafkaContinuousDataReader = {
-    new KafkaContinuousDataReader(
+  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
+    new KafkaContinuousInputPartitionReader(
       topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
 }
@@ -187,12 +187,12 @@ case class KafkaContinuousDataReaderFactory(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *   are skipped.
  */
-class KafkaContinuousDataReader(
+class KafkaContinuousInputPartitionReader(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
   private val converter = new KafkaRecordToUnsafeRowConverter


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

Lines changed: 11 additions & 10 deletions
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -101,7 +101,7 @@ private[kafka010] class KafkaMicroBatchReader(
     }
   }

-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
     val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
       new KafkaMicroBatchDataReaderFactory(
         range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
-    factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+    factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
   }

   override def getStartOffset: Offset = {
@@ -299,27 +299,28 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }

-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
+    reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {

   override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

-  override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
+    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+      failOnDataLoss, reuseKafkaConsumer)
 }

-/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReader(
+/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {

   private val consumer = KafkaDataConsumer.acquire(
     offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 2 additions & 1 deletion
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -149,7 +150,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
+   * Creates a [[ContinuousInputPartitionReader]] to read
    * Kafka data in a continuous streaming query.
    */
   override def createContinuousReader(

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
         Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
         Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
       )
-      val factories = reader.createUnsafeRowReaderFactories().asScala
+      val factories = reader.planUnsafeInputPartitions().asScala
         .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
       withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
         assert(factories.size == numPartitionsGenerated)

sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
   * streaming query.
   *
   * The execution engine will create a micro-batch reader at the start of a streaming query,
-  * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
+  * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
   * then call stop() when the execution is complete. Note that a single query may have multiple
   * executions due to restart or failure recovery.
   *

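As a hedged illustration of the lifecycle that javadoc describes, the engine-side call pattern could look roughly like the sketch below; runBatches and its batches argument are placeholders for this example only, not Spark's actual scheduling code.

import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}

object MicroBatchLifecycleSketch {
  // Sketch only: create the reader once, then for every micro-batch set the
  // offset range and plan the input partitions, and stop() when the query ends.
  def runBatches(reader: MicroBatchReader, batches: Seq[(Offset, Offset)]): Unit = {
    try {
      batches.foreach { case (start, end) =>
        reader.setOffsetRange(Optional.of(start), Optional.of(end))
        val partitions = reader.planInputPartitions() // one entry per RDD partition of the batch
        // ... hand `partitions` to the scan execution for this batch ...
      }
    } finally {
      reader.stop()
    }
  }
}
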
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java renamed to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java

Lines changed: 4 additions & 4 deletions
@@ -21,15 +21,15 @@
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

 /**
- * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
- * implement this interface to provide creating {@link DataReader} with particular offset.
+ * A mix-in interface for {@link InputPartition}. Continuous input partitions can
+ * implement this interface to provide creating {@link InputPartitionReader} with particular offset.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+public interface ContinuousInputPartition<T> extends InputPartition<T> {
   /**
    * Create a DataReader with particular offset as its startOffset.
    *
    * @param offset offset want to set as the DataReader's startOffset.
    */
-  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
+  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
 }

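A hypothetical implementation of this mix-in might look as follows; MyPartitionOffset, MyContinuousInputPartition, and the endless range reader are invented for illustration and make no claim about how real sources (such as the Kafka partition above) are written.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{ContinuousInputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

// Hypothetical per-partition offset used only in this sketch.
case class MyPartitionOffset(position: Long) extends PartitionOffset

class MyContinuousInputPartition(initialPosition: Long) extends ContinuousInputPartition[Row] {

  // Resume the reader from a specific offset, for example after a reconfiguration,
  // instead of from the partition's initial position.
  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] =
    newReader(offset.asInstanceOf[MyPartitionOffset].position)

  // Regular entry point inherited from InputPartition.
  override def createPartitionReader(): InputPartitionReader[Row] = newReader(initialPosition)

  private def newReader(from: Long): InputPartitionReader[Row] = new InputPartitionReader[Row] {
    private var position = from - 1
    override def next(): Boolean = { position += 1; true } // continuous: never runs out
    override def get(): Row = Row(position)
    override def close(): Unit = ()
  }
}
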
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 5 additions & 5 deletions
@@ -31,8 +31,8 @@
  * {@link ReadSupport#createReader(DataSourceOptions)} or
  * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link DataReaderFactory}s that are returned by
- * {@link #createDataReaderFactories()}.
+ * logic is delegated to {@link InputPartition}s that are returned by
+ * {@link #planInputPartitions()}.
  *
  * There are mainly 3 kinds of query optimizations:
  * 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@@ -65,8 +65,8 @@ public interface DataSourceReader {
   StructType readSchema();

   /**
-   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
-   * output data for one RDD partition. That means the number of factories returned here is same as
+   * Returns a list of read tasks. Each task is responsible for creating a data reader to
+   * output data for one RDD partition. That means the number of tasks returned here is same as
    * the number of RDD partitions this scan outputs.
    *
    * Note that, this may not be a full scan if the data source reader mixes in other optimization
@@ -76,5 +76,5 @@ public interface DataSourceReader {
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
    */
-  List<DataReaderFactory<Row>> createDataReaderFactories();
+  List<InputPartition<Row>> planInputPartitions();
 }

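Continuing the hypothetical range sketch given after the commit message, a reader's planInputPartitions might split its input like this; RangeDataSourceReader and the reuse of RangeInputPartition are illustrative assumptions, not code from this commit.

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical reader; RangeInputPartition is the illustrative partition
// sketched after the commit message, not a class added by this change.
class RangeDataSourceReader(rowCount: Int, numSlices: Int) extends DataSourceReader {

  override def readSchema(): StructType = StructType(Seq(StructField("value", IntegerType)))

  // The size of the returned list fixes the number of RDD partitions this scan outputs.
  override def planInputPartitions(): JList[InputPartition[Row]] = {
    val step = math.max(1, rowCount / numSlices)
    (0 until rowCount by step).map { start =>
      new RangeInputPartition(start, math.min(start + step, rowCount)): InputPartition[Row]
    }.asJava
  }
}
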
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java renamed to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java

Lines changed: 8 additions & 8 deletions
@@ -22,20 +22,20 @@
 import org.apache.spark.annotation.InterfaceStability;

 /**
- * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
+ * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
  * responsible for creating the actual data reader. The relationship between
- * {@link DataReaderFactory} and {@link DataReader}
+ * {@link InputPartition} and {@link InputPartitionReader}
  * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
  *
- * Note that, the reader factory will be serialized and sent to executors, then the data reader
- * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
- * serializable and {@link DataReader} doesn't need to be.
+ * Note that input partitions will be serialized and sent to executors, then the partition reader
+ * will be created on executors and do the actual reading. So {@link InputPartition} must be
+ * serializable and {@link InputPartitionReader} doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface DataReaderFactory<T> extends Serializable {
+public interface InputPartition<T> extends Serializable {

   /**
-   * The preferred locations where the data reader returned by this reader factory can run faster,
+   * The preferred locations where the data reader returned by this partition can run faster,
    * but Spark does not guarantee to run the data reader on these locations.
    * The implementations should make sure that it can be run on any location.
    * The location is a string representing the host name.
@@ -57,5 +57,5 @@ default String[] preferredLocations() {
    * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    * get retried until hitting the maximum retry times.
    */
-  DataReader<T> createDataReader();
+  InputPartitionReader<T> createPartitionReader();
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java renamed to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 2 additions & 2 deletions
@@ -23,15 +23,15 @@
 import org.apache.spark.annotation.InterfaceStability;

 /**
- * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
+ * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
  * outputting data for a RDD partition.
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
  * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
  * readers that mix in {@link SupportsScanUnsafeRow}.
  */
 @InterfaceStability.Evolving
-public interface DataReader<T> extends Closeable {
+public interface InputPartitionReader<T> extends Closeable {

   /**
    * Proceed to next record, returns false if there is no more records.

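As a short, hedged sketch of the consumer side of this contract: executor code is expected to iterate with next()/get() and always close() the reader, roughly as below. drain is an illustrative helper written for this example, not a Spark API.

import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

object InputPartitionReaderSketch {
  // Illustrative helper: iterate over a reader and always release its
  // resources, whether or not the scan finishes normally.
  def drain[T](reader: InputPartitionReader[T])(handle: T => Unit): Unit = {
    try {
      while (reader.next()) {
        handle(reader.get())
      }
    } finally {
      reader.close()
    }
  }
}
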
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report data partitioning and try to avoid shuffle at Spark side.
  *
- * Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
+ * Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid
  * adding a shuffle even if the reader does not implement this interface.
  */
 @InterfaceStability.Evolving
