[SPARK-24971][SQL] remove SupportsDeprecatedScanRow #21921
@@ -29,6 +29,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -38,7 +39,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ManualClock, SystemClock}

 class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: String)
-  extends MicroBatchReader with SupportsDeprecatedScanRow with Logging {
+  extends MicroBatchReader with Logging {
   import RateStreamProvider._

   private[sources] val clock = {
@@ -134,7 +135,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     LongOffset(json.toLong)
   }

-  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
     val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
     val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
@@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: | |
(0 until numPartitions).map { p => | ||
new RateStreamMicroBatchInputPartition( | ||
p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue) | ||
: InputPartition[Row] | ||
: InputPartition[InternalRow] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this needed? Doesn't RateStreamMicroBatchInputPartition implement InputPartition[InternalRow]? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is fine since it isn't a cast, but it's generally better to check whether these are still necessary after refactoring. |
||
}.toList.asJava | ||
} | ||
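Why the ascription is needed at all comes down to Java collection invariance. A minimal self-contained sketch, assuming nothing from Spark (InputPartition and LongPartition below are illustrative stand-ins, not the actual Spark classes):

import scala.collection.JavaConverters._

trait InputPartition[T]
class LongPartition extends InputPartition[Long]

object AscriptionDemo {
  // Without the ascription, Scala infers List[LongPartition], and asJava then
  // produces java.util.List[LongPartition]. java.util.List[T] is invariant in
  // T, so that is NOT a java.util.List[InputPartition[Long]]:
  //
  //   val bad: java.util.List[InputPartition[Long]] =
  //     (0 until 4).map(_ => new LongPartition).toList.asJava   // does not compile
  //
  // Ascribing each element widens the inferred element type before the Java
  // conversion runs, so no cast is involved:
  val ok: java.util.List[InputPartition[Long]] =
    (0 until 4).map(_ => new LongPartition: InputPartition[Long]).toList.asJava
}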
@@ -188,9 +189,9 @@ class RateStreamMicroBatchInputPartition(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends InputPartition[Row] {
+    relativeMsPerValue: Double) extends InputPartition[InternalRow] {

-  override def createPartitionReader(): InputPartitionReader[Row] =
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
     new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
       localStartTimeMs, relativeMsPerValue)
 }
@@ -201,22 +202,18 @@ class RateStreamMicroBatchInputPartitionReader(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends InputPartitionReader[Row] {
+    relativeMsPerValue: Double) extends InputPartitionReader[InternalRow] {
   private var count: Long = 0

   override def next(): Boolean = {
     rangeStart + partitionId + numPartitions * count < rangeEnd
   }

-  override def get(): Row = {
+  override def get(): InternalRow = {
     val currValue = rangeStart + partitionId + numPartitions * count
     count += 1
     val relative = math.round((currValue - rangeStart) * relativeMsPerValue)
-    Row(
-      DateTimeUtils.toJavaTimestamp(
-        DateTimeUtils.fromMillis(relative + localStartTimeMs)),
-      currValue
-    )
+    InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue)
   }

   override def close(): Unit = {}
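The get() change above is the substantive part of dropping the deprecated Row scan path: Row carries external JVM values, while InternalRow carries Catalyst's internal encoding, in which a timestamp is a raw Long of microseconds since the epoch. A small sketch of the two representations, assuming Spark's catalyst classes are on the classpath (the values are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils

object RowVsInternalRow {
  val millis = 1532000000000L

  // External representation (old code path): a java.sql.Timestamp plus a Long.
  val external: Row =
    Row(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(millis)), 42L)

  // Internal representation (new code path): microseconds since the epoch as
  // a raw Long, plus a Long. No conversion to java.sql.Timestamp is needed.
  val internal: InternalRow =
    InternalRow(DateTimeUtils.fromMillis(millis), 42L)
}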
Review thread (on the : InputPartition[InternalRow] ascription above):

Reviewer: Why is this cast necessary?

Author: I didn't dig into it, since the cast was already there. The reason seems to be that java.util.List isn't covariant.

Reviewer: I don't think it's a good idea to leave casts in. Can you check whether this one can be avoided? I found in #21118 that many such casts were unnecessary once variables had declared types, and it is much better to avoid explicit casts that work around the type system.
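One way to satisfy that request, sketched with the same illustrative stand-in names as the earlier sketch (not the actual Spark code): give the intermediate collection a declared element type and let Scala's covariant immutable List do the widening, so neither a cast nor a per-element ascription is needed before the invariant java.util.List conversion.

import scala.collection.JavaConverters._

trait InputPartition[T]
class LongPartition(val id: Int) extends InputPartition[Long]

object DeclaredTypeDemo {
  def planInputPartitions(numPartitions: Int): java.util.List[InputPartition[Long]] = {
    // scala.List is covariant, so List[LongPartition] conforms to
    // List[InputPartition[Long]] here; asJava then yields a list with the
    // required element type, with no cast and no per-element ascription.
    val partitions: List[InputPartition[Long]] =
      (0 until numPartitions).map(p => new LongPartition(p)).toList
    partitions.asJava
  }
}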