@@ -106,7 +106,7 @@ class KafkaContinuousReader(

startOffsets.toSeq.map {
case (topicPartition, start) =>
-KafkaContinuousDataReaderFactory(
+KafkaContinuousInputPartition(
topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
.asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
@@ -146,7 +146,7 @@ class KafkaContinuousReader(
}

/**
-* A data reader factory for continuous Kafka processing. This will be serialized and transformed
+* An input partition for continuous Kafka processing. This will be serialized and transformed
* into a full reader on executors.
*
* @param topicPartition The (topic, partition) pair this task is responsible for.
@@ -156,7 +156,7 @@ class KafkaContinuousReader(
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
-case class KafkaContinuousDataReaderFactory(
+case class KafkaContinuousInputPartition(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
@@ -143,7 +143,7 @@ private[kafka010] class KafkaMicroBatchReader(

// Generate factories based on the offset ranges
val factories = offsetRanges.map { range =>
-new KafkaMicroBatchDataReaderFactory(
+new KafkaMicroBatchInputPartition(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
@@ -300,7 +300,7 @@ private[kafka010] class KafkaMicroBatchReader(
}

/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+private[kafka010] case class KafkaMicroBatchInputPartition(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
@@ -679,7 +679,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
)
val factories = reader.planUnsafeInputPartitions().asScala
-.map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
+.map(_.asInstanceOf[KafkaMicroBatchInputPartition])
withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
assert(factories.size == numPartitionsGenerated)
factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
@@ -27,9 +27,9 @@
@InterfaceStability.Evolving
public interface ContinuousInputPartition<T> extends InputPartition<T> {
/**
-* Create a DataReader with particular offset as its startOffset.
+* Create an input partition reader with particular offset as its startOffset.
*
-* @param offset offset want to set as the DataReader's startOffset.
+* @param offset offset want to set as the input partition reader's startOffset.
*/
InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
}
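For orientation, here is a minimal sketch of what an implementation of the renamed interface might look like. This is not code from the patch: the class names, the offset type, and the package imports (org.apache.spark.sql.sources.v2.reader and its streaming subpackage) are assumptions based on the v2 reader API shown in this change.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{ContinuousInputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

// Hypothetical per-partition offset type; real sources define their own.
case class ExamplePartitionOffset(value: Long) extends PartitionOffset

// An input partition that can also recreate its reader from a checkpointed offset,
// which is what continuous execution needs when it restarts a task.
case class ExampleContinuousInputPartition(startValue: Long)
  extends ContinuousInputPartition[Row] {

  // Entry point from InputPartition: start from the offset planned on the driver.
  override def createPartitionReader(): InputPartitionReader[Row] =
    readerFrom(startValue)

  // Entry point from ContinuousInputPartition: resume from a recovered offset.
  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] =
    readerFrom(offset.asInstanceOf[ExamplePartitionOffset].value)

  private def readerFrom(start: Long): InputPartitionReader[Row] =
    new InputPartitionReader[Row] {
      private var current = start - 1
      override def next(): Boolean = { current += 1; true } // a continuous stream never ends
      override def get(): Row = Row(current)
      override def close(): Unit = ()
    }
}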
@@ -36,8 +36,8 @@
public interface InputPartition<T> extends Serializable {

/**
-* The preferred locations where the data reader returned by this partition can run faster,
-* but Spark does not guarantee to run the data reader on these locations.
+* The preferred locations where the input partition reader returned by this partition can run faster,
+* but Spark does not guarantee to run the input partition reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
*
@@ -53,7 +53,7 @@ default String[] preferredLocations() {
}

/**
-* Returns a data reader to do the actual reading work.
+* Returns an input partition reader to do the actual reading work.
*
* If this method fails (by throwing an exception), the corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
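As a rough illustration of the renamed contract (a sketch, not code from this patch; the class name and the package import are assumptions based on the API shown here): an InputPartition is planned and serialized on the driver, and only produces its InputPartitionReader once it reaches an executor.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

case class ExampleInputPartition(values: Seq[String], host: Option[String])
  extends InputPartition[Row] {

  // Only a scheduling hint; the reader must still work on any host.
  override def preferredLocations(): Array[String] = host.toArray

  // Called on the executor after the partition has been deserialized.
  override def createPartitionReader(): InputPartitionReader[Row] =
    new InputPartitionReader[Row] {
      private var i = -1
      override def next(): Boolean = { i += 1; i < values.length }
      override def get(): Row = Row(values(i))
      override def close(): Unit = ()
    }
}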
@@ -23,11 +23,11 @@
import org.apache.spark.annotation.InterfaceStability;

/**
-* A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
+* An input partition reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
* outputting data for a RDD partition.
*
-* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
-* source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
+* partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input partition
* readers that mix in {@link SupportsScanUnsafeRow}.
*/
@InterfaceStability.Evolving
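The renamed reader keeps the usual iteration contract: advance with next(), read the current value with get(), and close() when done. A small hedged sketch of a caller-side loop (the drain helper below is illustrative, not a Spark API):

import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Drain any input partition reader, making sure it is closed even on failure.
def drain[T](reader: InputPartitionReader[T])(handle: T => Unit): Unit = {
  try {
    while (reader.next()) {
      handle(reader.get())
    }
  } finally {
    reader.close()
  }
}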
@@ -29,12 +29,12 @@ class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: I

class DataSourceRDD[T: ClassTag](
sc: SparkContext,
-@transient private val readerFactories: Seq[InputPartition[T]])
+@transient private val inputPartitions: Seq[InputPartition[T]])
extends RDD[T](sc, Nil) {

override protected def getPartitions: Array[Partition] = {
-readerFactories.zipWithIndex.map {
-case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+inputPartitions.zipWithIndex.map {
+case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
}.toArray
}

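To make the rename concrete on the executor side, here is a simplified sketch of what an RDD over input partitions roughly does in compute(). The actual DataSourceRDD.compute() is outside this hunk; the SketchPartition class and computeSketch method below are illustrative stand-ins, not Spark code.

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Stand-in for DataSourceRDDPartition: the Spark Partition just carries the InputPartition.
class SketchPartition[T](val index: Int, val inputPartition: InputPartition[T]) extends Partition

// Roughly what compute() does: build the reader on the executor, close it when the task ends,
// and expose it as an Iterator.
def computeSketch[T](split: Partition, context: TaskContext): Iterator[T] = {
  val reader: InputPartitionReader[T] =
    split.asInstanceOf[SketchPartition[T]].inputPartition.createPartitionReader()
  context.addTaskCompletionListener(_ => reader.close())
  new Iterator[T] {
    private var advanced = false
    private var hasMore = false
    override def hasNext: Boolean = {
      if (!advanced) { hasMore = reader.next(); advanced = true }
      hasMore
    }
    override def next(): T = {
      if (!hasNext) throw new java.util.NoSuchElementException("end of input partition")
      advanced = false
      reader.get()
    }
  }
}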
@@ -85,7 +85,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
val start = partitionStartMap(i)
// Have each partition advance by numPartitions each row, with starting points staggered
// by their partition index.
-RateStreamContinuousDataReaderFactory(
+RateStreamContinuousInputPartition(
start.value,
start.runTimeMs,
i,
@@ -113,7 +113,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)

}

-case class RateStreamContinuousDataReaderFactory(
+case class RateStreamContinuousInputPartition(
startValue: Long,
startTimeMs: Long,
partitionIndex: Int,
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))

newBlocks.map { block =>
-new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
+new MemoryStreamInputPartition(block).asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
}
}
@@ -201,7 +201,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}


-class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
+class MemoryStreamInputPartition(records: Array[UnsafeRow])
extends InputPartition[UnsafeRow] {
override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
new InputPartitionReader[UnsafeRow] {
@@ -44,8 +44,8 @@ import org.apache.spark.util.RpcUtils
* * ContinuousMemoryStream maintains a list of records for each partition. addData() will
* distribute records evenly-ish across partitions.
* * RecordEndpoint is set up as an endpoint for executor-side
-* ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified
-* offset within the list, or null if that offset doesn't yet have a record.
+* ContinuousMemoryStreamInputPartitionReader instances to poll. It returns the record at
+* the specified offset within the list, or null if that offset doesn't yet have a record.
*/
class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
@@ -106,7 +106,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa

startOffset.partitionNums.map {
case (part, index) =>
-new ContinuousMemoryStreamDataReaderFactory(
+new ContinuousMemoryStreamInputPartition(
endpointName, part, index): InputPartition[Row]
}.toList.asJava
}
@@ -157,9 +157,9 @@ object ContinuousMemoryStream {
}

/**
-* Data reader factory for continuous memory stream.
+* An input partition for continuous memory stream.
*/
-class ContinuousMemoryStreamDataReaderFactory(
+class ContinuousMemoryStreamInputPartition(
driverEndpointName: String,
partition: Int,
startOffset: Int) extends InputPartition[Row] {
@@ -168,7 +168,7 @@ class ContinuousMemoryStreamDataReaderFactory(
}

/**
-* Data reader for continuous memory stream.
+* An input partition reader for continuous memory stream.
*
* Polls the driver endpoint for new records.
*/
@@ -167,7 +167,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
}

(0 until numPartitions).map { p =>
-new RateStreamMicroBatchDataReaderFactory(
+new RateStreamMicroBatchInputPartition(
p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
: InputPartition[Row]
}.toList.asJava
@@ -182,7 +182,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
}

-class RateStreamMicroBatchDataReaderFactory(
+class RateStreamMicroBatchInputPartition(
partitionId: Int,
numPartitions: Int,
rangeStart: Long,
@@ -309,7 +309,7 @@ class RateSourceSuite extends StreamTest {

val data = scala.collection.mutable.ListBuffer[Row]()
tasks.asScala.foreach {
-case t: RateStreamContinuousDataReaderFactory =>
+case t: RateStreamContinuousInputPartition =>
val startTimeMs = reader.getStartOffset()
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)