Skip to content

Commit cd7f5a7

Browse files
cloud-fanrdblue
authored andcommitted
[SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema
Regarding user-specified schema, data sources may have 3 different behaviors: 1. must have a user-specified schema 2. can't have a user-specified schema 3. can accept the user-specified if it's given, or infer the schema. I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport` and make it call `createReader(options)` by default. TODO: also fix the streaming API in followup PRs. existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21946 from cloud-fan/ds-schema.
1 parent dcbdeb4 commit cd7f5a7

File tree

6 files changed

+46
-78
lines changed

6 files changed

+46
-78
lines changed

sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.spark.sql.sources.v2;
1919

2020
import org.apache.spark.annotation.InterfaceStability;
21+
import org.apache.spark.sql.sources.DataSourceRegister;
2122
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
23+
import org.apache.spark.sql.types.StructType;
2224

2325
/**
2426
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -27,6 +29,29 @@
2729
@InterfaceStability.Evolving
2830
public interface ReadSupport extends DataSourceV2 {
2931

32+
/**
33+
* Creates a {@link DataSourceReader} to scan the data from this data source.
34+
*
35+
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
36+
* submitted.
37+
*
38+
* @param schema the user specified schema.
39+
* @param options the options for the returned data source reader, which is an immutable
40+
* case-insensitive string-to-string map.
41+
*
42+
* By default this method throws {@link UnsupportedOperationException}, implementations should
43+
* override this method to handle user specified schema.
44+
*/
45+
default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
46+
String name;
47+
if (this instanceof DataSourceRegister) {
48+
name = ((DataSourceRegister) this).shortName();
49+
} else {
50+
name = this.getClass().getName();
51+
}
52+
throw new UnsupportedOperationException(name + " does not support user specified schema");
53+
}
54+
3055
/**
3156
* Creates a {@link DataSourceReader} to scan the data from this data source.
3257
*

sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@
2323
import org.apache.spark.sql.catalyst.InternalRow;
2424
import org.apache.spark.sql.sources.v2.DataSourceOptions;
2525
import org.apache.spark.sql.sources.v2.ReadSupport;
26-
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
2726
import org.apache.spark.sql.types.StructType;
2827

2928
/**
3029
* A data source reader that is returned by
3130
* {@link ReadSupport#createReader(DataSourceOptions)} or
32-
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
31+
* {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
3332
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
3433
* logic is delegated to {@link InputPartition}s, which are returned by
3534
* {@link #planInputPartitions()}.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStorageFor
2828
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, NamedExpression}
2929
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics, SupportsPhysicalStats}
3030
import org.apache.spark.sql.sources.DataSourceRegister
31-
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
31+
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}
3232
import org.apache.spark.sql.sources.v2.reader._
3333
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
3434
import org.apache.spark.sql.types.StructType
@@ -121,22 +121,6 @@ object DataSourceV2Relation {
121121
source match {
122122
case support: ReadSupport =>
123123
support
124-
case _: ReadSupportWithSchema =>
125-
// this method is only called if there is no user-supplied schema. if there is no
126-
// user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
127-
throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
128-
case _ =>
129-
throw new AnalysisException(s"Data source is not readable: $name")
130-
}
131-
}
132-
133-
def asReadSupportWithSchema: ReadSupportWithSchema = {
134-
source match {
135-
case support: ReadSupportWithSchema =>
136-
support
137-
case _: ReadSupport =>
138-
throw new AnalysisException(
139-
s"Data source does not support user-supplied schema: $name")
140124
case _ =>
141125
throw new AnalysisException(s"Data source is not readable: $name")
142126
}
@@ -166,7 +150,7 @@ object DataSourceV2Relation {
166150
val v2Options = new DataSourceOptions(options.asJava)
167151
userSpecifiedSchema match {
168152
case Some(s) =>
169-
asReadSupportWithSchema.createReader(s, v2Options)
153+
asReadSupport.createReader(s, v2Options)
170154
case _ =>
171155
asReadSupport.createReader(v2Options)
172156
}

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import org.apache.spark.sql.catalyst.InternalRow;
2323
import org.apache.spark.sql.sources.v2.DataSourceOptions;
2424
import org.apache.spark.sql.sources.v2.DataSourceV2;
25-
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
25+
import org.apache.spark.sql.sources.v2.ReadSupport;
2626
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
2727
import org.apache.spark.sql.sources.v2.reader.InputPartition;
2828
import org.apache.spark.sql.types.StructType;
2929

30-
public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
30+
public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport {
3131

3232
class Reader implements DataSourceReader {
3333
private final StructType schema;
@@ -47,6 +47,11 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
4747
}
4848
}
4949

50+
@Override
51+
public DataSourceReader createReader(DataSourceOptions options) {
52+
throw new IllegalArgumentException("requires a user-supplied schema");
53+
}
54+
5055
@Override
5156
public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
5257
return new Reader(schema);

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ import java.util.{ArrayList, List => JList}
2121

2222
import test.org.apache.spark.sql.sources.v2._
2323

24-
import org.apache.spark.{SparkConf, SparkException}
25-
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
24+
import org.apache.spark.SparkException
25+
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
2626
import org.apache.spark.sql.catalyst.InternalRow
27-
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
2827
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
2928
import org.apache.spark.sql.functions._
3029
import org.apache.spark.sql.sources.{Filter, GreaterThan}
@@ -120,8 +119,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
120119
test("schema required data source") {
121120
Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
122121
withClue(cls.getName) {
123-
val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
124-
assert(e.message.contains("requires a user-supplied schema"))
122+
val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
123+
assert(e.getMessage.contains("requires a user-supplied schema"))
125124

126125
val schema = new StructType().add("i", "int").add("s", "string")
127126
val df = spark.read.format(cls.getName).schema(schema).load()
@@ -381,13 +380,18 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
381380
}
382381

383382

384-
class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
383+
class SchemaRequiredDataSource extends DataSourceV2 with ReadSupport {
385384

386385
class Reader(val readSchema: StructType) extends DataSourceReader {
387386
override def planInputPartitions(): JList[InputPartition[InternalRow]] =
388387
java.util.Collections.emptyList()
389388
}
390389

391-
override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
390+
override def createReader(options: DataSourceOptions): DataSourceReader = {
391+
throw new IllegalArgumentException("requires a user-supplied schema")
392+
}
393+
394+
override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
392395
new Reader(schema)
396+
}
393397
}

0 commit comments

Comments
 (0)