
Commit 710e4e8

arunmahadevan authored and gatorsmile committed
[SPARK-24308][SQL] Handle DataReaderFactory to InputPartition rename in left over classes
## What changes were proposed in this pull request?

SPARK-24073 renamed DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflect the old names, which causes confusion. This patch renames the left-over classes to match the new interfaces and fixes a few comments.

## How was this patch tested?

Existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan <arunm@apache.org>

Closes #21355 from arunmahadevan/SPARK-24308.
1 parent a53ea70 commit 710e4e8
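
For context, a source built against the renamed interfaces looks roughly like the minimal Scala sketch below. It is not taken from this patch; the class name RangeInputPartition and its parameters are hypothetical, and it only illustrates the InputPartition / InputPartitionReader pair that replaces DataReaderFactory / DataReader.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Hypothetical example (not part of this patch): a partition that serves the
// integers [start, end) as single-column rows, implemented against the renamed
// InputPartition / InputPartitionReader interfaces.
class RangeInputPartition(start: Int, end: Int, hosts: Array[String])
  extends InputPartition[Row] {

  // Scheduling hint only; Spark may still run the reader on any executor.
  override def preferredLocations(): Array[String] = hosts

  override def createPartitionReader(): InputPartitionReader[Row] =
    new InputPartitionReader[Row] {
      private var current = start - 1
      override def next(): Boolean = { current += 1; current < end }
      override def get(): Row = Row(current)
      override def close(): Unit = {}
    }
}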

File tree

12 files changed, +30 -30 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

Lines changed: 3 additions & 3 deletions
@@ -106,7 +106,7 @@ class KafkaContinuousReader(
 
     startOffsets.toSeq.map {
       case (topicPartition, start) =>
-        KafkaContinuousDataReaderFactory(
+        KafkaContinuousInputPartition(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
           .asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
@@ -146,7 +146,7 @@ class KafkaContinuousReader(
   }
 
 /**
- * A data reader factory for continuous Kafka processing. This will be serialized and transformed
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
  * into a full reader on executors.
  *
  * @param topicPartition The (topic, partition) pair this task is responsible for.
@@ -156,7 +156,7 @@ class KafkaContinuousReader(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *                       are skipped.
  */
-case class KafkaContinuousDataReaderFactory(
+case class KafkaContinuousInputPartition(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

Lines changed: 2 additions & 2 deletions
@@ -143,7 +143,7 @@ private[kafka010] class KafkaMicroBatchReader(
 
     // Generate factories based on the offset ranges
     val factories = offsetRanges.map { range =>
-      new KafkaMicroBatchDataReaderFactory(
+      new KafkaMicroBatchInputPartition(
        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
     factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
@@ -300,7 +300,7 @@ private[kafka010] class KafkaMicroBatchReader(
 }
 
 /** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+private[kafka010] case class KafkaMicroBatchInputPartition(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -679,7 +679,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
         Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
       )
       val factories = reader.planUnsafeInputPartitions().asScala
-        .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
+        .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
       withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
         assert(factories.size == numPartitionsGenerated)
         factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java

Lines changed: 2 additions & 2 deletions
@@ -27,9 +27,9 @@
 @InterfaceStability.Evolving
 public interface ContinuousInputPartition<T> extends InputPartition<T> {
   /**
-   * Create a DataReader with particular offset as its startOffset.
+   * Create an input partition reader with particular offset as its startOffset.
    *
-   * @param offset offset want to set as the DataReader's startOffset.
+   * @param offset offset want to set as the input partition reader's startOffset.
    */
   InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java

Lines changed: 3 additions & 3 deletions
@@ -36,8 +36,8 @@
 public interface InputPartition<T> extends Serializable {
 
   /**
-   * The preferred locations where the data reader returned by this partition can run faster,
-   * but Spark does not guarantee to run the data reader on these locations.
+   * The preferred locations where the input partition reader returned by this partition can run faster,
+   * but Spark does not guarantee to run the input partition reader on these locations.
    * The implementations should make sure that it can be run on any location.
    * The location is a string representing the host name.
    *
@@ -53,7 +53,7 @@ default String[] preferredLocations() {
   }
 
   /**
-   * Returns a data reader to do the actual reading work.
+   * Returns an input partition reader to do the actual reading work.
    *
    * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    * get retried until hitting the maximum retry times.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 3 additions & 3 deletions
@@ -23,11 +23,11 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
+ * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
  * outputting data for a RDD partition.
  *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
- * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
+ * partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input partition
  * readers that mix in {@link SupportsScanUnsafeRow}.
  */
 @InterfaceStability.Evolving
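
The next()/get()/close() contract documented above is driven by the engine in a simple pull loop. The sketch below is hypothetical and simplified (the actual consumption happens inside Spark's DataSourceRDD, which also registers task-completion callbacks); it only shows the intended call order.

import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Simplified, hypothetical sketch of consuming an input partition reader:
// advance with next(), fetch with get(), and always close() when finished.
def drain[T](reader: InputPartitionReader[T])(f: T => Unit): Unit = {
  try {
    while (reader.next()) {
      f(reader.get())
    }
  } finally {
    reader.close()
  }
}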

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala

Lines changed: 3 additions & 3 deletions
@@ -29,12 +29,12 @@ class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: I
 
 class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val readerFactories: Seq[InputPartition[T]])
+    @transient private val inputPartitions: Seq[InputPartition[T]])
   extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
-    readerFactories.zipWithIndex.map {
-      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    inputPartitions.zipWithIndex.map {
+      case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
     }.toArray
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala

Lines changed: 2 additions & 2 deletions
@@ -85,7 +85,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
       val start = partitionStartMap(i)
       // Have each partition advance by numPartitions each row, with starting points staggered
       // by their partition index.
-      RateStreamContinuousDataReaderFactory(
+      RateStreamContinuousInputPartition(
         start.value,
         start.runTimeMs,
         i,
@@ -113,7 +113,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
 }
 
-case class RateStreamContinuousDataReaderFactory(
+case class RateStreamContinuousInputPartition(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 2 additions & 2 deletions
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
     newBlocks.map { block =>
-      new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
+      new MemoryStreamInputPartition(block).asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
   }
 }
@@ -201,7 +201,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 }
 
 
-class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
+class MemoryStreamInputPartition(records: Array[UnsafeRow])
   extends InputPartition[UnsafeRow] {
   override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
     new InputPartitionReader[UnsafeRow] {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala

Lines changed: 6 additions & 6 deletions
@@ -44,8 +44,8 @@ import org.apache.spark.util.RpcUtils
  * * ContinuousMemoryStream maintains a list of records for each partition. addData() will
  *   distribute records evenly-ish across partitions.
  * * RecordEndpoint is set up as an endpoint for executor-side
- *   ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified
- *   offset within the list, or null if that offset doesn't yet have a record.
+ *   ContinuousMemoryStreamInputPartitionReader instances to poll. It returns the record at
+ *   the specified offset within the list, or null if that offset doesn't yet have a record.
  */
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
   extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
@@ -106,7 +106,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
 
     startOffset.partitionNums.map {
       case (part, index) =>
-        new ContinuousMemoryStreamDataReaderFactory(
+        new ContinuousMemoryStreamInputPartition(
           endpointName, part, index): InputPartition[Row]
     }.toList.asJava
   }
@@ -157,9 +157,9 @@ object ContinuousMemoryStream {
 }
 
 /**
- * Data reader factory for continuous memory stream.
+ * An input partition for continuous memory stream.
  */
-class ContinuousMemoryStreamDataReaderFactory(
+class ContinuousMemoryStreamInputPartition(
     driverEndpointName: String,
     partition: Int,
     startOffset: Int) extends InputPartition[Row] {
@@ -168,7 +168,7 @@ class ContinuousMemoryStreamDataReaderFactory(
 }
 
 /**
- * Data reader for continuous memory stream.
+ * An input partition reader for continuous memory stream.
  *
  * Polls the driver endpoint for new records.
  */
