Commit dcbdeb4

cloud-fan authored and rdblue committed
[SPARK-24971][SQL] remove SupportsDeprecatedScanRow
This is a follow-up of apache#21118, where we added `SupportsDeprecatedScanRow`. Ideally a data source should produce `InternalRow` instead of `Row` for better performance, so this patch removes `SupportsDeprecatedScanRow` and encourages data sources to produce `InternalRow` directly, which is also very easy to build. Covered by existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21921 from cloud-fan/row.
1 parent 680fb54 commit dcbdeb4

9 files changed: +51 −226 lines
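The commit message's point that `InternalRow` is "very easy to build" can be illustrated with a minimal sketch. The reader name and the (id, name) data below are hypothetical, not part of this patch; the sketch assumes the 2.4-era `InputPartitionReader` interface shown in the diffs and builds rows with `GenericInternalRow`, using `UTF8String` because `InternalRow` holds Catalyst's internal types.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical reader (illustration only): emits (id, name) rows directly as
// InternalRow, so no Row-to-InternalRow bridge is needed anymore.
class ExampleReader extends InputPartitionReader[InternalRow] {
  private var i = -1
  private val names = Array("a", "b", "c")

  override def next(): Boolean = { i += 1; i < names.length }

  // InternalRow expects Catalyst internal types, e.g. UTF8String for strings.
  override def get(): InternalRow =
    new GenericInternalRow(Array[Any](i, UTF8String.fromString(names(i))))

  override def close(): Unit = {}
}
```

With `SupportsDeprecatedScanRow` gone, `planInputPartitions()` is the single code path and readers hand Spark internal rows directly, skipping the encoder round-trip that the removed bridge performed.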

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 1 addition & 5 deletions
@@ -39,11 +39,7 @@
  *    pruning), etc. Names of these interfaces start with `SupportsPushDown`.
  * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  *    Names of these interfaces start with `SupportsReporting`.
- * 3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- *    Names of these interfaces start with `SupportsScan`. Note that a reader should only
- *    implement at most one of the special scans, if more than one special scans are implemented,
- *    only one of them would be respected, according to the priority list from high to low:
- *    {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
+ * 3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 1 addition & 2 deletions
@@ -28,8 +28,7 @@
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
  * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
- * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
- * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
+ * source readers that mix in {@link SupportsScanColumnarBatch}.
  */
 @InterfaceStability.Evolving
 public interface InputPartitionReader<T> extends Closeable {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala

Lines changed: 2 additions & 34 deletions
@@ -20,15 +20,12 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.types.StructType
 
 /**
  * Physical plan node for scanning data from a data source.
@@ -54,13 +51,8 @@ case class DataSourceV2ScanExec(
     Seq(output, source, options).hashCode()
   }
 
-  private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match {
-    case r: SupportsDeprecatedScanRow =>
-      r.planRowInputPartitions().asScala.map {
-        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow]
-      }
-    case _ =>
-      reader.planInputPartitions().asScala
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
+    reader.planInputPartitions().asScala
   }
 
   private lazy val inputRDD: RDD[InternalRow] = reader match {
@@ -79,27 +71,3 @@
     }
   }
 }
-
-class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
-  extends InputPartition[InternalRow] {
-
-  override def preferredLocations: Array[String] = partition.preferredLocations
-
-  override def createPartitionReader: InputPartitionReader[InternalRow] = {
-    new RowToUnsafeInputPartitionReader(
-      partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
-  }
-}
-
-class RowToUnsafeInputPartitionReader(
-    val rowReader: InputPartitionReader[Row],
-    encoder: ExpressionEncoder[Row])
-
-  extends InputPartitionReader[InternalRow] {
-
-  override def next: Boolean = rowReader.next
-
-  override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow]
-
-  override def close(): Unit = rowReader.close()
-}
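Since Spark no longer wraps `Row`-producing partitions itself, a source that genuinely still has external `Row` objects in hand can do the same conversion the deleted bridge did. The sketch below is hedged: the class name `RowBridgingReader` is illustrative and not part of this commit, but the `RowEncoder.apply(schema).resolveAndBind()` / `toRow` calls mirror the removed `RowToUnsafeInputPartitionReader` above.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType

// Sketch of the conversion a data source can now do on its own side:
// wrap a Row-based reader and translate each external Row into an
// InternalRow via RowEncoder, exactly as the removed bridge class did.
class RowBridgingReader(rowReader: InputPartitionReader[Row], schema: StructType)
  extends InputPartitionReader[InternalRow] {

  private val encoder = RowEncoder(schema).resolveAndBind()

  override def next(): Boolean = rowReader.next()

  override def get(): InternalRow = encoder.toRow(rowReader.get())

  override def close(): Unit = rowReader.close()
}
```

Moving this conversion out of `DataSourceV2ScanExec` keeps Spark's scan path uniform on `InternalRow`; sources that can produce internal rows directly simply skip the encoder.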

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java

Lines changed: 10 additions & 9 deletions
@@ -20,8 +20,8 @@
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -33,7 +33,7 @@
 public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
   public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
-      SupportsPushDownFilters, SupportsDeprecatedScanRow {
+      SupportsPushDownFilters {
 
     // Exposed for testing.
     public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
@@ -79,8 +79,8 @@ public Filter[] pushedFilters() {
     }
 
     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
-      List<InputPartition<Row>> res = new ArrayList<>();
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> res = new ArrayList<>();
 
       Integer lowerBound = null;
       for (Filter filter : filters) {
@@ -107,7 +107,8 @@ public List<InputPartition<Row>> planRowInputPartitions() {
     }
   }
 
-  static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
+  static class JavaAdvancedInputPartition implements InputPartition<InternalRow>,
+      InputPartitionReader<InternalRow> {
     private int start;
     private int end;
     private StructType requiredSchema;
@@ -119,7 +120,7 @@ static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPar
     }
 
     @Override
-    public InputPartitionReader<Row> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
     }
 
@@ -130,7 +131,7 @@ public boolean next() {
     }
 
     @Override
-    public Row get() {
+    public InternalRow get() {
       Object[] values = new Object[requiredSchema.size()];
      for (int i = 0; i < values.length; i++) {
        if ("i".equals(requiredSchema.apply(i).name())) {
@@ -139,7 +140,7 @@ public Row get() {
          values[i] = -start;
        }
      }
-      return new GenericRow(values);
+      return new GenericInternalRow(values);
     }
 
     @Override

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java

Lines changed: 3 additions & 4 deletions
@@ -19,18 +19,17 @@
 
 import java.util.List;
 
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
 
-  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
+  class Reader implements DataSourceReader {
     private final StructType schema;
 
     Reader(StructType schema) {
@@ -43,7 +42,7 @@ public StructType readSchema() {
     }
 
     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java

Lines changed: 10 additions & 9 deletions
@@ -20,20 +20,19 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
+  class Reader implements DataSourceReader {
     private final StructType schema = new StructType().add("i", "int").add("j", "int");
 
     @Override
@@ -42,14 +41,16 @@ public StructType readSchema() {
     }
 
     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Arrays.asList(
         new JavaSimpleInputPartition(0, 5),
         new JavaSimpleInputPartition(5, 10));
     }
   }
 
-  static class JavaSimpleInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
+  static class JavaSimpleInputPartition implements InputPartition<InternalRow>,
+      InputPartitionReader<InternalRow> {
+
     private int start;
     private int end;
 
@@ -59,7 +60,7 @@ static class JavaSimpleInputPartition implements InputPartition<Row>, InputParti
     }
 
     @Override
-    public InputPartitionReader<Row> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new JavaSimpleInputPartition(start - 1, end);
     }
 
@@ -70,8 +71,8 @@ public boolean next() {
     }
 
     @Override
-    public Row get() {
-      return new GenericRow(new Object[] {start, -start});
+    public InternalRow get() {
+      return new GenericInternalRow(new Object[] {start, -start});
     }
 
     @Override

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java

Lines changed: 0 additions & 90 deletions
This file was deleted.
