@@ -18,7 +18,9 @@
package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -27,6 +29,29 @@
@InterfaceStability.Evolving
public interface ReadSupport extends DataSourceV2 {

/**
* Creates a {@link DataSourceReader} to scan the data from this data source.
*
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*
* By default this method throws {@link UnsupportedOperationException}; implementations should
* override it to handle a user-specified schema.
*
* @param schema the user-specified schema.
* @param options the options for the returned data source reader, which is an immutable
*                case-insensitive string-to-string map.
*/
default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
String name;
if (this instanceof DataSourceRegister) {
name = ((DataSourceRegister) this).shortName();
} else {
name = this.getClass().getName();
}
throw new UnsupportedOperationException(name + " does not support user specified schema");
}

/**
* Creates a {@link DataSourceReader} to scan the data from this data source.
*

This file was deleted.

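To make the shape of the merged interface concrete, here is a rough Scala sketch (class and column names are invented for illustration, not part of this change) of a source that supplies its own schema: it overrides only the options-only createReader, so a read with a user-supplied schema falls through to the new default method and fails with UnsupportedOperationException.

import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType

// Hypothetical source with a fixed schema; it does not accept a user-specified schema.
class FixedSchemaDataSource extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int")
    override def planInputPartitions(): JList[InputPartition[InternalRow]] =
      Collections.emptyList()
  }

  // Only the options-only variant is overridden. Calls that carry a user schema hit the
  // default createReader(StructType, DataSourceOptions) and throw UnsupportedOperationException.
  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}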
@@ -23,13 +23,12 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.types.StructType;

/**
* A data source reader that is returned by
* {@link ReadSupport#createReader(DataSourceOptions)} or
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
* {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
* logic is delegated to {@link InputPartition}s, which are returned by
* {@link #planInputPartitions()}.
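As a hedged illustration of the delegation described above (identifiers below are invented, and the pattern simply mirrors the simple test sources in this patch), a reader reports its schema and plans InputPartitions, while each partition's InputPartitionReader produces the actual rows:

import java.util.{Arrays, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Hypothetical reader: one int column, one partition covering the range [0, 5).
class RangeReader extends DataSourceReader {
  override def readSchema(): StructType = new StructType().add("i", "int")
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Arrays.asList(new RangeInputPartition(0, 5))
}

class RangeInputPartition(start: Int, end: Int) extends InputPartition[InternalRow] {
  // The partition is planned on the driver; the reader it creates runs on an executor.
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var current = start - 1
      override def next(): Boolean = { current += 1; current < end }
      override def get(): InternalRow = InternalRow(current)
      override def close(): Unit = {}
    }
}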
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance().asInstanceOf[DataSourceV2]
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
if (ds.isInstanceOf[ReadSupport]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = ds, conf = sparkSession.sessionState.conf)
val pathsOption = {
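In user terms (a sketch assuming an active SparkSession named spark and a made-up source class name), the check above means both read paths now go through a single ReadSupport implementation:

import org.apache.spark.sql.types.StructType

val schema = new StructType().add("i", "int").add("s", "string")

// Dispatches to createReader(schema, options); a source that never overrode the schema
// variant fails here with the default method's UnsupportedOperationException.
val withUserSchema = spark.read.format("com.example.MySource").schema(schema).load()

// Dispatches to createReader(options), exactly as before this change.
val inferred = spark.read.format("com.example.MySource").load()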
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

@@ -110,22 +110,6 @@ object DataSourceV2Relation {
source match {
case support: ReadSupport =>
support
case _: ReadSupportWithSchema =>
// this method is only called if there is no user-supplied schema. if there is no
// user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
}

def asReadSupportWithSchema: ReadSupportWithSchema = {
source match {
case support: ReadSupportWithSchema =>
support
case _: ReadSupport =>
throw new AnalysisException(
s"Data source does not support user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
@@ -146,7 +130,7 @@
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
asReadSupportWithSchema.createReader(s, v2Options)
asReadSupport.createReader(s, v2Options)
case _ =>
asReadSupport.createReader(v2Options)
}
@@ -22,12 +22,12 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.types.StructType;

public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport {

class Reader implements DataSourceReader {
private final StructType schema;
@@ -47,6 +47,11 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
}
}

@Override
public DataSourceReader createReader(DataSourceOptions options) {
throw new IllegalArgumentException("requires a user-supplied schema");
}

@Override
public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
return new Reader(schema);
@@ -22,9 +22,8 @@ import java.util.{ArrayList, List => JList}
import test.org.apache.spark.sql.sources.v2._

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
@@ -135,8 +134,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
test("schema required data source") {
Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
withClue(cls.getName) {
val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
assert(e.message.contains("requires a user-supplied schema"))
val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
assert(e.getMessage.contains("requires a user-supplied schema"))

val schema = new StructType().add("i", "int").add("s", "string")
val df = spark.read.format(cls.getName).schema(schema).load()
@@ -455,15 +454,20 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
}


class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
class SchemaRequiredDataSource extends DataSourceV2 with ReadSupport {

class Reader(val readSchema: StructType) extends DataSourceReader {
override def planInputPartitions(): JList[InputPartition[InternalRow]] =
java.util.Collections.emptyList()
}

override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
override def createReader(options: DataSourceOptions): DataSourceReader = {
throw new IllegalArgumentException("requires a user-supplied schema")
}

override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
new Reader(schema)
}
}

class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {