
Commit defc54c

[SPARK-24971][SQL] remove SupportsDeprecatedScanRow
## What changes were proposed in this pull request?

This is a follow-up of apache#21118. In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally, data sources should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21921 from cloud-fan/row.
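To illustrate the "easy to build" point, below is a minimal sketch (not part of this patch; the names `ExampleInputPartition` and `ExampleInputPartitionReader` are hypothetical) of a DSv2 reader that emits `InternalRow` directly through the `InputPartition`/`InputPartitionReader` interfaces touched by this change:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical partition over an in-memory Seq of (id: Int, name: String) records.
class ExampleInputPartition(data: Seq[(Int, String)]) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new ExampleInputPartitionReader(data)
}

class ExampleInputPartitionReader(data: Seq[(Int, String)])
  extends InputPartitionReader[InternalRow] {

  private val iter = data.iterator
  private var current: (Int, String) = _

  override def next(): Boolean = {
    if (iter.hasNext) { current = iter.next(); true } else false
  }

  // InternalRow holds Catalyst's internal representations, e.g. UTF8String for strings.
  override def get(): InternalRow =
    InternalRow(current._1, UTF8String.fromString(current._2))

  override def close(): Unit = {}
}
```

Compared to `Row`, the only extra care is using Catalyst's internal value types, e.g. `UTF8String` for string columns and microseconds-since-epoch longs for timestamps (as the `DateTimeUtils.fromMillis` calls in this diff do).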
1 parent 1122754 commit defc54c

17 files changed, +133 -314 lines changed

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 1 addition & 5 deletions
@@ -39,11 +39,7 @@
  * pruning), etc. Names of these interfaces start with `SupportsPushDown`.
  * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  * Names of these interfaces start with `SupportsReporting`.
- * 3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- * Names of these interfaces start with `SupportsScan`. Note that a reader should only
- * implement at most one of the special scans, if more than one special scans are implemented,
- * only one of them would be respected, according to the priority list from high to low:
- * {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
+ * 3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 1 addition & 2 deletions
@@ -28,8 +28,7 @@
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
  * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
- * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
- * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
+ * source readers that mix in {@link SupportsScanColumnarBatch}.
  */
 @InterfaceStability.Evolving
 public interface InputPartitionReader<T> extends Closeable {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala

Lines changed: 2 additions & 34 deletions
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
@@ -31,7 +29,6 @@ import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -75,13 +72,8 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match {
-    case r: SupportsDeprecatedScanRow =>
-      r.planRowInputPartitions().asScala.map {
-        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow]
-      }
-    case _ =>
-      reader.planInputPartitions().asScala
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
+    reader.planInputPartitions().asScala
   }
 
   private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
@@ -131,27 +123,3 @@
     }
   }
 }
-
-class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
-  extends InputPartition[InternalRow] {
-
-  override def preferredLocations: Array[String] = partition.preferredLocations
-
-  override def createPartitionReader: InputPartitionReader[InternalRow] = {
-    new RowToUnsafeInputPartitionReader(
-      partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
-  }
-}
-
-class RowToUnsafeInputPartitionReader(
-    val rowReader: InputPartitionReader[Row],
-    encoder: ExpressionEncoder[Row])
-
-  extends InputPartitionReader[InternalRow] {
-
-  override def next: Boolean = rowReader.next
-
-  override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow]
-
-  override def close(): Unit = rowReader.close()
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala

Lines changed: 0 additions & 4 deletions
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.v2.RowToUnsafeInputPartitionReader
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
 import org.apache.spark.util.NextIterator
@@ -104,8 +102,6 @@ object ContinuousDataSourceRDD {
       reader: InputPartitionReader[InternalRow]): ContinuousInputPartitionReader[_] = {
     reader match {
       case r: ContinuousInputPartitionReader[InternalRow] => r
-      case wrapped: RowToUnsafeInputPartitionReader =>
-        wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
       case _ =>
         throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}")
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala

Lines changed: 13 additions & 13 deletions
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
@@ -34,8 +34,7 @@ import org.apache.spark.sql.types.StructType
 case class RateStreamPartitionOffset(
   partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
 
-class RateStreamContinuousReader(options: DataSourceOptions)
-  extends ContinuousReader with SupportsDeprecatedScanRow {
+class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
   val creationTime = System.currentTimeMillis()
@@ -67,7 +66,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   override def getStartOffset(): Offset = offset
 
-  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
@@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
         i,
         numPartitions,
         perPartitionRate)
-        .asInstanceOf[InputPartition[Row]]
+        .asInstanceOf[InputPartition[InternalRow]]
     }.asJava
   }
 
@@ -119,9 +118,10 @@ case class RateStreamContinuousInputPartition(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousInputPartition[Row] {
+  extends ContinuousInputPartition[InternalRow] {
 
-  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] = {
+  override def createContinuousReader(
+      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
     val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
     require(rateStreamOffset.partition == partitionIndex,
       s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}")
@@ -133,7 +133,7 @@ case class RateStreamContinuousInputPartition(
       rowsPerSecond)
   }
 
-  override def createPartitionReader(): InputPartitionReader[Row] =
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
     new RateStreamContinuousInputPartitionReader(
       startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
 }
@@ -144,12 +144,12 @@ class RateStreamContinuousInputPartitionReader(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousInputPartitionReader[Row] {
+  extends ContinuousInputPartitionReader[InternalRow] {
   private var nextReadTime: Long = startTimeMs
   private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong
 
   private var currentValue = startValue
-  private var currentRow: Row = null
+  private var currentRow: InternalRow = null
 
   override def next(): Boolean = {
     currentValue += increment
@@ -165,14 +165,14 @@ class RateStreamContinuousInputPartitionReader(
       return false
     }
 
-    currentRow = Row(
-      DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(nextReadTime)),
+    currentRow = InternalRow(
+      DateTimeUtils.fromMillis(nextReadTime),
       currentValue)
 
     true
   }
 
-  override def get: Row = currentRow
+  override def get: InternalRow = currentRow
 
   override def close(): Unit = {}
 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala

Lines changed: 13 additions & 14 deletions
@@ -23,19 +23,19 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
-import scala.collection.SortedMap
 import scala.collection.mutable.ListBuffer
 
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.sql.{Encoder, Row, SQLContext}
+import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsDeprecatedScanRow}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
@@ -49,8 +49,7 @@ import org.apache.spark.util.RpcUtils
  * the specified offset within the list, or null if that offset doesn't yet have a record.
  */
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
-  extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport
-  with SupportsDeprecatedScanRow {
+  extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
   private implicit val formats = Serialization.formats(NoTypeHints)
 
   protected val logicalPlan =
@@ -100,7 +99,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
     )
   }
 
-  override def planRowInputPartitions(): ju.List[InputPartition[Row]] = {
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
     synchronized {
       val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
       endpointRef =
@@ -109,7 +108,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
       startOffset.partitionNums.map {
         case (part, index) =>
           new ContinuousMemoryStreamInputPartition(
-            endpointName, part, index): InputPartition[Row]
+            endpointName, part, index): InputPartition[InternalRow]
       }.toList.asJava
     }
   }
@@ -141,7 +140,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
         val buf = records(part)
         val record = if (buf.size <= index) None else Some(buf(index))
 
-        context.reply(record.map(Row(_)))
+        context.reply(record.map(r => encoder.toRow(r).copy()))
       }
     }
   }
@@ -164,7 +163,7 @@ object ContinuousMemoryStream {
 class ContinuousMemoryStreamInputPartition(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends InputPartition[Row] {
+    startOffset: Int) extends InputPartition[InternalRow] {
   override def createPartitionReader: ContinuousMemoryStreamInputPartitionReader =
     new ContinuousMemoryStreamInputPartitionReader(driverEndpointName, partition, startOffset)
 }
@@ -177,14 +176,14 @@ class ContinuousMemoryStreamInputPartition(
 class ContinuousMemoryStreamInputPartitionReader(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends ContinuousInputPartitionReader[Row] {
+    startOffset: Int) extends ContinuousInputPartitionReader[InternalRow] {
   private val endpoint = RpcUtils.makeDriverRef(
     driverEndpointName,
     SparkEnv.get.conf,
     SparkEnv.get.rpcEnv)
 
   private var currentOffset = startOffset
-  private var current: Option[Row] = None
+  private var current: Option[InternalRow] = None
 
   // Defense-in-depth against failing to propagate the task context. Since it's not inheritable,
   // we have to do a bit of error prone work to get it into every thread used by continuous
@@ -204,15 +203,15 @@ class ContinuousMemoryStreamInputPartitionReader(
     true
   }
 
-  override def get(): Row = current.get
+  override def get(): InternalRow = current.get
 
   override def close(): Unit = {}
 
   override def getOffset: ContinuousMemoryStreamPartitionOffset =
     ContinuousMemoryStreamPartitionOffset(partition, currentOffset)
 
-  private def getRecord: Option[Row] =
-    endpoint.askSync[Option[Row]](
+  private def getRecord: Option[InternalRow] =
+    endpoint.askSync[Option[InternalRow]](
       GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
 }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala

Lines changed: 9 additions & 12 deletions
@@ -29,6 +29,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -38,7 +39,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ManualClock, SystemClock}
 
 class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: String)
-  extends MicroBatchReader with SupportsDeprecatedScanRow with Logging {
+  extends MicroBatchReader with Logging {
   import RateStreamProvider._
 
   private[sources] val clock = {
@@ -134,7 +135,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     LongOffset(json.toLong)
   }
 
-  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
     val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
     val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
@@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     (0 until numPartitions).map { p =>
       new RateStreamMicroBatchInputPartition(
         p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
-        : InputPartition[Row]
+        : InputPartition[InternalRow]
     }.toList.asJava
   }
 
@@ -188,9 +189,9 @@ class RateStreamMicroBatchInputPartition(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends InputPartition[Row] {
+    relativeMsPerValue: Double) extends InputPartition[InternalRow] {
 
-  override def createPartitionReader(): InputPartitionReader[Row] =
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
     new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
       localStartTimeMs, relativeMsPerValue)
 }
@@ -201,22 +202,18 @@ class RateStreamMicroBatchInputPartitionReader(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends InputPartitionReader[Row] {
+    relativeMsPerValue: Double) extends InputPartitionReader[InternalRow] {
   private var count: Long = 0
 
   override def next(): Boolean = {
     rangeStart + partitionId + numPartitions * count < rangeEnd
   }
 
-  override def get(): Row = {
+  override def get(): InternalRow = {
     val currValue = rangeStart + partitionId + numPartitions * count
     count += 1
     val relative = math.round((currValue - rangeStart) * relativeMsPerValue)
-    Row(
-      DateTimeUtils.toJavaTimestamp(
-        DateTimeUtils.fromMillis(relative + localStartTimeMs)),
-      currValue
-    )
+    InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue)
   }
 
   override def close(): Unit = {}