
Commit 680fb54

[SPARK-23325] Use InternalRow when reading with DataSourceV2.
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow. Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

This change is covered by existing tests.

Author: Ryan Blue <blue@apache.org>

Closes apache#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
1 parent 9b85abe commit 680fb54
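
To make the migration concrete, here is a minimal sketch (not code from this commit; the class names ExampleRowReader and ExampleInternalRowReader are hypothetical) of a reader kept on the deprecated Row path next to one using the new InternalRow default:

import java.util.{List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsDeprecatedScanRow}
import org.apache.spark.sql.types.StructType

// Hypothetical reader that stays on the deprecated Row API: it mixes in
// SupportsDeprecatedScanRow and implements planRowInputPartitions().
class ExampleRowReader extends DataSourceReader with SupportsDeprecatedScanRow {
  override def readSchema(): StructType = new StructType().add("i", "int")
  override def planRowInputPartitions(): JList[InputPartition[Row]] =
    java.util.Collections.emptyList()
}

// Hypothetical reader migrated to the new default: no scan mix-in,
// planInputPartitions() produces InternalRow partitions directly.
class ExampleInternalRowReader extends DataSourceReader {
  override def readSchema(): StructType = new StructType().add("i", "int")
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    java.util.Collections.emptyList()
}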

11 files changed: 58 additions, 60 deletions


sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 6 additions & 3 deletions
@@ -20,7 +20,7 @@
 import java.util.List;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
@@ -40,7 +40,10 @@
  * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  *    Names of these interfaces start with `SupportsReporting`.
  * 3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- *    Names of these interfaces start with `SupportsScan`.
+ *    Names of these interfaces start with `SupportsScan`. Note that a reader should only
+ *    implement at most one of the special scans, if more than one special scans are implemented,
+ *    only one of them would be respected, according to the priority list from high to low:
+ *    {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.
@@ -73,5 +76,5 @@ public interface DataSourceReader {
   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
   * submitted.
   */
-  List<InputPartition<Row>> planInputPartitions();
+  List<InputPartition<InternalRow>> planInputPartitions();
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 4 additions & 3 deletions
@@ -26,9 +26,10 @@
  * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is
  * responsible for outputting data for a RDD partition.
  *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
- * partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input
- * partition readers that mix in {@link SupportsScanUnsafeRow}.
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
+ * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
+ * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
+ * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
  */
 @InterfaceStability.Evolving
 public interface InputPartitionReader<T> extends Closeable {

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java renamed to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java

Lines changed: 9 additions & 16 deletions
@@ -17,30 +17,23 @@
 
 package org.apache.spark.sql.sources.v2.reader;
 
-import java.util.List;
-
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+import java.util.List;
 
 /**
  * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
- * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
- * changed in the future Spark versions.
+ * interface to output {@link Row} instead of {@link InternalRow}.
+ * This is an experimental and unstable interface.
 */
 @InterfaceStability.Unstable
-public interface SupportsScanUnsafeRow extends DataSourceReader {
-
-  @Override
-  default List<InputPartition<Row>> planInputPartitions() {
+public interface SupportsDeprecatedScanRow extends DataSourceReader {
+  default List<InputPartition<InternalRow>> planInputPartitions() {
     throw new IllegalStateException(
-        "planInputPartitions not supported by default within SupportsScanUnsafeRow");
+        "planInputPartitions not supported by default within SupportsDeprecatedScanRow");
  }
 
-  /**
-   * Similar to {@link DataSourceReader#planInputPartitions()},
-   * but returns data in unsafe row format.
-   */
-  List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
+  List<InputPartition<Row>> planRowInputPartitions();
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala

Lines changed: 9 additions & 8 deletions
@@ -54,12 +54,13 @@ case class DataSourceV2ScanExec(
     Seq(output, source, options).hashCode()
   }
 
-  private lazy val partitions: Seq[InputPartition[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.planUnsafeInputPartitions().asScala
-    case _ =>
-      reader.planInputPartitions().asScala.map {
-        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[UnsafeRow]
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match {
+    case r: SupportsDeprecatedScanRow =>
+      r.planRowInputPartitions().asScala.map {
+        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow]
       }
+    case _ =>
+      reader.planInputPartitions().asScala
   }
 
   private lazy val inputRDD: RDD[InternalRow] = reader match {
@@ -80,11 +81,11 @@
   }
 
 class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
-  extends InputPartition[UnsafeRow] {
+  extends InputPartition[InternalRow] {
 
   override def preferredLocations: Array[String] = partition.preferredLocations
 
-  override def createPartitionReader: InputPartitionReader[UnsafeRow] = {
+  override def createPartitionReader: InputPartitionReader[InternalRow] = {
     new RowToUnsafeInputPartitionReader(
       partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
   }
@@ -94,7 +95,7 @@ class RowToUnsafeInputPartitionReader(
     val rowReader: InputPartitionReader[Row],
     encoder: ExpressionEncoder[Row])
 
-  extends InputPartitionReader[UnsafeRow] {
+  extends InputPartitionReader[InternalRow] {
 
   override def next: Boolean = rowReader.next
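
For readers that stay on the Row API, the wrapper above converts each Row to an InternalRow through a bound encoder. A rough sketch of that conversion (assuming Spark 2.x's ExpressionEncoder.toRow; ExampleRowConvertingReader is a hypothetical stand-in, not the literal class body from this commit):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType

// Hypothetical converter: pulls Rows from a deprecated reader and turns each
// one into an InternalRow with an encoder bound to the read schema.
class ExampleRowConvertingReader(rowReader: InputPartitionReader[Row], schema: StructType)
  extends InputPartitionReader[InternalRow] {

  private val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

  override def next(): Boolean = rowReader.next()
  override def get(): InternalRow = encoder.toRow(rowReader.get())
  override def close(): Unit = rowReader.close()
}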

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 2 additions & 7 deletions
@@ -124,13 +124,8 @@ object DataSourceV2Strategy extends Strategy {
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
 
-      val withProjection = if (withFilter.output != project) {
-        ProjectExec(project, withFilter)
-      } else {
-        withFilter
-      }
-
-      withProjection :: Nil
+      // always add the projection, which will produce unsafe rows required by some operators
+      ProjectExec(project, withFilter) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,7 @@
 public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
   public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
-      SupportsPushDownFilters {
+      SupportsPushDownFilters, SupportsDeprecatedScanRow {
 
     // Exposed for testing.
     public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
@@ -79,7 +79,7 @@ public Filter[] pushedFilters() {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       List<InputPartition<Row>> res = new ArrayList<>();
 
       Integer lowerBound = null;

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java

Lines changed: 3 additions & 2 deletions
@@ -25,11 +25,12 @@
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
 
-  class Reader implements DataSourceReader {
+  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
     private final StructType schema;
 
     Reader(StructType schema) {
@@ -42,7 +43,7 @@ public StructType readSchema() {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java

Lines changed: 3 additions & 2 deletions
@@ -28,11 +28,12 @@
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader {
+  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
     private final StructType schema = new StructType().add("i", "int").add("j", "int");
 
     @Override
@@ -41,7 +42,7 @@ public StructType readSchema() {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       return java.util.Arrays.asList(
         new JavaSimpleInputPartition(0, 5),
         new JavaSimpleInputPartition(5, 10));

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java

Lines changed: 5 additions & 4 deletions
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -29,7 +30,7 @@
 
 public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader, SupportsScanUnsafeRow {
+  class Reader implements DataSourceReader {
     private final StructType schema = new StructType().add("i", "int").add("j", "int");
 
     @Override
@@ -38,15 +39,15 @@ public StructType readSchema() {
     }
 
     @Override
-    public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Arrays.asList(
         new JavaUnsafeRowInputPartition(0, 5),
         new JavaUnsafeRowInputPartition(5, 10));
     }
   }
 
   static class JavaUnsafeRowInputPartition
-      implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
+      implements InputPartition<InternalRow>, InputPartitionReader<InternalRow> {
     private int start;
     private int end;
     private UnsafeRow row;
@@ -59,7 +60,7 @@ static class JavaUnsafeRowInputPartition
     }
 
     @Override
-    public InputPartitionReader<UnsafeRow> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new JavaUnsafeRowInputPartition(start - 1, end);
     }

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 11 additions & 10 deletions
@@ -23,6 +23,7 @@ import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
 import org.apache.spark.sql.functions._
@@ -283,10 +284,10 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
 class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader {
+  class Reader extends DataSourceReader with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
     }
   }
@@ -316,7 +317,7 @@ class SimpleInputPartition(start: Int, end: Int)
 
 class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader
+  class Reader extends DataSourceReader with SupportsDeprecatedScanRow
     with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
 
     var requiredSchema = new StructType().add("i", "int").add("j", "int")
@@ -341,7 +342,7 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
@@ -393,10 +394,10 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
 
 class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader with SupportsScanUnsafeRow {
+  class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
+    override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
         new UnsafeRowInputPartitionReader(5, 10))
     }
@@ -406,14 +407,14 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
 }
 
 class UnsafeRowInputPartitionReader(start: Int, end: Int)
-  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
+  extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
 
   private var current = start - 1
 
-  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
+  override def createPartitionReader(): InputPartitionReader[InternalRow] = this
 
   override def next(): Boolean = {
     current += 1
@@ -430,8 +431,8 @@ class UnsafeRowInputPartitionReader(start: Int, end: Int)
 
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
-  class Reader(val readSchema: StructType) extends DataSourceReader {
-    override def planInputPartitions(): JList[InputPartition[Row]] =
+  class Reader(val readSchema: StructType) extends DataSourceReader with SupportsDeprecatedScanRow {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] =
      java.util.Collections.emptyList()
   }

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

Lines changed: 4 additions & 3 deletions
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -42,10 +42,11 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
-  class Reader(path: String, conf: Configuration) extends DataSourceReader {
+  class Reader(path: String, conf: Configuration) extends DataSourceReader
+    with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = schema
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {
